diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-09-20 16:43:10 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-09-20 16:43:10 +0000 |
commit | fcdf1723c7b5cdf0772054a93edb6e7d97c4bb1e (patch) | |
tree | 14fcd6191b11450f38cf88caa8dd9e44bbc5555b | |
parent | 7defd7eb7b11a0ff83ee0a7393477a453f4fb604 (diff) | |
download | qpid-python-fcdf1723c7b5cdf0772054a93edb6e7d97c4bb1e.tar.gz |
QPID-4984: WIP - compiles, but not functional.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1525050 13f79535-47bb-0310-9956-ffa450edef68
38 files changed, 2170 insertions, 566 deletions
diff --git a/qpid/cpp/src/linearstore.cmake b/qpid/cpp/src/linearstore.cmake index ddf70c75b9..5377acd207 100644 --- a/qpid/cpp/src/linearstore.cmake +++ b/qpid/cpp/src/linearstore.cmake @@ -80,12 +80,18 @@ if (BUILD_LINEARSTORE) qpid/linearstore/jrnl/deq_rec.cpp qpid/linearstore/jrnl/enq_map.cpp qpid/linearstore/jrnl/enq_rec.cpp + qpid/linearstore/jrnl/EmptyFilePool.cpp + qpid/linearstore/jrnl/EmptyFilePoolManager.cpp + qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp #qpid/linearstore/jrnl/fcntl.cpp qpid/linearstore/jrnl/jcntl.cpp qpid/linearstore/jrnl/jdir.cpp qpid/linearstore/jrnl/jerrno.cpp qpid/linearstore/jrnl/jexception.cpp #qpid/linearstore/jrnl/jinf.cpp + qpid/linearstore/jrnl/JournalFile.cpp + qpid/linearstore/jrnl/JournalFileController.cpp + qpid/linearstore/jrnl/JournalLog.cpp qpid/linearstore/jrnl/jrec.cpp #qpid/linearstore/jrnl/lp_map.cpp #qpid/linearstore/jrnl/lpmgr.cpp @@ -106,6 +112,7 @@ if (BUILD_LINEARSTORE) qpid/linearstore/BindingDbt.cpp qpid/linearstore/BufferValue.cpp qpid/linearstore/DataTokenImpl.cpp + qpid/linearstore/EmptyFilePoolManagerImpl.cpp qpid/linearstore/IdDbt.cpp qpid/linearstore/IdSequence.cpp qpid/linearstore/JournalImpl.cpp @@ -118,9 +125,9 @@ if (BUILD_LINEARSTORE) qpid/linearstore/jrnl/utils/deq_hdr.c qpid/linearstore/jrnl/utils/enq_hdr.c qpid/linearstore/jrnl/utils/file_hdr.c - qpid/linearstore/jrnl/utils/rec_hdr.c - qpid/linearstore/jrnl/utils/rec_tail.c - qpid/linearstore/jrnl/utils/txn_hdr.c + qpid/linearstore/jrnl/utils/rec_hdr.c + qpid/linearstore/jrnl/utils/rec_tail.c + qpid/linearstore/jrnl/utils/txn_hdr.c ) # linearstore include directories diff --git a/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp b/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp index 3a11817d1e..204affd1d1 100644 --- a/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp +++ b/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp @@ -72,7 +72,7 @@ rmgr::initialize(aio_callback* const cbp) throw jexception(jerrno::JERR__MALLOC, oss.str(), "rmgr", "initialize"); } _fhdr_aio_cb_ptr = new aio_cb; - std::memset(_fhdr_aio_cb_ptr, 0, sizeof(aio_cb*)); + std::memset(_fhdr_aio_cb_ptr, 0, sizeof(aio_cb)); } void diff --git a/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp b/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp new file mode 100644 index 0000000000..37e3922846 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "EmptyFilePoolManagerImpl.h" + +#include "QpidLog.h" +#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h" + +namespace qpid { +namespace linearstore { + +EmptyFilePoolManagerImpl::EmptyFilePoolManagerImpl(const std::string& qlsStorePath) : + qpid::qls_jrnl::EmptyFilePoolManager(qlsStorePath) +{} + +EmptyFilePoolManagerImpl::~EmptyFilePoolManagerImpl() {} + +void EmptyFilePoolManagerImpl::findEfpPartitions() { + qpid::qls_jrnl::EmptyFilePoolManager::findEfpPartitions(); + QLS_LOG(info, "EFP Manager initialization complete"); + std::vector<qpid::qls_jrnl::EmptyFilePoolPartition*> partitionList; + std::vector<qpid::qls_jrnl::EmptyFilePool*> filePoolList; + getEfpPartitions(partitionList); + if (partitionList.size() == 0) { + QLS_LOG(error, "NO EFP PARTITIONS FOUND! No queue creation is possible.") + } else { + QLS_LOG(info, "> EFP Partitions found: " << partitionList.size()); + for (std::vector<qpid::qls_jrnl::EmptyFilePoolPartition*>::const_iterator i=partitionList.begin(); i!= partitionList.end(); ++i) { + filePoolList.clear(); + (*i)->getEmptyFilePools(filePoolList); + QLS_LOG(info, " * Partition " << (*i)->partitionNumber() << " containing " << filePoolList.size() << " pool" << + (filePoolList.size()>1 ? "s" : "") << " at \'" << (*i)->partitionDirectory() << "\'"); + for (std::vector<qpid::qls_jrnl::EmptyFilePool*>::const_iterator j=filePoolList.begin(); j!=filePoolList.end(); ++j) { + QLS_LOG(info, " - EFP \'" << (*j)->fileSizeKib() << "k\' containing " << (*j)->numEmptyFiles() << + " files of size " << (*j)->fileSizeKib() << " KiB totaling " << (*j)->cumFileSizeKib() << " KiB"); + } + } + } +} + +}} /* namespace qpid::linearstore */ diff --git a/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.h b/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.h new file mode 100644 index 0000000000..f9087b2fbc --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.h @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_LINEARSTORE_EMPTYFILEPOOLMANAGERIMPL_H_ +#define QPID_LINEARSTORE_EMPTYFILEPOOLMANAGERIMPL_H_ + +#include "qpid/linearstore/jrnl/EmptyFilePoolManager.h" + +namespace qpid { +namespace linearstore { + +class EmptyFilePoolManagerImpl: public qpid::qls_jrnl::EmptyFilePoolManager +{ +public: + EmptyFilePoolManagerImpl(const std::string& qlsStorePath); + virtual ~EmptyFilePoolManagerImpl(); + void findEfpPartitions(); +}; + +}} /* namespace qpid::linearstore */ + +#endif /* QPID_LINEARSTORE_EMPTYFILEPOOLMANAGERIMPL_H_ */ diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp index f8b7777269..3fdfb24592 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp @@ -23,16 +23,17 @@ #include "qpid/linearstore/jrnl/jerrno.h" #include "qpid/linearstore/jrnl/jexception.h" +#include "qpid/linearstore/jrnl/EmptyFilePool.h" #include "qpid/log/Statement.h" #include "qpid/management/ManagementAgent.h" -#include "qmf/org/apache/qpid/linearstore/ArgsJournalExpand.h" +//#include "qmf/org/apache/qpid/linearstore/ArgsJournalExpand.h" #include "qmf/org/apache/qpid/linearstore/EventCreated.h" #include "qmf/org/apache/qpid/linearstore/EventEnqThresholdExceeded.h" #include "qmf/org/apache/qpid/linearstore/EventFull.h" #include "qmf/org/apache/qpid/linearstore/EventRecovered.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Timer.h" -#include "qpid/linearstore/Log.h" +#include "qpid/linearstore/QpidLog.h" #include "qpid/linearstore/StoreException.h" using namespace qpid::qls_jrnl; @@ -53,22 +54,23 @@ void GetEventsFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); i JournalImpl::JournalImpl(qpid::sys::Timer& timer_, const std::string& journalId, const std::string& journalDirectory, - const std::string& journalBaseFilename, +// const std::string& journalBaseFilename, const qpid::sys::Duration getEventsTimeout, const qpid::sys::Duration flushTimeout, qpid::management::ManagementAgent* a, DeleteCallback onDelete): - jcntl(journalId, journalDirectory, journalBaseFilename), + jcntl(journalId, journalDirectory/*, journalBaseFilename*/), timer(timer_), getEventsTimerSetFlag(false), - lastReadRid(0), + efpp(0), +// lastReadRid(0), writeActivityFlag(false), flushTriggeredFlag(true), - _xidp(0), - _datap(0), - _dlen(0), - _dtok(), - _external(false), +// _xidp(0), +// _datap(0), +// _dlen(0), +// _dtok(), +// _external(false), deleteCallback(onDelete) { getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout); @@ -82,7 +84,7 @@ JournalImpl::JournalImpl(qpid::sys::Timer& timer_, QLS_LOG2(notice, _jid, "Created"); std::ostringstream oss; - oss << "Journal directory = \"" << journalDirectory << "\"; Base file name = \"" << journalBaseFilename << "\""; + oss << "Journal directory = \"" << journalDirectory << "\""; QLS_LOG2(debug, _jid, oss.str()); } @@ -95,7 +97,7 @@ JournalImpl::~JournalImpl() } getEventsFireEventsPtr->cancel(); inactivityFireEventPtr->cancel(); - free_read_buffers(); +// free_read_buffers(); if (_mgmtObject.get() != 0) { _mgmtObject->resourceDestroy(); @@ -116,14 +118,14 @@ JournalImpl::initManagement(qpid::management::ManagementAgent* a) _mgmtObject->set_name(_jid); _mgmtObject->set_directory(_jdir.dirname()); - _mgmtObject->set_baseFileName(_base_filename); +// _mgmtObject->set_baseFileName(_base_filename); _mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE); _mgmtObject->set_readPages(JRNL_RMGR_PAGES); // The following will be set on initialize(), but being properties, these must be set to 0 in the meantime - _mgmtObject->set_initialFileCount(0); - _mgmtObject->set_dataFileSize(0); - _mgmtObject->set_currentFileCount(0); + //_mgmtObject->set_initialFileCount(0); + //_mgmtObject->set_dataFileSize(0); + //_mgmtObject->set_currentFileCount(0); _mgmtObject->set_writePageSize(0); _mgmtObject->set_writePages(0); @@ -133,22 +135,23 @@ JournalImpl::initManagement(qpid::management::ManagementAgent* a) void -JournalImpl::initialize(/*const uint16_t num_jfiles, - const bool auto_expand, - const uint16_t ae_max_jfiles, - const uint32_t jfsize_sblks,*/ +JournalImpl::initialize(qpid::qls_jrnl::EmptyFilePool* efpp_, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, qpid::qls_jrnl::aio_callback* const cbp) { - std::ostringstream oss; -// oss << "Initialize; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks; - oss << "Initialize;"; - oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks; - oss << " wcache_num_pages=" << wcache_num_pages; - QLS_LOG2(debug, _jid, oss.str()); - jcntl::initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks, cbp); - QLS_LOG2(debug, _jid, "Initialization complete"); + efpp = efpp_; +// efpp->createJournal(_jdir); +// QLS_LOG2(notice, _jid, "Initialized"); +// std::ostringstream oss; +//// oss << "Initialize; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks; +// oss << "Initialize; efpPartitionNumber=" << efpp_->getPartitionNumber(); +// oss << " efpFileSizeKb=" << efpp_->fileSizeKib(); +// oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks; +// oss << " wcache_num_pages=" << wcache_num_pages; +// QLS_LOG2(debug, _jid, oss.str()); + jcntl::initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ efpp, wcache_num_pages, wcache_pgsize_sblks, cbp); +// QLS_LOG2(debug, _jid, "Initialization complete"); // TODO: replace for linearstore: _lpmgr /* if (_mgmtObject.get() != 0) @@ -261,6 +264,7 @@ JournalImpl::recover_complete() //#define AIO_SLEEP_TIME_US 10 // 0.01 ms // Return true if content is recovered from store; false if content is external and must be recovered from an external store. // Throw exception for all errors. +/* bool JournalImpl::loadMsgContent(uint64_t rid, std::string& data, size_t length, size_t offset) { @@ -351,6 +355,7 @@ JournalImpl::loadMsgContent(uint64_t rid, std::string& data, size_t length, size } return true; } +*/ void JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len, @@ -574,6 +579,7 @@ void JournalImpl::rd_aio_cb(std::vector<uint16_t>& /*pil*/) {} +/* void JournalImpl::free_read_buffers() { @@ -586,6 +592,12 @@ JournalImpl::free_read_buffers() _datap = 0; } } +*/ + +void +JournalImpl::createStore() { + +} void JournalImpl::handleIoResult(const iores r) @@ -624,12 +636,13 @@ JournalImpl::handleIoResult(const iores r) } } -qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t methodId, +qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t /*methodId*/, qpid::management::Args& /*args*/, std::string& /*text*/) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; +/* switch (methodId) { case _qmf::Journal::METHOD_EXPAND : @@ -640,6 +653,7 @@ qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t m status = Manageable::STATUS_NOT_IMPLEMENTED; break; } +*/ return status; } diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.h b/qpid/cpp/src/qpid/linearstore/JournalImpl.h index 692bccc9b0..264162e5bb 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalImpl.h +++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.h @@ -19,11 +19,12 @@ * */ -#ifndef QPID_LEGACYSTORE_JOURNALIMPL_H -#define QPID_LEGACYSTORE_JOURNALIMPL_H +#ifndef QPID_LINEARSTORE_JOURNALIMPL_H +#define QPID_LINEARSTORE_JOURNALIMPL_H #include <set> #include "qpid/linearstore/jrnl/enums.h" +#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h" #include "qpid/linearstore/jrnl/jcntl.h" #include "qpid/linearstore/DataTokenImpl.h" #include "qpid/linearstore/PreparedTransaction.h" @@ -35,12 +36,15 @@ #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/linearstore/Journal.h" -namespace qpid { +namespace qpid{ + namespace sys { class Timer; -}} +} +namespace qls_jrnl { +class EmptyFilePool; +} -namespace qpid{ namespace linearstore{ class JournalImpl; @@ -83,20 +87,21 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr; qpid::sys::Mutex _getf_lock; qpid::sys::Mutex _read_lock; + qpid::qls_jrnl::EmptyFilePool* efpp; - uint64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests - std::vector<uint64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence +// uint64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests +// std::vector<uint64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence bool writeActivityFlag; bool flushTriggeredFlag; boost::intrusive_ptr<qpid::sys::TimerTask> inactivityFireEventPtr; // temp local vars for loadMsgContent below - void* _xidp; - void* _datap; - size_t _dlen; - qpid::qls_jrnl::data_tok _dtok; - bool _external; +// void* _xidp; +// void* _datap; +// size_t _dlen; +// qpid::qls_jrnl::data_tok _dtok; +// bool _external; qpid::management::ManagementAgent* _agent; qmf::org::apache::qpid::linearstore::Journal::shared_ptr _mgmtObject; @@ -107,7 +112,7 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr JournalImpl(qpid::sys::Timer& timer, const std::string& journalId, const std::string& journalDirectory, - const std::string& journalBaseFilename, +// const std::string& journalBaseFilename, const qpid::sys::Duration getEventsTimeout, const qpid::sys::Duration flushTimeout, qpid::management::ManagementAgent* agent, @@ -121,6 +126,7 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr const bool auto_expand, const uint16_t ae_max_jfiles, const uint32_t jfsize_sblks,*/ + qpid::qls_jrnl::EmptyFilePool* efp, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, qpid::qls_jrnl::aio_callback* const cbp); @@ -129,10 +135,10 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr const bool auto_expand, const uint16_t ae_max_jfiles, const uint32_t jfsize_sblks,*/ + qpid::qls_jrnl::EmptyFilePool* efp, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks) { - initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks, - this); + initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ efp, wcache_num_pages, wcache_pgsize_sblks, this); } void recover(/*const uint16_t num_jfiles, @@ -164,7 +170,7 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr // Temporary fn to read and save last msg read from journal so it can be assigned // in chunks. To be replaced when coding to do this direct from the journal is ready. // Returns true if the record is extern, false if local. - bool loadMsgContent(uint64_t rid, std::string& data, size_t length, size_t offset = 0); +// bool loadMsgContent(uint64_t rid, std::string& data, size_t length, size_t offset = 0); // Overrides for write inactivity timer void enqueue_data_record(const void* const data_buff, const size_t tot_data_len, @@ -216,7 +222,8 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr void resetDeleteCallback() { deleteCallback = DeleteCallback(); } private: - void free_read_buffers(); +// void free_read_buffers(); + void createStore(); inline void setGetEventTimer() { @@ -242,11 +249,11 @@ class TplJournalImpl : public JournalImpl TplJournalImpl(qpid::sys::Timer& timer, const std::string& journalId, const std::string& journalDirectory, - const std::string& journalBaseFilename, +// const std::string& journalBaseFilename, const qpid::sys::Duration getEventsTimeout, const qpid::sys::Duration flushTimeout, qpid::management::ManagementAgent* agent) : - JournalImpl(timer, journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent) + JournalImpl(timer, journalId, journalDirectory/*, journalBaseFilename*/, getEventsTimeout, flushTimeout, agent) {} virtual ~TplJournalImpl() {} @@ -263,4 +270,4 @@ class TplJournalImpl : public JournalImpl } // namespace msgstore } // namespace mrg -#endif // ifndef QPID_LEGACYSTORE_JOURNALIMPL_H +#endif // ifndef QPID_LINEARSTORE_JOURNALIMPL_H diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index f4b9a40455..5372963bc2 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -21,17 +21,19 @@ #include "qpid/linearstore/MessageStoreImpl.h" +#include "db-inc.h" #include "qpid/broker/QueueSettings.h" #include "qpid/linearstore/BindingDbt.h" #include "qpid/linearstore/BufferValue.h" #include "qpid/linearstore/IdDbt.h" +#include "qpid/linearstore/jrnl/EmptyFilePoolManager.h" #include "qpid/linearstore/jrnl/txn_map.h" -#include "qpid/linearstore/Log.h" +#include "qpid/linearstore/QpidLog.h" #include "qpid/framing/FieldValue.h" #include "qmf/org/apache/qpid/linearstore/Package.h" #include "qpid/linearstore/StoreException.h" #include <dirent.h> -#include <db.h> + #define MAX_AIO_SLEEPS 100000 // tot: ~1 sec #define AIO_SLEEP_TIME_US 10 // 0.01 ms @@ -43,53 +45,46 @@ namespace linearstore{ const std::string MessageStoreImpl::storeTopLevelDir("qls"); // Sets the top-level store dir name + // FIXME aconway 2010-03-09: was 10 qpid::sys::Duration MessageStoreImpl::defJournalGetEventsTimeout(1 * qpid::sys::TIME_MSEC); // 10ms qpid::sys::Duration MessageStoreImpl::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s qpid::sys::Mutex TxnCtxt::globalSerialiser; -MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const uint64_t _rid, - const bool _deq_flag, - const bool _commit_flag, - const bool _tpc_flag) : - rid(_rid), - deq_flag(_deq_flag), - commit_flag(_commit_flag), - tpc_flag(_tpc_flag) +MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const uint64_t rid_, + const bool deq_flag_, + const bool commit_flag_, + const bool tpc_flag_) : + rid(rid_), + deq_flag(deq_flag_), + commit_flag(commit_flag_), + tpc_flag(tpc_flag_) {} -MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath) : - numJrnlFiles(0), - autoJrnlExpand(false), - autoJrnlExpandMaxFiles(0), - jrnlFsizeSblks(0), +MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath_) : + defaultEfpPartitionNumber(0), + defaultEfpFileSizeKib(0), truncateFlag(false), wCachePgSizeSblks(0), wCacheNumPages(0), - tplNumJrnlFiles(0), - tplJrnlFsizeSblks(0), tplWCachePgSizeSblks(0), tplWCacheNumPages(0), highestRid(0), isInit(false), - envPath(envpath), + envPath(envpath_), broker(broker_), mgmtObject(), agent(0) {} -uint32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const uint32_t param, const std::string paramName/*, const uint16_t jrnlFsizePgs*/) +uint32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const uint32_t param_, const std::string& paramName_) { - uint32_t p = param; + uint32_t p = param_; -/* if (jrnlFsizePgs == 1 && p > 64 ) { - p = 64; - QLS_LOG(warning, "parameter " << paramName << " (" << param << ") cannot set a page size greater than the journal file size; changing this parameter to the journal file size (" << p << ")"); - } - else*/ if (p == 0) { + if (p == 0) { // For zero value, use default p = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_SBLK_SIZE / 1024; - QLS_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")"); + QLS_LOG(warning, "parameter " << paramName_ << " (" << param_ << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")"); } else if ( p > 128 || (p & (p-1)) ) { // For any positive value that is not a power of 2, use closest value if (p < 6) p = 4; @@ -98,16 +93,16 @@ uint32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const uint32_t param, const st else if (p < 48) p = 32; else if (p < 96) p = 64; else p = 128; - QLS_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << p << ")"); + QLS_LOG(warning, "parameter " << paramName_ << " (" << param_ << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << p << ")"); } return p; } -uint16_t MessageStoreImpl::getJrnlWrNumPages(const uint32_t wrPageSizeKib) +uint16_t MessageStoreImpl::getJrnlWrNumPages(const uint32_t wrPageSizeKib_) { - uint32_t wrPageSizeSblks = wrPageSizeKib * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks + uint32_t wrPageSizeSblks = wrPageSizeKib_ * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks uint32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB). - switch (wrPageSizeKib) + switch (wrPageSizeKib_) { case 1: case 2: @@ -124,6 +119,29 @@ uint16_t MessageStoreImpl::getJrnlWrNumPages(const uint32_t wrPageSizeKib) } } +qpid::qls_jrnl::efpPartitionNumber_t MessageStoreImpl::chkEfpPartition(const qpid::qls_jrnl::efpPartitionNumber_t partition_, + const std::string& /*paramName_*/) { + // TODO: check against list of existing partitions, throw if not found + return partition_; +} + +qpid::qls_jrnl::efpFileSizeKib_t MessageStoreImpl::chkEfpFileSizeKiB(const qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib_, + const std::string& paramName_) { + uint8_t rem = efpFileSizeKib_ % uint64_t(JRNL_SBLK_SIZE_KIB); + if (rem != 0) { + uint64_t newVal = efpFileSizeKib_ - rem; + if (rem >= (JRNL_SBLK_SIZE_KIB / 2)) + newVal += JRNL_SBLK_SIZE_KIB; + QLS_LOG(warning, "Parameter " << paramName_ << " (" << efpFileSizeKib_ << ") must be a multiple of " << + JRNL_SBLK_SIZE_KIB << "; changing this parameter to the closest allowable value (" << + newVal << ")"); + return newVal; + } + return efpFileSizeKib_; + + // TODO: check against list of existing pools in the given partition +} + void MessageStoreImpl::initManagement () { if (broker != 0) { @@ -134,15 +152,10 @@ void MessageStoreImpl::initManagement () new _qmf::Store(agent, this, broker)); mgmtObject->set_location(storeDir); - mgmtObject->set_defaultInitialFileCount(numJrnlFiles); - mgmtObject->set_defaultDataFileSize(jrnlFsizeSblks / JRNL_RMGR_PAGE_SIZE); mgmtObject->set_tplIsInitialized(false); mgmtObject->set_tplDirectory(getTplBaseDir()); mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * JRNL_SBLK_SIZE); mgmtObject->set_tplWritePages(tplWCacheNumPages); - mgmtObject->set_tplInitialFileCount(tplNumJrnlFiles); - mgmtObject->set_tplDataFileSize(tplJrnlFsizeSblks * JRNL_SBLK_SIZE); - mgmtObject->set_tplCurrentFileCount(tplNumJrnlFiles); agent->addObject(mgmtObject, 0, true); @@ -154,61 +167,58 @@ void MessageStoreImpl::initManagement () } } -bool MessageStoreImpl::init(const qpid::Options* options) +bool MessageStoreImpl::init(const qpid::Options* options_) { // Extract and check options - const StoreOptions* opts = static_cast<const StoreOptions*>(options); + const StoreOptions* opts = static_cast<const StoreOptions*>(options_); + qpid::qls_jrnl::efpPartitionNumber_t efpPartition = chkEfpPartition(opts->efpPartition, "efp-partition"); + qpid::qls_jrnl::efpFileSizeKib_t efpFilePoolSize = chkEfpFileSizeKiB(opts->efpFileSizeKib, "efp-file-size"); uint32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size"); uint32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size"); // Pass option values to init() - return init(opts->storeDir, opts->truncateFlag, jrnlWrCachePageSizeKib, tplJrnlWrCachePageSizeKib); + return init(opts->storeDir, efpPartition, efpFilePoolSize, opts->truncateFlag, jrnlWrCachePageSizeKib, tplJrnlWrCachePageSizeKib); } // These params, taken from options, are assumed to be correct and verified -bool MessageStoreImpl::init(const std::string& dir, +bool MessageStoreImpl::init(const std::string& storeDir_, /*uint16_t jfiles, uint32_t jfileSizePgs,*/ - const bool truncateFlag, - uint32_t wCachePageSizeKib, + qpid::qls_jrnl::efpPartitionNumber_t efpPartition_, + qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib_, + const bool truncateFlag_, + uint32_t wCachePageSizeKib_, /*uint16_t tplJfiles, uint32_t tplJfileSizePgs,*/ - uint32_t tplWCachePageSizeKib/*, - bool autoJExpand, - uint16_t autoJExpandMaxFiles*/) + uint32_t tplWCachePageSizeKib_) + /*bool autoJExpand, + uint16_t autoJExpandMaxFiles)*/ { if (isInit) return true; // Set geometry members (converting to correct units where req'd) -// numJrnlFiles = jfiles; -// jrnlFsizeSblks = jfileSizePgs * JRNL_RMGR_PAGE_SIZE; - wCachePgSizeSblks = wCachePageSizeKib * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks - wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib); -// tplNumJrnlFiles = tplJfiles; -// tplJrnlFsizeSblks = tplJfileSizePgs * JRNL_RMGR_PAGE_SIZE; - tplWCachePgSizeSblks = tplWCachePageSizeKib * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks - tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib); -// autoJrnlExpand = autoJExpand; -// autoJrnlExpandMaxFiles = autoJExpandMaxFiles; - if (dir.size()>0) storeDir = dir; - - if (truncateFlag) - truncateInit(false); + defaultEfpPartitionNumber = efpPartition_; + defaultEfpFileSizeKib = efpFileSizeKib_; + wCachePgSizeSblks = wCachePageSizeKib_ * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks + wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib_); + tplWCachePgSizeSblks = tplWCachePageSizeKib_ * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks + tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib_); + if (storeDir_.size()>0) storeDir = storeDir_; + + if (truncateFlag_) + truncateInit(); else init(); - QLS_LOG(notice, "Store module initialized; store-dir=" << dir); -// QLS_LOG(info, "> Default files per journal: " << jfiles); -// TODO: Uncomment these lines when auto-expand is enabled. -// QLS_LOG(info, "> Auto-expand " << (autoJrnlExpand ? "enabled" : "disabled")); -// if (autoJrnlExpand) QLS_LOG(info, "> Max auto-expand journal files: " << autoJrnlExpandMaxFiles); -// QLS_LOG(info, "> Default journal file size: " << jfileSizePgs << " (wpgs)"); - QLS_LOG(info, "> Default write cache page size: " << wCachePageSizeKib << " (KiB)"); + QLS_LOG(notice, "Store module initialized; store-dir=" << storeDir_); + QLS_LOG(info, "> Default EFP partition: " << defaultEfpPartitionNumber); + QLS_LOG(info, "> Default EFP file size: " << defaultEfpFileSizeKib << " (KiB)"); + QLS_LOG(info, "> Default write cache page size: " << wCachePageSizeKib_ << " (KiB)"); QLS_LOG(info, "> Default number of write cache pages: " << wCacheNumPages); -// QLS_LOG(info, "> TPL files per journal: " << tplNumJrnlFiles); -// QLS_LOG(info, "> TPL journal file size: " << tplJfileSizePgs << " (wpgs)"); - QLS_LOG(info, "> TPL write cache page size: " << tplWCachePageSizeKib << " (KiB)"); + QLS_LOG(info, "> TPL write cache page size: " << tplWCachePageSizeKib_ << " (KiB)"); QLS_LOG(info, "> TPL number of write cache pages: " << tplWCacheNumPages); + QLS_LOG(info, "> EFP partition: " << defaultEfpPartitionNumber); + QLS_LOG(info, "> EFP file size pool: " << defaultEfpFileSizeKib << " (KiB)"); return isInit; } @@ -263,7 +273,7 @@ void MessageStoreImpl::init() // NOTE: during normal initialization, agent == 0 because the store is initialized before the management infrastructure. // However during a truncated initialization in a cluster, agent != 0. We always pass 0 as the agent for the // TplStore to keep things consistent in a cluster. See https://bugzilla.redhat.com/show_bug.cgi?id=681026 - tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, 0)); + tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), /*"tpl",*/ defJournalGetEventsTimeout, defJournalFlushTimeout, 0)); isInit = true; } catch (const DbException& e) { if (e.get_errno() == DB_VERSION_MISMATCH) @@ -286,6 +296,9 @@ void MessageStoreImpl::init() throw; } } while (!isInit); + + efpMgr.reset(new EmptyFilePoolManagerImpl(getStoreTopLevelDir())); + efpMgr->findEfpPartitions(); } void MessageStoreImpl::finalize() @@ -307,7 +320,7 @@ void MessageStoreImpl::finalize() } } -void MessageStoreImpl::truncateInit(const bool saveStoreContent) +void MessageStoreImpl::truncateInit() { if (isInit) { { @@ -324,15 +337,10 @@ void MessageStoreImpl::truncateInit(const bool saveStoreContent) dbenv->close(0); isInit = false; } - std::ostringstream oss; - oss << storeDir << "/" << storeTopLevelDir; - if (saveStoreContent) { - std::string dir = qpid::qls_jrnl::jdir::push_down(storeDir, storeTopLevelDir, "cluster"); - QLS_LOG(notice, "Store directory " << oss.str() << " was pushed down (saved) into directory " << dir << "."); - } else { - qpid::qls_jrnl::jdir::delete_dir(oss.str().c_str()); - QLS_LOG(notice, "Store directory " << oss.str() << " was truncated."); - } + qpid::qls_jrnl::jdir::delete_dir(getBdbBaseDir()); + qpid::qls_jrnl::jdir::delete_dir(getJrnlBaseDir()); + qpid::qls_jrnl::jdir::delete_dir(getTplBaseDir()); + QLS_LOG(notice, "Store directory " << getStoreTopLevelDir() << " was truncated."); init(); } @@ -342,18 +350,18 @@ void MessageStoreImpl::chkTplStoreInit() qpid::sys::Mutex::ScopedLock sl(tplInitLock); if (!tplStorePtr->is_ready()) { qpid::qls_jrnl::jdir::create_dir(getTplBaseDir()); - tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ tplWCacheNumPages, tplWCachePgSizeSblks); + tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSizeKib), tplWCacheNumPages, tplWCachePgSizeSblks); if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true); } } -void MessageStoreImpl::open(db_ptr db, - DbTxn* txn, - const char* file, - bool dupKey) +void MessageStoreImpl::open(db_ptr db_, + DbTxn* txn_, + const char* file_, + bool dupKey_) { - if(dupKey) db->set_flags(DB_DUPSORT); - db->open(txn, file, 0, DB_BTREE, DB_CREATE | DB_THREAD, 0); + if(dupKey_) db_->set_flags(DB_DUPSORT); + db_->open(txn_, file_, 0, DB_BTREE, DB_CREATE | DB_THREAD, 0); } void MessageStoreImpl::closeDbs() @@ -385,15 +393,15 @@ MessageStoreImpl::~MessageStoreImpl() } } -void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue, - const qpid::framing::FieldTable& /*args*/) +void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_, + const qpid::framing::FieldTable& args_) { + QLS_LOG(info, "*** MessageStoreImpl::create() queue=\"" << queue_.getName() << "\""); // DEBUG checkInit(); - if (queue.getPersistenceId()) { - THROW_STORE_EXCEPTION("Queue already created: " + queue.getName()); + if (queue_.getPersistenceId()) { + THROW_STORE_EXCEPTION("Queue already created: " + queue_.getName()); } JournalImpl* jQueue = 0; - qpid::framing::FieldTable::ValuePtr value; // uint16_t localFileCount = numJrnlFiles; // bool localAutoExpandFlag = autoJrnlExpand; @@ -408,18 +416,18 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue, // if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) // localFileSizeSblks = chkJrnlFileSizeParam((uint32_t) value->get<int>(), "qpid.file_size", wCachePgSizeSblks) * JRNL_RMGR_PAGE_SIZE; - if (queue.getName().size() == 0) + if (queue_.getName().size() == 0) { QLS_LOG(error, "Cannot create store for empty (null) queue name - ignoring and attempting to continue."); return; } - jQueue = new JournalImpl(broker->getTimer(), queue.getName(), getJrnlDir(queue), std::string("JournalData"), + jQueue = new JournalImpl(broker->getTimer(), queue_.getName(), getJrnlDir(queue_), /*std::string("JournalData"),*/ defJournalGetEventsTimeout, defJournalFlushTimeout, agent, boost::bind(&MessageStoreImpl::journalDeleted, this, _1)); { qpid::sys::Mutex::ScopedLock sl(journalListLock); - journalList[queue.getName()]=jQueue; + journalList[queue_.getName()]=jQueue; } // value = args.get("qpid.auto_expand"); @@ -429,25 +437,71 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue, // value = args.get("qpid.auto_expand_max_jfiles"); // if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) // localAutoExpandMaxFileCount = (uint16_t) value->get<int>(); +/* + qpid::framing::FieldTable::ValuePtr value; + qpid::qls_jrnl::efpPartitionNumber_t localEfpPartition = efpPartition; + value = args_.get("qpid.efp_partition"); + if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) { + localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition"); + } + + qpid::qls_jrnl::efpFileSizeKib_t localEfpFileSizeKib = efpFileSizeKib; + value = args_.get("qpid.efp_file_size"); + if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) { + localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(),"qpid.efp_file_size" ); + } +*/ - queue.setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue)); + queue_.setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue)); try { // init will create the deque's for the init... - jQueue->initialize(/*localFileCount, localAutoExpandFlag, localAutoExpandMaxFileCount, localFileSizeSblks,*/ wCacheNumPages, wCachePgSizeSblks); +// jQueue->initialize(localFileCount, localAutoExpandFlag, localAutoExpandMaxFileCount, localFileSizeSblks, wCacheNumPages, wCachePgSizeSblks); + jQueue->initialize(getEmptyFilePool(args_), wCacheNumPages, wCachePgSizeSblks); } catch (const qpid::qls_jrnl::jexception& e) { - THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": create() failed: " + e.what()); + THROW_STORE_EXCEPTION(std::string("Queue ") + queue_.getName() + ": create() failed: " + e.what()); } try { - if (!create(queueDb, queueIdSequence, queue)) { - THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName()); + if (!create(queueDb, queueIdSequence, queue_)) { + THROW_STORE_EXCEPTION("Queue already exists: " + queue_.getName()); } } catch (const DbException& e) { - THROW_STORE_EXCEPTION_2("Error creating queue named " + queue.getName(), e); + THROW_STORE_EXCEPTION_2("Error creating queue named " + queue_.getName(), e); + } +} + +qpid::qls_jrnl::EmptyFilePool* +MessageStoreImpl::getEmptyFilePool(const qpid::qls_jrnl::efpPartitionNumber_t efpPartitionNumber_, + const qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib_) { + qpid::qls_jrnl::EmptyFilePool* efpp = efpMgr->getEmptyFilePool(efpPartitionNumber_, efpFileSizeKib_); + if (efpp == 0) { + std::ostringstream oss; + oss << "Partition=" << efpPartitionNumber_ << "; EfpFileSize=" << efpFileSizeKib_ << " KiB"; + throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR_EFP_NOEFP, oss.str(), "MessageStoreImpl", "getEmptyFilePool"); } + return efpp; } -void MessageStoreImpl::destroy(qpid::broker::PersistableQueue& queue) +qpid::qls_jrnl::EmptyFilePool* +MessageStoreImpl::getEmptyFilePool(const qpid::framing::FieldTable& args_) { + qpid::framing::FieldTable::ValuePtr value; + qpid::qls_jrnl::efpPartitionNumber_t localEfpPartition = defaultEfpPartitionNumber; + value = args_.get("qpid.efp_partition"); + if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) { + localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition"); + } + + qpid::qls_jrnl::efpFileSizeKib_t localEfpFileSizeKib = defaultEfpFileSizeKib; + value = args_.get("qpid.efp_file_size"); + if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) { + localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(),"qpid.efp_file_size" ); + } + return getEmptyFilePool(localEfpPartition, localEfpFileSizeKib); +} + +void MessageStoreImpl::destroy(qpid::broker::PersistableQueue& queue_) { + QLS_LOG(info, "*** MessageStoreImpl::destroy() queue=\"" << queue_.getName() << "\""); +/* checkInit(); destroy(queueDb, queue); deleteBindingsForQueue(queue); @@ -461,21 +515,22 @@ void MessageStoreImpl::destroy(qpid::broker::PersistableQueue& queue) journalList.erase(queue.getName()); } } +*/ } -void MessageStoreImpl::create(const qpid::broker::PersistableExchange& exchange, - const qpid::framing::FieldTable& /*args*/) +void MessageStoreImpl::create(const qpid::broker::PersistableExchange& exchange_, + const qpid::framing::FieldTable& /*args_*/) { checkInit(); - if (exchange.getPersistenceId()) { - THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName()); + if (exchange_.getPersistenceId()) { + THROW_STORE_EXCEPTION("Exchange already created: " + exchange_.getName()); } try { - if (!create(exchangeDb, exchangeIdSequence, exchange)) { - THROW_STORE_EXCEPTION("Exchange already exists: " + exchange.getName()); + if (!create(exchangeDb, exchangeIdSequence, exchange_)) { + THROW_STORE_EXCEPTION("Exchange already exists: " + exchange_.getName()); } } catch (const DbException& e) { - THROW_STORE_EXCEPTION_2("Error creating exchange named " + exchange.getName(), e); + THROW_STORE_EXCEPTION_2("Error creating exchange named " + exchange_.getName(), e); } } @@ -488,14 +543,14 @@ void MessageStoreImpl::destroy(const qpid::broker::PersistableExchange& exchange bindingDb->del(0, &key, DB_AUTO_COMMIT); } -void MessageStoreImpl::create(const qpid::broker::PersistableConfig& general) +void MessageStoreImpl::create(const qpid::broker::PersistableConfig& general_) { checkInit(); - if (general.getPersistenceId()) { + if (general_.getPersistenceId()) { THROW_STORE_EXCEPTION("General configuration item already created"); } try { - if (!create(generalDb, generalIdSequence, general)) { + if (!create(generalDb, generalIdSequence, general_)) { THROW_STORE_EXCEPTION("General configuration already exists"); } } catch (const DbException& e) { @@ -503,25 +558,25 @@ void MessageStoreImpl::create(const qpid::broker::PersistableConfig& general) } } -void MessageStoreImpl::destroy(const qpid::broker::PersistableConfig& general) +void MessageStoreImpl::destroy(const qpid::broker::PersistableConfig& general_) { checkInit(); - destroy(generalDb, general); + destroy(generalDb, general_); } -bool MessageStoreImpl::create(db_ptr db, - IdSequence& seq, - const qpid::broker::Persistable& p) +bool MessageStoreImpl::create(db_ptr db_, + IdSequence& seq_, + const qpid::broker::Persistable& p_) { - uint64_t id (seq.next()); + uint64_t id (seq_.next()); Dbt key(&id, sizeof(id)); - BufferValue value (p); + BufferValue value (p_); int status; TxnCtxt txn; txn.begin(dbenv.get(), true); try { - status = db->put(txn.get(), &key, &value, DB_NOOVERWRITE); + status = db_->put(txn.get(), &key, &value, DB_NOOVERWRITE); txn.commit(); } catch (...) { txn.abort(); @@ -530,27 +585,27 @@ bool MessageStoreImpl::create(db_ptr db, if (status == DB_KEYEXIST) { return false; } else { - p.setPersistenceId(id); + p_.setPersistenceId(id); return true; } } -void MessageStoreImpl::destroy(db_ptr db, const qpid::broker::Persistable& p) +void MessageStoreImpl::destroy(db_ptr db, const qpid::broker::Persistable& p_) { qpid::sys::Mutex::ScopedLock sl(bdbLock); - IdDbt key(p.getPersistenceId()); + IdDbt key(p_.getPersistenceId()); db->del(0, &key, DB_AUTO_COMMIT); } -void MessageStoreImpl::bind(const qpid::broker::PersistableExchange& e, - const qpid::broker::PersistableQueue& q, - const std::string& k, - const qpid::framing::FieldTable& a) +void MessageStoreImpl::bind(const qpid::broker::PersistableExchange& e_, + const qpid::broker::PersistableQueue& q_, + const std::string& k_, + const qpid::framing::FieldTable& a_) { checkInit(); - IdDbt key(e.getPersistenceId()); - BindingDbt value(e, q, k, a); + IdDbt key(e_.getPersistenceId()); + BindingDbt value(e_, q_, k_, a_); TxnCtxt txn; txn.begin(dbenv.get(), true); try { @@ -562,16 +617,16 @@ void MessageStoreImpl::bind(const qpid::broker::PersistableExchange& e, } } -void MessageStoreImpl::unbind(const qpid::broker::PersistableExchange& e, - const qpid::broker::PersistableQueue& q, - const std::string& k, - const qpid::framing::FieldTable&) +void MessageStoreImpl::unbind(const qpid::broker::PersistableExchange& e_, + const qpid::broker::PersistableQueue& q_, + const std::string& k_, + const qpid::framing::FieldTable& /*a_*/) { checkInit(); - deleteBinding(e, q, k); + deleteBinding(e_, q_, k_); } -void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry) +void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_) { checkInit(); txn_list prepared; @@ -585,14 +640,14 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry) txn.begin(dbenv.get(), false); try { //read all queues, calls recoversMessages - recoverQueues(txn, registry, queues, prepared, messages); + recoverQueues(txn, registry_, queues, prepared, messages); //recover exchange & bindings: - recoverExchanges(txn, registry, exchanges); + recoverExchanges(txn, registry_, exchanges); recoverBindings(txn, exchanges, queues); //recover general-purpose configuration - recoverGeneral(txn, registry); + recoverGeneral(txn, registry_); txn.commit(); } catch (const DbException& e) { @@ -629,7 +684,7 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry) tpcc->prepare(tplStorePtr.get()); qpid::broker::RecoverableTransaction::shared_ptr dtx; - if (!incomplTplTxnFlag) dtx = registry.recoverTransaction(xid, txn); + if (!incomplTplTxnFlag) dtx = registry_.recoverTransaction(xid, txn); if (pt.enqueues.get()) { for (LockedMappings::iterator j = pt.enqueues->begin(); j != pt.enqueues->end(); j++) { tpcc->addXidRecord(queues[j->first]->getExternalQueueStore()); @@ -669,15 +724,17 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry) } } } - registry.recoveryComplete(); + registry_.recoveryComplete(); } -void MessageStoreImpl::recoverQueues(TxnCtxt& txn, - qpid::broker::RecoveryManager& registry, - queue_index& queue_index, - txn_list& prepared, - message_index& messages) +void MessageStoreImpl::recoverQueues(TxnCtxt& /*txn*/, + qpid::broker::RecoveryManager& /*registry*/, + queue_index& /*queue_index*/, + txn_list& /*prepared*/, + message_index& /*messages*/) { + QLS_LOG(info, "*** MessageStoreImpl::recoverQueues()"); +/* Cursor queues; queues.open(queueDb, txn.get()); @@ -714,24 +771,22 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, long rcnt = 0L; // recovered msg count long idcnt = 0L; // in-doubt msg count uint64_t thisHighestRid = 0ULL; - jQueue->recover(/*numJrnlFiles, autoJrnlExpand, autoJrnlExpandMaxFiles, jrnlFsizeSblks,*/ wCacheNumPages, + jQueue->recover(numJrnlFiles, autoJrnlExpand, autoJrnlExpandMaxFiles, jrnlFsizeSblks, wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery -/* - // Check for changes to queue store settings qpid.file_count and qpid.file_size resulting - // from recovery of a store that has had its size changed externally by the resize utility. - // If so, update the queue store settings so that QMF queries will reflect the new values. - const qpid::framing::FieldTable& storeargs = queue->getSettings().storeSettings; - qpid::framing::FieldTable::ValuePtr value; - value = storeargs.get("qpid.file_count"); - if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint16_t)value->get<int>() != jQueue->num_jfiles()) { - queue->addArgument("qpid.file_count", jQueue->num_jfiles()); - } - value = storeargs.get("qpid.file_size"); - if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint32_t)value->get<int>() != jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE) { - queue->addArgument("qpid.file_size", jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE); - } -*/ +// // Check for changes to queue store settings qpid.file_count and qpid.file_size resulting +// // from recovery of a store that has had its size changed externally by the resize utility. +// // If so, update the queue store settings so that QMF queries will reflect the new values. +// const qpid::framing::FieldTable& storeargs = queue->getSettings().storeSettings; +// qpid::framing::FieldTable::ValuePtr value; +// value = storeargs.get("qpid.file_count"); +// if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint16_t)value->get<int>() != jQueue->num_jfiles()) { +// queue->addArgument("qpid.file_count", jQueue->num_jfiles()); +// } +// value = storeargs.get("qpid.file_size"); +// if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint32_t)value->get<int>() != jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE) { +// queue->addArgument("qpid.file_size", jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE); +// } if (highestRid == 0ULL) highestRid = thisHighestRid; @@ -755,16 +810,17 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, QLS_LOG(info, "Most recent persistence id found: 0x" << std::hex << highestRid << std::dec); queueIdSequence.reset(maxQueueId + 1); +*/ } -void MessageStoreImpl::recoverExchanges(TxnCtxt& txn, - qpid::broker::RecoveryManager& registry, - exchange_index& index) +void MessageStoreImpl::recoverExchanges(TxnCtxt& txn_, + qpid::broker::RecoveryManager& registry_, + exchange_index& index_) { //TODO: this is a copy&paste from recoverQueues - refactor! Cursor exchanges; - exchanges.open(exchangeDb, txn.get()); + exchanges.open(exchangeDb, txn_.get()); uint64_t maxExchangeId(1); IdDbt key; @@ -773,11 +829,11 @@ void MessageStoreImpl::recoverExchanges(TxnCtxt& txn, while (exchanges.next(key, value)) { qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size()); //create a Exchange instance - qpid::broker::RecoverableExchange::shared_ptr exchange = registry.recoverExchange(buffer); + qpid::broker::RecoverableExchange::shared_ptr exchange = registry_.recoverExchange(buffer); if (exchange) { //set the persistenceId and update max as required exchange->setPersistenceId(key.id); - index[key.id] = exchange; + index_[key.id] = exchange; QLS_LOG(info, "Recovered exchange \"" << exchange->getName() << '"'); } maxExchangeId = std::max(key.id, maxExchangeId); @@ -785,12 +841,12 @@ void MessageStoreImpl::recoverExchanges(TxnCtxt& txn, exchangeIdSequence.reset(maxExchangeId + 1); } -void MessageStoreImpl::recoverBindings(TxnCtxt& txn, - exchange_index& exchanges, - queue_index& queues) +void MessageStoreImpl::recoverBindings(TxnCtxt& txn_, + exchange_index& exchanges_, + queue_index& queues_) { Cursor bindings; - bindings.open(bindingDb, txn.get()); + bindings.open(bindingDb, txn_.get()); IdDbt key; Dbt value; @@ -807,9 +863,9 @@ void MessageStoreImpl::recoverBindings(TxnCtxt& txn, buffer.getShortString(queueName); buffer.getShortString(routingkey); buffer.get(args); - exchange_index::iterator exchange = exchanges.find(key.id); - queue_index::iterator queue = queues.find(queueId); - if (exchange != exchanges.end() && queue != queues.end()) { + exchange_index::iterator exchange = exchanges_.find(key.id); + queue_index::iterator queue = queues_.find(queueId); + if (exchange != exchanges_.end() && queue != queues_.end()) { //could use the recoverable queue here rather than the name... exchange->second->bind(queueName, routingkey, args); QLS_LOG(info, "Recovered binding exchange=" << exchange->second->getName() @@ -823,11 +879,11 @@ void MessageStoreImpl::recoverBindings(TxnCtxt& txn, } } -void MessageStoreImpl::recoverGeneral(TxnCtxt& txn, - qpid::broker::RecoveryManager& registry) +void MessageStoreImpl::recoverGeneral(TxnCtxt& txn_, + qpid::broker::RecoveryManager& registry_) { Cursor items; - items.open(generalDb, txn.get()); + items.open(generalDb, txn_.get()); uint64_t maxGeneralId(1); IdDbt key; @@ -836,7 +892,7 @@ void MessageStoreImpl::recoverGeneral(TxnCtxt& txn, while (items.next(key, value)) { qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size()); //create instance - qpid::broker::RecoverableConfig::shared_ptr config = registry.recoverConfig(buffer); + qpid::broker::RecoverableConfig::shared_ptr config = registry_.recoverConfig(buffer); //set the persistenceId and update max as required config->setPersistenceId(key.id); maxGeneralId = std::max(key.id, maxGeneralId); @@ -845,14 +901,16 @@ void MessageStoreImpl::recoverGeneral(TxnCtxt& txn, } void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, - qpid::broker::RecoveryManager& recovery, - qpid::broker::RecoverableQueue::shared_ptr& queue, - txn_list& prepared, - message_index& messages, - long& rcnt, - long& idcnt) + qpid::broker::RecoveryManager& /*recovery*/, + qpid::broker::RecoverableQueue::shared_ptr& queue_, + txn_list& /*prepared*/, + message_index& /*messages*/, + long& /*rcnt*/, + long& /*idcnt*/) { - size_t preambleLength = sizeof(uint32_t)/*header size*/; + QLS_LOG(info, "*** MessageStoreImpl::recoverMessages() queue=\"" << queue_->getName() << "\""); +/* + size_t preambleLength = sizeof(uint32_t)header size; JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore()); DataTokenImpl dtok; @@ -977,38 +1035,39 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, } catch (const qpid::qls_jrnl::jexception& e) { THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": recoverMessages() failed: " + e.what()); } +*/ } qpid::broker::RecoverableMessage::shared_ptr MessageStoreImpl::getExternMessage(qpid::broker::RecoveryManager& /*recovery*/, - uint64_t /*messageId*/, - unsigned& /*headerSize*/) + uint64_t /*messageId*/, + unsigned& /*headerSize*/) { throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "getExternMessage"); } -int MessageStoreImpl::enqueueMessage(TxnCtxt& txn, - IdDbt& msgId, - qpid::broker::RecoverableMessage::shared_ptr& msg, - queue_index& index, - txn_list& prepared, - message_index& messages) +int MessageStoreImpl::enqueueMessage(TxnCtxt& txn_, + IdDbt& msgId_, + qpid::broker::RecoverableMessage::shared_ptr& msg_, + queue_index& index_, + txn_list& prepared_, + message_index& messages_) { Cursor mappings; - mappings.open(mappingDb, txn.get()); + mappings.open(mappingDb, txn_.get()); IdDbt value; int count(0); - for (int status = mappings->get(&msgId, &value, DB_SET); status == 0; status = mappings->get(&msgId, &value, DB_NEXT_DUP)) { - if (index.find(value.id) == index.end()) { + for (int status = mappings->get(&msgId_, &value, DB_SET); status == 0; status = mappings->get(&msgId_, &value, DB_NEXT_DUP)) { + if (index_.find(value.id) == index_.end()) { QLS_LOG(warning, "Recovered message for queue that no longer exists"); mappings->del(0); } else { - qpid::broker::RecoverableQueue::shared_ptr queue = index[value.id]; - if (PreparedTransaction::isLocked(prepared, value.id, msgId.id)) { - messages[msgId.id] = msg; + qpid::broker::RecoverableQueue::shared_ptr queue = index_[value.id]; + if (PreparedTransaction::isLocked(prepared_, value.id, msgId_.id)) { + messages_[msgId_.id] = msg_; } else { - queue->recover(msg); + queue->recover(msg_); } count++; } @@ -1019,6 +1078,8 @@ int MessageStoreImpl::enqueueMessage(TxnCtxt& txn, void MessageStoreImpl::readTplStore() { + QLS_LOG(info, "*** MessageStoreImpl::readTplStore()"); +/* tplRecoverMap.clear(); qpid::qls_jrnl::txn_map& tmap = tplStorePtr->get_txn_map(); DataTokenImpl dtok; @@ -1087,13 +1148,16 @@ void MessageStoreImpl::readTplStore() } catch (const qpid::qls_jrnl::jexception& e) { THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what()); } +*/ } void MessageStoreImpl::recoverTplStore() { + QLS_LOG(info, "*** MessageStoreImpl::recoverTplStore()"); +/* if (qpid::qls_jrnl::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) { uint64_t thisHighestRid = 0ULL; - tplStorePtr->recover(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0); + tplStorePtr->recover(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0); if (highestRid == 0ULL) highestRid = thisHighestRid; else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit @@ -1104,10 +1168,13 @@ void MessageStoreImpl::recoverTplStore() tplStorePtr->recover_complete(); // start journal. } +*/ } -void MessageStoreImpl::recoverLockedMappings(txn_list& txns) +void MessageStoreImpl::recoverLockedMappings(txn_list& /*txns*/) { + QLS_LOG(info, "*** MessageStoreImpl::recoverLockedMappings()"); +/* if (!tplStorePtr->is_ready()) recoverTplStore(); @@ -1119,10 +1186,13 @@ void MessageStoreImpl::recoverLockedMappings(txn_list& txns) deq_ptr.reset(new LockedMappings); txns.push_back(new PreparedTransaction(i->first, enq_ptr, deq_ptr)); } +*/ } -void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids) +void MessageStoreImpl::collectPreparedXids(std::set<std::string>& /*xids*/) { + QLS_LOG(info, "*** MessageStoreImpl::collectPreparedXids()"); +/* if (tplStorePtr->is_ready()) { tplStorePtr->read_reset(); readTplStore(); @@ -1134,6 +1204,7 @@ void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids) if (!i->second.deq_flag && i->second.tpc_flag) xids.insert(i->first); } +*/ } void MessageStoreImpl::stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*msg*/) @@ -1147,17 +1218,19 @@ void MessageStoreImpl::destroy(qpid::broker::PersistableMessage& /*msg*/) } void MessageStoreImpl::appendContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& /*msg*/, - const std::string& /*data*/) + const std::string& /*data*/) { throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "appendContent"); } -void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& queue, - const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, - std::string& data, - uint64_t offset, - uint32_t length) +void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& /*queue*/, + const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& /*msg*/, + std::string& /*data*/, + uint64_t /*offset*/, + uint32_t /*length*/) { + throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "loadContent"); +/* checkInit(); uint64_t messageId (msg->getPersistenceId()); @@ -1181,10 +1254,13 @@ void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& queue, } else { THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!"); } +*/ } -void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue) +void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_) { + QLS_LOG(info, "*** MessageStoreImpl::flush() queue=\"" << queue_.getName() << "\""); +/* if (queue.getExternalQueueStore() == 0) return; checkInit(); std::string qn = queue.getName(); @@ -1192,17 +1268,20 @@ void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue) JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore()); if (jc) { // TODO: check if this result should be used... - /*mrg::journal::iores res =*/ jc->flush(); + mrg::journal::iores res = jc->flush(); } } catch (const qpid::qls_jrnl::jexception& e) { THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() ); } +*/ } -void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* ctxt, - const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg, - const qpid::broker::PersistableQueue& queue) +void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* /*ctxt*/, + const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg_, + const qpid::broker::PersistableQueue& /*queue*/) { +// QLS_LOG(info, "*** MessageStoreImpl::enqueue() queue=\"" << queue.getName() << "\""); +/* checkInit(); uint64_t queueId (queue.getPersistenceId()); uint64_t messageId (msg->getPersistenceId()); @@ -1228,29 +1307,34 @@ void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* ctxt, // add queue* to the txn map.. if (ctxt) txn->addXidRecord(queue.getExternalQueueStore()); +*/ + msg_->enqueueComplete();// DEBUG: only while null fns in use } -uint64_t MessageStoreImpl::msgEncode(std::vector<char>& buff, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message) +uint64_t MessageStoreImpl::msgEncode(std::vector<char>& buff_, + const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message_) { - uint32_t headerSize = message->encodedHeaderSize(); - uint64_t size = message->encodedSize() + sizeof(uint32_t); - try { buff = std::vector<char>(size); } // long + headers + content + uint32_t headerSize = message_->encodedHeaderSize(); + uint64_t size = message_->encodedSize() + sizeof(uint32_t); + try { buff_ = std::vector<char>(size); } // long + headers + content catch (const std::exception& e) { std::ostringstream oss; oss << "Unable to allocate memory for encoding message; requested size: " << size << "; error: " << e.what(); THROW_STORE_EXCEPTION(oss.str()); } - qpid::framing::Buffer buffer(&buff[0],size); + qpid::framing::Buffer buffer(&buff_[0],size); buffer.putLong(headerSize); - message->encode(buffer); + message_->encode(buffer); return size; } -void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue, - TxnCtxt* txn, - const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message, - bool /*newId*/) +void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue_, + TxnCtxt* /*txn*/, + const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*message*/, + bool /*newId*/) { + QLS_LOG(info, "*** MessageStoreImpl::store() queue=\"" << queue_->getName() << "\""); +/* std::vector<char> buff; uint64_t size = msgEncode(buff, message); @@ -1275,12 +1359,15 @@ void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue, THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": MessageStoreImpl::store() failed: " + e.what()); } +*/ } -void MessageStoreImpl::dequeue(qpid::broker::TransactionContext* ctxt, - const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg, - const qpid::broker::PersistableQueue& queue) +void MessageStoreImpl::dequeue(qpid::broker::TransactionContext* /*ctxt*/, + const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg_, + const qpid::broker::PersistableQueue& /*queue*/) { +// QLS_LOG(info, "*** MessageStoreImpl::dequeue() queue=\"" << queue.getName() << "\""); +/* checkInit(); uint64_t queueId (queue.getPersistenceId()); uint64_t messageId (msg->getPersistenceId()); @@ -1302,14 +1389,16 @@ void MessageStoreImpl::dequeue(qpid::broker::TransactionContext* ctxt, // add queue* to the txn map.. if (ctxt) txn->addXidRecord(queue.getExternalQueueStore()); async_dequeue(ctxt, msg, queue); - - msg->dequeueComplete(); +*/ + msg_->dequeueComplete(); } -void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt, - const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg, - const qpid::broker::PersistableQueue& queue) +void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* /*ctxt*/, + const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*msg*/, + const qpid::broker::PersistableQueue& queue_) { + QLS_LOG(info, "*** MessageStoreImpl::async_dequeue() queue=\"" << queue_.getName() << "\""); +/* boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl); ddtokp->setSourceMessage(msg); ddtokp->set_external_rid(true); @@ -1334,38 +1423,39 @@ void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt, ddtokp->release(); THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what()); } +*/ } -uint32_t MessageStoreImpl::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/) +uint32_t MessageStoreImpl::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue_*/) { - checkInit(); +/* checkInit();*/ return 0; } -void MessageStoreImpl::completed(TxnCtxt& txn, - bool commit) +void MessageStoreImpl::completed(TxnCtxt& txn_, + bool commit_) { try { chkTplStoreInit(); // Late initialize (if needed) // Nothing to do if not prepared - if (txn.getDtok()->is_enqueued()) { - txn.incrDtokRef(); - DataTokenImpl* dtokp = txn.getDtok(); + if (txn_.getDtok()->is_enqueued()) { + txn_.incrDtokRef(); + DataTokenImpl* dtokp = txn_.getDtok(); dtokp->set_dequeue_rid(dtokp->rid()); dtokp->set_rid(messageIdSequence.next()); - tplStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit); + tplStorePtr->dequeue_txn_data_record(txn_.getDtok(), txn_.getXid(), commit_); } - txn.complete(commit); + txn_.complete(commit_); if (mgmtObject.get() != 0) { mgmtObject->dec_tplTransactionDepth(); - if (commit) + if (commit_) mgmtObject->inc_tplTxnCommits(); else mgmtObject->inc_tplTxnAborts(); } } catch (const std::exception& e) { - QLS_LOG(error, "Error completing xid " << txn.getXid() << ": " << e.what()); + QLS_LOG(error, "Error completing xid " << txn_.getXid() << ": " << e.what()); throw; } } @@ -1377,54 +1467,54 @@ std::auto_ptr<qpid::broker::TransactionContext> MessageStoreImpl::begin() return std::auto_ptr<qpid::broker::TransactionContext>(new TxnCtxt(&messageIdSequence)); } -std::auto_ptr<qpid::broker::TPCTransactionContext> MessageStoreImpl::begin(const std::string& xid) +std::auto_ptr<qpid::broker::TPCTransactionContext> MessageStoreImpl::begin(const std::string& xid_) { checkInit(); IdSequence* jtx = &messageIdSequence; // pass sequence number for c/a - return std::auto_ptr<qpid::broker::TPCTransactionContext>(new TPCTxnCtxt(xid, jtx)); + return std::auto_ptr<qpid::broker::TPCTransactionContext>(new TPCTxnCtxt(xid_, jtx)); } -void MessageStoreImpl::prepare(qpid::broker::TPCTransactionContext& ctxt) +void MessageStoreImpl::prepare(qpid::broker::TPCTransactionContext& ctxt_) { checkInit(); - TxnCtxt* txn = dynamic_cast<TxnCtxt*>(&ctxt); + TxnCtxt* txn = dynamic_cast<TxnCtxt*>(&ctxt_); if(!txn) throw qpid::broker::InvalidTransactionContextException(); localPrepare(txn); } -void MessageStoreImpl::localPrepare(TxnCtxt* ctxt) +void MessageStoreImpl::localPrepare(TxnCtxt* ctxt_) { try { chkTplStoreInit(); // Late initialize (if needed) // This sync is required to ensure multi-queue atomicity - ie all txn data // must hit the disk on *all* queues before the TPL prepare (enq) is written. - ctxt->sync(); + ctxt_->sync(); - ctxt->incrDtokRef(); - DataTokenImpl* dtokp = ctxt->getDtok(); + ctxt_->incrDtokRef(); + DataTokenImpl* dtokp = ctxt_->getDtok(); dtokp->set_external_rid(true); dtokp->set_rid(messageIdSequence.next()); - char tpcFlag = static_cast<char>(ctxt->isTPC()); - tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt->getXid(), false); - ctxt->prepare(tplStorePtr.get()); + char tpcFlag = static_cast<char>(ctxt_->isTPC()); + tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt_->getXid(), false); + ctxt_->prepare(tplStorePtr.get()); // make sure all the data is written to disk before returning - ctxt->sync(); + ctxt_->sync(); if (mgmtObject.get() != 0) { mgmtObject->inc_tplTransactionDepth(); mgmtObject->inc_tplTxnPrepares(); } } catch (const std::exception& e) { - QLS_LOG(error, "Error preparing xid " << ctxt->getXid() << ": " << e.what()); + QLS_LOG(error, "Error preparing xid " << ctxt_->getXid() << ": " << e.what()); throw; } } -void MessageStoreImpl::commit(qpid::broker::TransactionContext& ctxt) +void MessageStoreImpl::commit(qpid::broker::TransactionContext& ctxt_) { checkInit(); - TxnCtxt* txn(check(&ctxt)); + TxnCtxt* txn(check(&ctxt_)); if (!txn->isTPC()) { if (txn->impactedQueuesEmpty()) return; localPrepare(dynamic_cast<TxnCtxt*>(txn)); @@ -1432,10 +1522,10 @@ void MessageStoreImpl::commit(qpid::broker::TransactionContext& ctxt) completed(*dynamic_cast<TxnCtxt*>(txn), true); } -void MessageStoreImpl::abort(qpid::broker::TransactionContext& ctxt) +void MessageStoreImpl::abort(qpid::broker::TransactionContext& ctxt_) { checkInit(); - TxnCtxt* txn(check(&ctxt)); + TxnCtxt* txn(check(&ctxt_)); if (!txn->isTPC()) { if (txn->impactedQueuesEmpty()) return; localPrepare(dynamic_cast<TxnCtxt*>(txn)); @@ -1443,20 +1533,20 @@ void MessageStoreImpl::abort(qpid::broker::TransactionContext& ctxt) completed(*dynamic_cast<TxnCtxt*>(txn), false); } -TxnCtxt* MessageStoreImpl::check(qpid::broker::TransactionContext* ctxt) +TxnCtxt* MessageStoreImpl::check(qpid::broker::TransactionContext* ctxt_) { - TxnCtxt* txn = dynamic_cast<TxnCtxt*>(ctxt); + TxnCtxt* txn = dynamic_cast<TxnCtxt*>(ctxt_); if(!txn) throw qpid::broker::InvalidTransactionContextException(); return txn; } -void MessageStoreImpl::put(db_ptr db, - DbTxn* txn, - Dbt& key, - Dbt& value) +void MessageStoreImpl::put(db_ptr db_, + DbTxn* txn_, + Dbt& key_, + Dbt& value_) { try { - int status = db->put(txn, &key, &value, DB_NODUPDATA); + int status = db_->put(txn_, &key_, &value_, DB_NODUPDATA); if (status == DB_KEYEXIST) { THROW_STORE_EXCEPTION("duplicate data"); } else if (status) { @@ -1467,7 +1557,7 @@ void MessageStoreImpl::put(db_ptr db, } } -void MessageStoreImpl::deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue) +void MessageStoreImpl::deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue_) { TxnCtxt txn; txn.begin(dbenv.get(), true); @@ -1484,9 +1574,9 @@ void MessageStoreImpl::deleteBindingsForQueue(const qpid::broker::PersistableQue THROW_STORE_EXCEPTION("Not enough data for binding"); } uint64_t queueId = buffer.getLongLong(); - if (queue.getPersistenceId() == queueId) { + if (queue_.getPersistenceId() == queueId) { bindings->del(0); - QLS_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId); + QLS_LOG(debug, "Deleting binding for " << queue_.getName() << " " << key.id << "->" << queueId); } } } @@ -1498,12 +1588,12 @@ void MessageStoreImpl::deleteBindingsForQueue(const qpid::broker::PersistableQue txn.abort(); throw; } - QLS_LOG(debug, "Deleted all bindings for " << queue.getName() << ":" << queue.getPersistenceId()); + QLS_LOG(debug, "Deleted all bindings for " << queue_.getName() << ":" << queue_.getPersistenceId()); } -void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& exchange, - const qpid::broker::PersistableQueue& queue, - const std::string& bkey) +void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& exchange_, + const qpid::broker::PersistableQueue& queue_, + const std::string& bkey_) { TxnCtxt txn; txn.begin(dbenv.get(), true); @@ -1512,7 +1602,7 @@ void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& ex Cursor bindings; bindings.open(bindingDb, txn.get()); - IdDbt key(exchange.getPersistenceId()); + IdDbt key(exchange_.getPersistenceId()); Dbt value; for (int status = bindings->get(&key, &value, DB_SET); status == 0; status = bindings->get(&key, &value, DB_NEXT_DUP)) { @@ -1521,14 +1611,14 @@ void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& ex THROW_STORE_EXCEPTION("Not enough data for binding"); } uint64_t queueId = buffer.getLongLong(); - if (queue.getPersistenceId() == queueId) { + if (queue_.getPersistenceId() == queueId) { std::string q; std::string k; buffer.getShortString(q); buffer.getShortString(k); - if (bkey == k) { + if (bkey_ == k) { bindings->del(0); - QLS_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId); + QLS_LOG(debug, "Deleting binding for " << queue_.getName() << " " << key.id << "->" << queueId); } } } @@ -1543,6 +1633,13 @@ void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& ex } } +std::string MessageStoreImpl::getStoreTopLevelDir() { + std::ostringstream dir; + dir << storeDir << "/" << storeTopLevelDir; + return dir.str(); +} + + std::string MessageStoreImpl::getJrnlBaseDir() { std::ostringstream dir; @@ -1564,43 +1661,28 @@ std::string MessageStoreImpl::getTplBaseDir() return dir.str(); } -std::string MessageStoreImpl::getJrnlDir(const qpid::broker::PersistableQueue& queue) //for exmaple /var/rhm/ + queueDir/ +std::string MessageStoreImpl::getJrnlDir(const qpid::broker::PersistableQueue& queue_) //for exmaple /var/rhm/ + queueDir/ { - return getJrnlHashDir(queue.getName().c_str()); -} - -uint32_t MessageStoreImpl::bHash(const std::string str) -{ - // Daniel Bernstein hash fn - uint32_t h = 0; - for (std::string::const_iterator i = str.begin(); i < str.end(); i++) - h = 33*h + *i; - return h; -} - -std::string MessageStoreImpl::getJrnlHashDir(const std::string& queueName) //for exmaple /var/rhm/ + queueDir/ -{ - std::stringstream dir; - dir << getJrnlBaseDir() << std::hex << std::setfill('0') << std::setw(4); - dir << (bHash(queueName.c_str()) % 29); // Use a prime number for better distribution across dirs - dir << "/" << queueName << "/"; - return dir.str(); + /*return getJrnlHashDir(queue_.getName().c_str());*/ + std::ostringstream oss; + oss << getJrnlBaseDir() << queue_.getName(); + return oss.str(); } std::string MessageStoreImpl::getStoreDir() const { return storeDir; } -void MessageStoreImpl::journalDeleted(JournalImpl& j) { +void MessageStoreImpl::journalDeleted(JournalImpl& j_) { qpid::sys::Mutex::ScopedLock sl(journalListLock); - journalList.erase(j.id()); + journalList.erase(j_.id()); } -MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) : - qpid::Options(name), +MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) : + qpid::Options(name_), truncateFlag(defTruncateFlag), wCachePageSizeKib(defWCachePageSize), tplWCachePageSizeKib(defTplWCachePageSize), efpPartition(defEfpPartition), - efpFileSize(defEfpFileSize) + efpFileSizeKib(defEfpFileSizeKib) { addOptions() ("store-dir", qpid::optValue(storeDir, "DIR"), @@ -1619,8 +1701,8 @@ MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) : "Lower values decrease latency at the expense of throughput.") ("efp-partition", qpid::optValue(efpPartition, "N"), "Empty File Pool partition to use for finding empty journal files") - ("efp-file-size", qpid::optValue(efpFileSize, "N"), - "Empty File Pool file size to use for journal files") + ("efp-file-size", qpid::optValue(efpFileSizeKib, "N"), + "Empty File Pool file size in KiB to use for journal files. Must be a multiple of 4 KiB.") ; } diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h index 5d5fa28ff8..8d88e9da97 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h @@ -26,10 +26,12 @@ #include "db-inc.h" #include "qpid/linearstore/Cursor.h" +#include "qpid/linearstore/EmptyFilePoolManagerImpl.h" #include "qpid/linearstore/IdDbt.h" #include "qpid/linearstore/IdSequence.h" #include "qpid/linearstore/JournalImpl.h" #include "qpid/linearstore/jrnl/jcfg.h" +#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h" #include "qpid/linearstore/PreparedTransaction.h" #include "qpid/broker/Broker.h" #include "qpid/broker/MessageStore.h" @@ -48,6 +50,9 @@ class Timer; }} namespace qpid{ +namespace qls_jrnl { +class EmptyFilePoolManager; +} namespace linearstore{ /** @@ -67,7 +72,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem uint32_t wCachePageSizeKib; uint32_t tplWCachePageSizeKib; uint16_t efpPartition; - uint64_t efpFileSize; + uint64_t efpFileSizeKib; }; protected: @@ -98,10 +103,10 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem static const bool defTruncateFlag = false; static const uint32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_SBLK_SIZE / 1024; static const uint32_t defTplWCachePageSize = defWCachePageSize / 8; - static const uint16_t defEfpPartition = 0; - static const uint64_t defEfpFileSize = 512 * JRNL_SBLK_SIZE; - + static const uint16_t defEfpPartition = 1; + static const uint64_t defEfpFileSizeKib = 512 * JRNL_SBLK_SIZE / 1024; static const std::string storeTopLevelDir; + static qpid::sys::Duration defJournalGetEventsTimeout; static qpid::sys::Duration defJournalFlushTimeout; @@ -127,21 +132,18 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem IdSequence generalIdSequence; IdSequence messageIdSequence; std::string storeDir; - uint16_t numJrnlFiles; - bool autoJrnlExpand; - uint16_t autoJrnlExpandMaxFiles; - uint32_t jrnlFsizeSblks; + qpid::qls_jrnl::efpPartitionNumber_t defaultEfpPartitionNumber; + qpid::qls_jrnl::efpFileSizeKib_t defaultEfpFileSizeKib; bool truncateFlag; uint32_t wCachePgSizeSblks; uint16_t wCacheNumPages; - uint16_t tplNumJrnlFiles; - uint32_t tplJrnlFsizeSblks; uint32_t tplWCachePgSizeSblks; uint16_t tplWCacheNumPages; uint64_t highestRid; bool isInit; const char* envPath; qpid::broker::Broker* broker; + boost::shared_ptr<EmptyFilePoolManagerImpl> efpMgr; qmf::org::apache::qpid::linearstore::Store::shared_ptr mgmtObject; qpid::management::ManagementAgent* agent; @@ -149,9 +151,13 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem // Parameter validation and calculation static uint32_t chkJrnlWrPageCacheSize(const uint32_t param, - const std::string paramName/*, - const uint16_t jrnlFsizePgs*/); - static uint16_t getJrnlWrNumPages(const uint32_t wrPageSizeKib); + const std::string& paramName/*, + const uint16_t jrnlFsizePgs*/); + static uint16_t getJrnlWrNumPages(const uint32_t wrPageSizeKiB); + static qpid::qls_jrnl::efpPartitionNumber_t chkEfpPartition(const qpid::qls_jrnl::efpPartitionNumber_t partition, + const std::string& paramName); + static qpid::qls_jrnl::efpFileSizeKib_t chkEfpFileSizeKiB(const qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKiB, + const std::string& paramName); void init(); @@ -225,9 +231,10 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem // journal functions void createJrnlQueue(const qpid::broker::PersistableQueue& queue); - uint32_t bHash(const std::string str); std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/ - std::string getJrnlHashDir(const std::string& queueName); + qpid::qls_jrnl::EmptyFilePool* getEmptyFilePool(const qpid::qls_jrnl::efpPartitionNumber_t p, const qpid::qls_jrnl::efpFileSizeKib_t s); + qpid::qls_jrnl::EmptyFilePool* getEmptyFilePool(const qpid::framing::FieldTable& args); + std::string getStoreTopLevelDir(); std::string getJrnlBaseDir(); std::string getBdbBaseDir(); std::string getTplBaseDir(); @@ -260,11 +267,13 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem bool init(const qpid::Options* options); bool init(const std::string& dir, + qpid::qls_jrnl::efpPartitionNumber_t efpPartition = defEfpPartition, + qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib = defEfpFileSizeKib, const bool truncateFlag = false, uint32_t wCachePageSize = defWCachePageSize, uint32_t tplWCachePageSize = defTplWCachePageSize); - void truncateInit(const bool saveStoreContent = false); + void truncateInit(); void initManagement (); diff --git a/qpid/cpp/src/qpid/linearstore/Log.h b/qpid/cpp/src/qpid/linearstore/QpidLog.h index b03ea7ac9d..b03ea7ac9d 100644 --- a/qpid/cpp/src/qpid/linearstore/Log.h +++ b/qpid/cpp/src/qpid/linearstore/QpidLog.h diff --git a/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp b/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp index 840468d8bd..bf574d9740 100644 --- a/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp +++ b/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp @@ -23,7 +23,7 @@ #include "qpid/Plugin.h" #include "qpid/Options.h" #include "qpid/DataDir.h" -#include "qpid/linearstore/Log.h" +#include "qpid/linearstore/QpidLog.h" #include "qpid/linearstore/MessageStoreImpl.h" using qpid::linearstore::MessageStoreImpl; diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp new file mode 100644 index 0000000000..90e04df7ed --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp @@ -0,0 +1,255 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "EmptyFilePool.h" + +#include <cctype> +#include <fstream> +#include "qpid/linearstore/jrnl/jcfg.h" +#include "qpid/linearstore/jrnl/jdir.h" +#include "qpid/linearstore/jrnl/JournalFile.h" +#include "qpid/linearstore/jrnl/slock.h" +#include "qpid/linearstore/jrnl/utils/file_hdr.h" +#include <sys/stat.h> +#include <uuid/uuid.h> +#include <vector> + +#include <iostream> // DEBUG + +namespace qpid { +namespace qls_jrnl { + +EmptyFilePool::EmptyFilePool(const std::string& efpDirectory_, + const EmptyFilePoolPartition* partitionPtr_) : + efpDirectory(efpDirectory_), + efpFileSizeKib(fileSizeKbFromDirName(efpDirectory_, partitionPtr_->partitionNumber())), + partitionPtr(partitionPtr_) +{} + +EmptyFilePool::~EmptyFilePool() {} + +void +EmptyFilePool::initialize() { + //std::cout << "Reading " << efpDirectory << std::endl; // DEBUG + std::vector<std::string> dirList; + jdir::read_dir(efpDirectory, dirList, false, true, false); + for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) { + size_t dotPos = i->rfind("."); + if (dotPos != std::string::npos) { + if (i->substr(dotPos).compare(".jrnl") == 0 && i->length() == 41) { + std::string emptyFile(efpDirectory + "/" + (*i)); + if (validateEmptyFile(emptyFile)) { + pushEmptyFile(emptyFile); + } + } + } + } + //std::cout << "Found " << emptyFileList.size() << " files" << std::endl; // DEBUG +} + +efpFileSizeKib_t +EmptyFilePool::fileSizeKib() const { + return efpFileSizeKib; +} + +efpFileCount_t +EmptyFilePool::numEmptyFiles() const { + slock l(emptyFileListMutex); + return efpFileCount_t(emptyFileList.size()); +} + +efpFileSizeKib_t +EmptyFilePool::cumFileSizeKib() const { + slock l(emptyFileListMutex); + return efpFileSizeKib_t(emptyFileList.size()) * efpFileSizeKib; +} + +efpPartitionNumber_t +EmptyFilePool::getPartitionNumber() const { + return partitionPtr->partitionNumber(); +} + +const EmptyFilePoolPartition* +EmptyFilePool::getPartition() const { + return partitionPtr; +} + +const efpIdentity_t +EmptyFilePool::getIdentity() const { + return efpIdentity_t(partitionPtr->partitionNumber(), efpFileSizeKib); +} + +std::string +EmptyFilePool::takeEmptyFile(const std::string& destDirectory) { + std::string emptyFileName = popEmptyFile(); + std::string newFileName = destDirectory + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/' + if (::rename(emptyFileName.c_str(), newFileName.c_str())) { + pushEmptyFile(emptyFileName); + std::ostringstream oss; + oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "takeEmptyFile"); + } + return newFileName; +} + +bool +EmptyFilePool::returnEmptyFile(const JournalFile* srcFile) { + std::string emptyFileName(efpDirectory + srcFile->fileName()); + // TODO: reset file here + if (::rename(srcFile->fqFileName().c_str(), emptyFileName.c_str())) { + std::ostringstream oss; + oss << "file=\"" << srcFile << "\" dest=\"" << emptyFileName << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile"); + } + pushEmptyFile(emptyFileName); + return true; +} + +// protected + +void +EmptyFilePool::pushEmptyFile(const std::string fqFileName_) { + slock l(emptyFileListMutex); + emptyFileList.push_back(fqFileName_); +} + +std::string +EmptyFilePool::popEmptyFile() { + std::string emptyFileName; + bool isEmpty = false; + { + slock l(emptyFileListMutex); + isEmpty = emptyFileList.empty(); + } + if (isEmpty) { + createEmptyFile(); + } + { + slock l(emptyFileListMutex); + emptyFileName = emptyFileList.front(); + emptyFileList.pop_front(); + } + return emptyFileName; +} + +void +EmptyFilePool::createEmptyFile() { + file_hdr_t fh; + ::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDRSIZESBLKS, partitionPtr->partitionNumber(), + efpFileSizeKib); + std::string efpfn = getEfpFileName(); + std::ofstream ofs(efpfn.c_str(), std::ofstream::out | std::ofstream::binary); + if (ofs.good()) { + ofs.write((char*)&fh, sizeof(file_hdr_t)); + uint64_t rem = ((efpFileSizeKib + (QLS_JRNL_FHDRSIZESBLKS * JRNL_SBLK_SIZE_KIB)) * 1024) - sizeof(file_hdr_t); + while (rem--) + ofs.put('\0'); + ofs.close(); + pushEmptyFile(efpfn); + std::cout << "WARNING: EFP " << efpDirectory << " is empty - created new journal file " << + efpfn.substr(efpfn.rfind('/') + 1) << " on the fly" << std::endl; + } else { + std::cerr << "ERROR: Unable to open file \"" << efpfn << "\"" << std::endl; // DEBUG + } +} + +bool +EmptyFilePool::validateEmptyFile(const std::string& emptyFileName_) const { + struct stat s; + if (::stat(emptyFileName_.c_str(), &s)) + { + std::ostringstream oss; + oss << "stat: file=\"" << emptyFileName_ << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "EmptyFilePool", "validateEmptyFile"); + } + efpFileSizeKib_t expectedSize = (JRNL_SBLK_SIZE_KIB + efpFileSizeKib) * 1024; + if ((efpFileSizeKib_t)s.st_size != expectedSize) { + //std::cout << "ERROR: File " << emptyFileName << ": Incorrect size: Expected=" << expectedSize << "; actual=" << s.st_size << std::endl; // DEBUG + return false; + } + + std::ifstream ifs(emptyFileName_.c_str(), std::ifstream::in | std::ifstream::binary); + if (!ifs) { + //std::cout << "ERROR: File " << emptyFileName << ": Unable to open for reading" << std::endl; + return false; + } + + const uint8_t fhFileNameBuffLen = 50; + char fhFileNameBuff[fhFileNameBuffLen]; + file_hdr_t fh; + ifs.read((char*)&fh, sizeof(file_hdr_t)); + uint16_t fhFileNameLen = fh._queue_name_len > fhFileNameBuffLen ? fhFileNameBuffLen : fh._queue_name_len; + ifs.read(fhFileNameBuff, fhFileNameLen); + std::string fhFileName(fhFileNameBuff, fhFileNameLen); + ifs.close(); + + if (fh._rhdr._magic != QLS_FILE_MAGIC || + fh._rhdr._version != QLS_JRNL_VERSION || + fh._efp_partition != partitionPtr->partitionNumber() || + fh._file_size_kib != efpFileSizeKib || + !::is_file_hdr_reset(&fh)) + { + //std::cout << "ERROR: File " << emptyFileName << ": Invalid file header" << std::endl; + return false; + } + + return true; +} + +std::string +EmptyFilePool::getEfpFileName() { + uuid_t uuid; + ::uuid_generate(uuid); // NOTE: NOT THREAD SAFE + char uuid_str[37]; // 36 char uuid + trailing \0 + ::uuid_unparse(uuid, uuid_str); + std::ostringstream oss; + oss << efpDirectory << "/" << uuid_str << QLS_JRNL_FILE_EXTENSION; + return oss.str(); +} + +// protected +// static +efpFileSizeKib_t +EmptyFilePool::fileSizeKbFromDirName(const std::string& dirName_, + const efpPartitionNumber_t partitionNumber_) { + // Check for dirName format 'NNNk', where NNN is a number, convert NNN into an integer. NNN cannot be 0. + std::string n(dirName_.substr(dirName_.rfind('/')+1)); + bool valid = true; + for (uint16_t charNum = 0; charNum < n.length(); ++charNum) { + if (charNum < n.length()-1) { + if (!::isdigit((int)n[charNum])) { + valid = false; + break; + } + } else { + valid = n[charNum] == 'k'; + } + } + efpFileSizeKib_t s = ::atol(n.c_str()); + if (!valid || s == 0 || s % JRNL_SBLK_SIZE_KIB != 0) { + std::ostringstream oss; + oss << "Partition: " << partitionNumber_ << "; EFP directory: \'" << n << "\'"; + throw jexception(jerrno::JERR_EFP_BADEFPDIRNAME, oss.str(), "EmptyFilePool", "fileSizeKbFromDirName"); + } + return s; +} + +}} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h new file mode 100644 index 0000000000..1958bb7647 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_QLS_JRNL_EMPTYFILEPOOL_H_ +#define QPID_QLS_JRNL_EMPTYFILEPOOL_H_ + +namespace qpid { +namespace qls_jrnl { + + class EmptyFilePool; + +}} // namespace qpid::qls_jrnl + +#include <deque> +#include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h" +#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h" +#include "qpid/linearstore/jrnl/smutex.h" +#include <string> + +namespace qpid { +namespace qls_jrnl { +class jdir; +class JournalFile; + +class EmptyFilePool +{ +protected: + typedef std::deque<std::string> emptyFileList_t; + typedef emptyFileList_t::iterator emptyFileListItr_t; + + const std::string efpDirectory; + const efpFileSizeKib_t efpFileSizeKib; + const EmptyFilePoolPartition* partitionPtr; + +private: + emptyFileList_t emptyFileList; + smutex emptyFileListMutex; + +public: + EmptyFilePool(const std::string& efpDirectory_, + const EmptyFilePoolPartition* partitionPtr_); + virtual ~EmptyFilePool(); + + void initialize(); + efpFileSizeKib_t fileSizeKib() const; + efpFileCount_t numEmptyFiles() const; + efpFileSizeKib_t cumFileSizeKib() const; + efpPartitionNumber_t getPartitionNumber() const; + const EmptyFilePoolPartition* getPartition() const; + const efpIdentity_t getIdentity() const; + + std::string takeEmptyFile(const std::string& destDirectory_); + bool returnEmptyFile(const JournalFile* srcFile_); + +protected: + void pushEmptyFile(const std::string fqFileName_); + std::string popEmptyFile(); + void createEmptyFile(); + bool validateEmptyFile(const std::string& emptyFileName_) const; + std::string getEfpFileName(); + static efpFileSizeKib_t fileSizeKbFromDirName(const std::string& dirName_, + const efpPartitionNumber_t partitionNumber_); +}; + +}} // namespace qpid::qls_jrnl + +#endif /* QPID_QLS_JRNL_EMPTYFILEPOOL_H_ */ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp new file mode 100644 index 0000000000..1bc716f912 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp @@ -0,0 +1,175 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "EmptyFilePoolManager.h" + +#include <dirent.h> +#include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h" +#include "qpid/linearstore/jrnl/jdir.h" +#include "qpid/linearstore/jrnl/slock.h" +#include <vector> + +// DEBUG +//#include <iostream> + +namespace qpid { +namespace qls_jrnl { + +EmptyFilePoolManager::EmptyFilePoolManager(const std::string& qlsStorePath_) : + qlsStorePath(qlsStorePath_) +{} + +EmptyFilePoolManager::~EmptyFilePoolManager() { + slock l(partitionMapMutex); + for (partitionMapItr_t i = partitionMap.begin(); i != partitionMap.end(); ++i) { + delete i->second; + } + partitionMap.clear(); +} + +void +EmptyFilePoolManager::findEfpPartitions() { + //std::cout << "*** Reading " << qlsStorePath << std::endl; // DEBUG + std::vector<std::string> dirList; + jdir::read_dir(qlsStorePath, dirList, true, false, true); + for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) { + if ((*i)[0] == 'p' && i->length() == 4) { // Filter: look only at names pNNN + efpPartitionNumber_t pn = ::atoi(i->c_str() + 1); + std::string fullDirPath(qlsStorePath + "/" + (*i)); + EmptyFilePoolPartition* efppp = 0; + try { + efppp = new EmptyFilePoolPartition(pn, fullDirPath); + { + slock l(partitionMapMutex); + partitionMap[pn] = efppp; + } + } catch (const std::exception& e) { + if (efppp != 0) { + delete efppp; + efppp = 0; + } + //std::cerr << "Unable to initialize partition " << pn << " (\'" << fullDirPath << "\'): " << e.what() << std::endl; + } + if (efppp != 0) + efppp->findEmptyFilePools(); + } + } +} + +uint16_t +EmptyFilePoolManager::getNumEfpPartitions() const { + return partitionMap.size(); +} + +EmptyFilePoolPartition* +EmptyFilePoolManager::getEfpPartition(const efpPartitionNumber_t partitionNumber) { + partitionMapItr_t i = partitionMap.find(partitionNumber); + if (i == partitionMap.end()) + return 0; + else + return i->second; +} + +void +EmptyFilePoolManager::getEfpPartitionNumbers(std::vector<efpPartitionNumber_t>& partitionNumberList, + const efpFileSizeKib_t efpFileSizeKb) const { + slock l(partitionMapMutex); + for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) { + if (efpFileSizeKb == 0) { + partitionNumberList.push_back(i->first); + } else { + std::vector<efpFileSizeKib_t> efpFileSizeList; + i->second->getEmptyFilePoolSizesKb(efpFileSizeList); + for (std::vector<efpFileSizeKib_t>::iterator j=efpFileSizeList.begin(); j!=efpFileSizeList.end(); ++j) { + if (*j == efpFileSizeKb) { + partitionNumberList.push_back(i->first); + break; + } + } + } + } +} + +void +EmptyFilePoolManager::getEfpPartitions(std::vector<EmptyFilePoolPartition*>& partitionList, + const efpFileSizeKib_t efpFileSizeKb) { + slock l(partitionMapMutex); + for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) { + if (efpFileSizeKb == 0) { + partitionList.push_back(i->second); + } else { + std::vector<efpFileSizeKib_t> efpFileSizeList; + i->second->getEmptyFilePoolSizesKb(efpFileSizeList); + for (std::vector<efpFileSizeKib_t>::iterator j=efpFileSizeList.begin(); j!=efpFileSizeList.end(); ++j) { + if (*j == efpFileSizeKb) { + partitionList.push_back(i->second); + break; + } + } + } + } +} + +void +EmptyFilePoolManager::getEfpFileSizes(std::vector<efpFileSizeKib_t>& efpFileSizeList, + const efpPartitionNumber_t efpPartitionNumber) const { + if (efpPartitionNumber == 0) { + for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) { + i->second->getEmptyFilePoolSizesKb(efpFileSizeList); + } + } else { + partitionMapConstItr_t i = partitionMap.find(efpPartitionNumber); + if (i != partitionMap.end()) { + i->second->getEmptyFilePoolSizesKb(efpFileSizeList); + } + } +} + +void +EmptyFilePoolManager::getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList, + const efpPartitionNumber_t efpPartitionNumber) { + if (efpPartitionNumber == 0) { + for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) { + i->second->getEmptyFilePools(emptyFilePoolList); + } + } else { + partitionMapConstItr_t i = partitionMap.find(efpPartitionNumber); + if (i != partitionMap.end()) { + i->second->getEmptyFilePools(emptyFilePoolList); + } + } +} + +EmptyFilePool* +EmptyFilePoolManager::getEmptyFilePool(const efpPartitionNumber_t partitionNumber, + const efpFileSizeKib_t efpFileSizeKib) { + EmptyFilePoolPartition* efppp = getEfpPartition(partitionNumber); + if (efppp != 0) + return efppp->getEmptyFilePool(efpFileSizeKib); + return 0; +} + +EmptyFilePool* +EmptyFilePoolManager::getEmptyFilePool(const efpIdentity_t efpIdentity) { + return getEmptyFilePool(efpIdentity.first, efpIdentity.second); +} + +}} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h new file mode 100644 index 0000000000..e34eb2c0f3 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_QLS_JRNL_EMPTYFILEPOOLMANAGER_H_ +#define QPID_QLS_JRNL_EMPTYFILEPOOLMANAGER_H_ + +#include <map> +#include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h" +#include "qpid/linearstore/jrnl/smutex.h" +#include <string> + +namespace qpid { +namespace qls_jrnl { + +class EmptyFilePoolManager +{ +protected: + typedef std::map<efpPartitionNumber_t, EmptyFilePoolPartition*> partitionMap_t; + typedef partitionMap_t::iterator partitionMapItr_t; + typedef partitionMap_t::const_iterator partitionMapConstItr_t; + + std::string qlsStorePath; + partitionMap_t partitionMap; + smutex partitionMapMutex; + +public: + EmptyFilePoolManager(const std::string& qlsStorePath_); + virtual ~EmptyFilePoolManager(); + void findEfpPartitions(); + + uint16_t getNumEfpPartitions() const; + EmptyFilePoolPartition* getEfpPartition(const efpPartitionNumber_t partitionNumber); + void getEfpPartitionNumbers(std::vector<efpPartitionNumber_t>& partitionNumberList, const efpFileSizeKib_t efpFileSizeKb = 0) const; + void getEfpPartitions(std::vector<EmptyFilePoolPartition*>& partitionList, const efpFileSizeKib_t efpFileSizeKb = 0); + + void getEfpFileSizes(std::vector<efpFileSizeKib_t>& efpFileSizeList, const efpPartitionNumber_t efpPartitionNumber = 0) const; + void getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList, const efpPartitionNumber_t efpPartitionNumber = 0); + + EmptyFilePool* getEmptyFilePool(const efpPartitionNumber_t partitionNumber, const efpFileSizeKib_t efpFileSizeKb); + EmptyFilePool* getEmptyFilePool(const efpIdentity_t efpIdentity); +}; + +}} // namespace qpid::qls_jrnl + +#endif /* QPID_QLS_JRNL_EMPTYFILEPOOLMANAGER_H_ */ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp new file mode 100644 index 0000000000..6e6bbc4abd --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp @@ -0,0 +1,134 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h" + +#include <dirent.h> +#include "qpid/linearstore/jrnl/jdir.h" +#include "qpid/linearstore/jrnl/jerrno.h" +#include "qpid/linearstore/jrnl/jexception.h" +#include "qpid/linearstore/jrnl/slock.h" + +//#include <iostream> // DEBUG + +namespace qpid { +namespace qls_jrnl { + +const std::string EmptyFilePoolPartition::efpTopLevelDir("efp"); // Sets the top-level efp dir within a partition + +EmptyFilePoolPartition::EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum_, const std::string& partitionDir_) : + partitionNum(partitionNum_), + partitionDir(partitionDir_) +{ + validatePartitionDir(); +} + +EmptyFilePoolPartition::~EmptyFilePoolPartition() { + slock l(efpMapMutex); + for (efpMapItr_t i = efpMap.begin(); i != efpMap.end(); ++i) { + delete i->second; + } + efpMap.clear(); +} + +void +EmptyFilePoolPartition::validatePartitionDir() { + if (!jdir::is_dir(partitionDir)) { + std::ostringstream ss; + ss << "Invalid partition directory: \'" << partitionDir << "\' is not a directory"; + throw jexception(jerrno::JERR_EFP_BADPARTITIONDIR, ss.str(), "EmptyFilePoolPartition", "validatePartitionDir"); + } + // TODO: other validity checks here +} + +void +EmptyFilePoolPartition::findEmptyFilePools() { + //std::cout << "Reading " << partitionDir << std::endl; // DEBUG + std::vector<std::string> dirList; + jdir::read_dir(partitionDir, dirList, true, false, false); + bool foundEfpDir = false; + for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) { + if (i->compare(efpTopLevelDir) == 0) { + foundEfpDir = true; + break; + } + } + if (foundEfpDir) { + std::string efpDir(partitionDir + "/" + efpTopLevelDir); + //std::cout << "Reading " << efpDir << std::endl; // DEBUG + dirList.clear(); + jdir::read_dir(efpDir, dirList, true, false, false); + for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) { + std::string efpSizeDir(efpDir + "/" + (*i)); + EmptyFilePool* efpp = 0; + try { + efpp = new EmptyFilePool(efpSizeDir, this); + { + slock l(efpMapMutex); + efpMap[efpp->fileSizeKib()] = efpp; + } + } + catch (const std::exception& e) { + if (efpp != 0) { + delete efpp; + efpp = 0; + } + //std::cerr << "WARNING: " << e.what() << std::endl; + } + if (efpp != 0) + efpp->initialize(); + } + } +} + +efpPartitionNumber_t +EmptyFilePoolPartition::partitionNumber() const { + return partitionNum; +} + +std::string +EmptyFilePoolPartition::partitionDirectory() const { + return partitionDir; +} + +EmptyFilePool* +EmptyFilePoolPartition::getEmptyFilePool(const efpFileSizeKib_t efpFileSizeKb) { + efpMapItr_t i = efpMap.find(efpFileSizeKb); + if (i == efpMap.end()) + return 0; + return i->second; +} + +void +EmptyFilePoolPartition::getEmptyFilePoolSizesKb(std::vector<efpFileSizeKib_t>& efpFileSizesKbList) const { + for (efpMapConstItr_t i=efpMap.begin(); i!=efpMap.end(); ++i) { + efpFileSizesKbList.push_back(i->first); + } +} + +void +EmptyFilePoolPartition::getEmptyFilePools(std::vector<EmptyFilePool*>& efpList) { + for (efpMapItr_t i=efpMap.begin(); i!=efpMap.end(); ++i) { + efpList.push_back(i->second); + } +} + +}} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h new file mode 100644 index 0000000000..2013884b38 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_QLS_JRNL_EMPTYFILEPOOLPARTITION_H_ +#define QPID_QLS_JRNL_EMPTYFILEPOOLPARTITION_H_ + +namespace qpid { +namespace qls_jrnl { + + class EmptyFilePoolPartition; + +}} // namespace qpid::qls_jrnl + +#include "qpid/linearstore/jrnl/EmptyFilePool.h" +#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h" +#include "qpid/linearstore/jrnl/smutex.h" +#include <string> +#include <map> +#include <vector> + +namespace qpid { +namespace qls_jrnl { + +class EmptyFilePoolPartition +{ +public: + static const std::string efpTopLevelDir; +protected: + typedef std::map<efpFileSizeKib_t, EmptyFilePool*> efpMap_t; + typedef efpMap_t::iterator efpMapItr_t; + typedef efpMap_t::const_iterator efpMapConstItr_t; + + const efpPartitionNumber_t partitionNum; + const std::string partitionDir; + efpMap_t efpMap; + smutex efpMapMutex; + + void validatePartitionDir(); + +public: + EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum_, const std::string& partitionDir_); + virtual ~EmptyFilePoolPartition(); + void findEmptyFilePools(); + efpPartitionNumber_t partitionNumber() const; + std::string partitionDirectory() const; + + EmptyFilePool* getEmptyFilePool(const efpFileSizeKib_t efpFileSizeKb); + void getEmptyFilePoolSizesKb(std::vector<efpFileSizeKib_t>& efpFileSizesKbList) const; + void getEmptyFilePools(std::vector<EmptyFilePool*>& efpList); +}; + +}} // namespace qpid::qls_jrnl + +#endif /* QPID_QLS_JRNL_EMPTYFILEPOOLPARTITION_H_ */ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h new file mode 100644 index 0000000000..de91bdc06a --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h @@ -0,0 +1,37 @@ + /* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_ +#define QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_ + +#include <stdint.h> + +namespace qpid { +namespace qls_jrnl { + + typedef uint64_t efpFileSizeKib_t; + typedef uint32_t efpFileCount_t; + typedef uint16_t efpPartitionNumber_t; + typedef std::pair<efpPartitionNumber_t, efpFileSizeKib_t> efpIdentity_t; + +}} // namespace qpid::qls_jrnl + +#endif /* QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_ */ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp new file mode 100644 index 0000000000..87ecdfb7a8 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/linearstore/jrnl/JournalFile.h" + +namespace qpid { +namespace qls_jrnl { + +JournalFile::JournalFile(const std::string& fqFileName_) : + fqfn(fqFileName_) +{} + +JournalFile::~JournalFile() {} + +const std::string +JournalFile::directory() const { + return fqfn.substr(0, fqfn.rfind('/')); +} + +const std::string +JournalFile::fileName() const { + return fqfn.substr(fqfn.rfind('/')); +} + +const std::string +JournalFile::fqFileName() const { + return fqfn; +} + +bool +JournalFile::empty() const { + // TODO: return true if no still-enqueued records (or parts of records) exist in this file + return true; +} + +}} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h new file mode 100644 index 0000000000..de587d94e3 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_LINEARSTORE_JOURNALFILE_H_ +#define QPID_LINEARSTORE_JOURNALFILE_H_ + +#include <string> + +namespace qpid { +namespace qls_jrnl { + +class JournalFile +{ +protected: + const std::string fqfn; +public: + JournalFile(const std::string& fqFileName_); + virtual ~JournalFile(); + + const std::string directory() const; + const std::string fileName() const; + const std::string fqFileName() const; + bool empty() const; +}; + +}} // namespace qpid::qls_jrnl + +#endif // QPID_LINEARSTORE_JOURNALFILE_H_ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.cpp new file mode 100644 index 0000000000..6167a641c9 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.cpp @@ -0,0 +1,142 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/linearstore/jrnl/JournalFileController.h" + +#include <fstream> +#include "qpid/linearstore/jrnl/EmptyFilePool.h" +#include "qpid/linearstore/jrnl/jcfg.h" +#include "qpid/linearstore/jrnl/JournalFile.h" +#include "qpid/linearstore/jrnl/slock.h" +#include "qpid/linearstore/jrnl/utils/file_hdr.h" + +#include <iostream> // DEBUG + +namespace qpid { +namespace qls_jrnl { + +JournalFileController::JournalFileController(const std::string& dir_, + EmptyFilePool* efpp_) : + dir(dir_), + efpp(efpp_), + fileSeqCounter(0) +{ + //std::cout << "*** JournalFileController::JournalFileController() dir=" << dir << std::endl; +} + +JournalFileController::~JournalFileController() {} + +void +JournalFileController::pullEmptyFileFromEfp(const uint64_t recId_, + const uint64_t firstRecOffs_, + const std::string& queueName_) { + std::string ef = efpp->takeEmptyFile(dir); + //std::cout << "*** JournalFileController::pullEmptyFileFromEfp() qn=" << queueName_ << " ef=" << ef << std::endl; + const JournalFile* jfp = new JournalFile(ef/*efpp->takeEmptyFile(dir)*/); + initialzeFileHeader(jfp->fqFileName(), recId_, firstRecOffs_, getNextFileSeqNum(), queueName_); + { + slock l(journalFileListMutex); + journalFileList.push_back(jfp); + } +} + +void +JournalFileController::purgeFilesToEfp() { + slock l(journalFileListMutex); + while (journalFileList.front()->empty()) { + + efpp->returnEmptyFile(journalFileList.front()); + delete journalFileList.front(); + journalFileList.pop_front(); + } +} + +void +JournalFileController::finalize() {} + +void +JournalFileController::setFileSeqNum(const uint64_t fileSeqNum) { + fileSeqCounter = fileSeqNum; +} + +// protected + +std::string +JournalFileController::readFileHeader(file_hdr_t* fhdr_, + const std::string& fileName_) { + //std::cout << "*** JournalFileController::readFileHeader() fn=" << fileName_ << std::endl; + char buff[JRNL_SBLK_SIZE]; + std::ifstream ifs(fileName_.c_str(), std::ifstream::in | std::ifstream::binary); + if (ifs.good()) { + ifs.read(buff, JRNL_SBLK_SIZE); + ifs.close(); + std::memcpy(fhdr_, buff, sizeof(file_hdr_t)); + return std::string(buff + sizeof(file_hdr_t), fhdr_->_queue_name_len); + } else { + std::cerr << "ERROR: Could not open file \"" << fileName_ << "\" for reading." << std::endl; + } + return std::string(""); +} + +void +JournalFileController::writeFileHeader(const file_hdr_t* fhdr_, + const std::string& queueName_, + const std::string& fileName_) { + //std::cout << "*** JournalFileController::writeFileHeader() qn=" << queueName_ << " fn=" << fileName_ << std::endl; + std::fstream fs(fileName_.c_str(), std::fstream::in | std::fstream::out | std::fstream::binary); + if (fs.good()) { + fs.seekp(0); + fs.write((const char*)fhdr_, sizeof(file_hdr_t)); + fs.write(queueName_.data(), fhdr_->_queue_name_len); + fs.close(); + } else { + std::cerr << "ERROR: Could not open file \"" << fileName_ << "\" for writing." << std::endl; + } +} + +void +JournalFileController::resetFileHeader(const std::string& fileName_) { + //std::cout << "*** JournalFileController::resetFileHeader() fn=" << fileName_ << std::endl; + file_hdr_t fhdr; + readFileHeader(&fhdr, fileName_); + ::file_hdr_reset(&fhdr); + writeFileHeader(&fhdr, std::string(""), fileName_); +} + +void +JournalFileController::initialzeFileHeader(const std::string& fileName_, + const uint64_t recId_, + const uint64_t firstRecOffs_, + const uint64_t fileSeqNum_, + const std::string& queueName_) { + //std::cout << "*** JournalFileController::initialzeFileHeader() fn=" << fileName_ << " sn=" << fileSeqNum_ << " qn=" << queueName_ << std::endl; + file_hdr_t fhdr; + readFileHeader(&fhdr, fileName_); + ::file_hdr_init(&fhdr, 0, recId_, firstRecOffs_, fileSeqNum_, queueName_.length(), queueName_.data()); + writeFileHeader(&fhdr, queueName_, fileName_); +} + +uint64_t +JournalFileController::getNextFileSeqNum() { + return __sync_add_and_fetch(&fileSeqCounter, 1); // GCC atomic increment, not portable +} + +}} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.h b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.h new file mode 100644 index 0000000000..cd23afd959 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.h @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_LINEARSTORE_JOURNALFILECONTROLLER_H_ +#define QPID_LINEARSTORE_JOURNALFILECONTROLLER_H_ + +#include <deque> +#include "qpid/linearstore/jrnl/smutex.h" + +struct file_hdr_t; +namespace qpid { +namespace qls_jrnl { +class EmptyFilePool; +class JournalFile; +//typedef struct file_hdr_t file_hdr_t; + +class JournalFileController +{ +protected: + typedef std::deque<const JournalFile*> JournalFileList_t; + typedef JournalFileList_t::iterator JournalFileListItr_t; + + const std::string dir; + EmptyFilePool* efpp; + uint64_t fileSeqCounter; + JournalFileList_t journalFileList; + smutex journalFileListMutex; + +public: + JournalFileController(const std::string& dir, + EmptyFilePool* efpp); + virtual ~JournalFileController(); + + void pullEmptyFileFromEfp(const uint64_t recId, const uint64_t firstRecOffs, const std::string& queueName); + void purgeFilesToEfp(); + void finalize(); + void setFileSeqNum(const uint64_t fileSeqNum); + +protected: + std::string readFileHeader(file_hdr_t* fhdr, const std::string& fileName); + void writeFileHeader(const file_hdr_t* fhdr, const std::string& queueName, const std::string& fileName); + void resetFileHeader(const std::string& fileName); + void initialzeFileHeader(const std::string& fileName, const uint64_t recId, const uint64_t firstRecOffs, + const uint64_t fileSeqNum, const std::string& queueName); + uint64_t getNextFileSeqNum(); +}; + +}} // namespace qpid::qls_jrnl + +#endif // QPID_LINEARSTORE_JOURNALFILECONTROLLER_H_ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp new file mode 100644 index 0000000000..22a6412c21 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "JournalLog.h" +#include <iostream> + +namespace qpid { +namespace qls_jrnl { + +JournalLog::JournalLog() {} + +JournalLog::~JournalLog() {} + +void +JournalLog::log(log_level_t ll, const std::string& jid, const std::string& log_stmt) const { + log(ll, jid.c_str(), log_stmt.c_str()); +} + +void +JournalLog::log(log_level_t ll, const char* jid, const char* const log_stmt) const { + if (ll > LOG_ERROR) { + std::cerr << log_level_str(ll) << ": Journal \"" << jid << "\": " << log_stmt << std::endl; + } else if (ll > LOG_INFO) { + std::cout << log_level_str(ll) << ": Journal \"" << jid << "\": " << log_stmt << std::endl; + } + +} + +const char* +JournalLog::log_level_str(log_level_t ll) { + switch (ll) + { + case LOG_TRACE: return "TRACE"; + case LOG_DEBUG: return "DEBUG"; + case LOG_INFO: return "INFO"; + case LOG_NOTICE: return "NOTICE"; + case LOG_WARN: return "WARN"; + case LOG_ERROR: return "ERROR"; + case LOG_CRITICAL: return "CRITICAL"; + } + return "<log level unknown>"; +} + +}} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h b/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h new file mode 100644 index 0000000000..83ba80d0b5 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_LINEARSTORE_JOURNALLOG_H_ +#define QPID_LINEARSTORE_JOURNALLOG_H_ + +#include <string> + +namespace qpid { +namespace qls_jrnl { + +class JournalLog +{ +public: + typedef enum _log_level + { + LOG_TRACE = 0, + LOG_DEBUG, + LOG_INFO, + LOG_NOTICE, + LOG_WARN, + LOG_ERROR, + LOG_CRITICAL + } log_level_t; + +protected: + JournalLog(); + virtual ~JournalLog(); + +public: + virtual void log(log_level_t level, const std::string& jid, const std::string& log_stmt) const; + virtual void log(log_level_t level, const char* jid, const char* const log_stmt) const; + static const char* log_level_str(log_level_t ll); +}; + +}} // namespace qpid::qls_jrnl + +#endif // QPID_LINEARSTORE_JOURNALLOG_H_ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/enums.h b/qpid/cpp/src/qpid/linearstore/jrnl/enums.h index fffb8c07d7..31fa4e6ba3 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/enums.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/enums.h @@ -19,8 +19,8 @@ * */ -#ifndef QPID_LEGACYSTORE_JRNL_ENUMS_H -#define QPID_LEGACYSTORE_JRNL_ENUMS_H +#ifndef QPID_LINEARSTORE_JRNL_ENUMS_H +#define QPID_LINEARSTORE_JRNL_ENUMS_H namespace qpid { @@ -64,6 +64,7 @@ namespace qls_jrnl return "<iores unknown>"; } +/* enum _log_level { LOG_TRACE = 0, @@ -74,9 +75,9 @@ namespace qls_jrnl LOG_ERROR, LOG_CRITICAL }; - typedef _log_level log_level; + typedef _log_level log_level_t; - static inline const char* log_level_str(log_level ll) + static inline const char* log_level_str(log_level_t ll) { switch (ll) { @@ -90,8 +91,9 @@ namespace qls_jrnl } return "<log level unknown>"; } +*/ }} -#endif // ifndef QPID_LEGACYSTORE_JRNL_ENUMS_H +#endif // ifndef QPID_LINEARSTORE_JRNL_ENUMS_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h b/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h index fc44e35331..110fee1334 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h @@ -40,7 +40,6 @@ #endif */ - /** * <b>Rule:</b> Data block size (JRNL_DBLK_SIZE) MUST be a power of 2 such that * <pre> @@ -51,6 +50,7 @@ #define JRNL_DBLK_SIZE 128 /**< Data block size in bytes (CANNOT BE LESS THAN 32!) */ #define JRNL_SBLK_SIZE_DBLKS 32 /**< Disk softblock size in multiples of JRNL_DBLK_SIZE */ #define JRNL_SBLK_SIZE JRNL_SBLK_SIZE_DBLKS * JRNL_DBLK_SIZE /**< Disk softblock size in bytes */ +#define JRNL_SBLK_SIZE_KIB JRNL_SBLK_SIZE / 1024 /**< Disk softblock size in KiB */ //#define JRNL_MIN_FILE_SIZE 128 ///< Min. jrnl file size in sblks (excl. file_hdr) //#define JRNL_MAX_FILE_SIZE 4194176 ///< Max. jrnl file size in sblks (excl. file_hdr) //#define JRNL_MIN_NUM_FILES 4 ///< Min. number of journal files @@ -68,7 +68,7 @@ // //#define JRNL_INFO_EXTENSION "jinf" ///< Extension for journal info files //#define JRNL_DATA_EXTENSION "jdat" ///< Extension for journal data files -#define QLS_JRNL_FILE_EXTENSION "jdat" /**< Extension for journal data files */ +#define QLS_JRNL_FILE_EXTENSION ".jrnl" /**< Extension for journal data files */ //#define RHM_JDAT_TXA_MAGIC 0x614d4852 ///< ("RHMa" in little endian) Magic for dtx abort hdrs #define QLS_TXA_MAGIC 0x61534c51 /**< ("RHMa" in little endian) Magic for dtx abort hdrs */ //#define RHM_JDAT_TXC_MAGIC 0x634d4852 ///< ("RHMc" in little endian) Magic for dtx commit hdrs @@ -83,6 +83,7 @@ #define QLS_EMPTY_MAGIC 0x78534c51 /**< ("QLSx" in little endian) Magic for empty dblk */ //#define RHM_JDAT_VERSION 0x01 ///< Version (of file layout) #define QLS_JRNL_VERSION 0x0002 /**< Version (of file layout) */ +#define QLS_JRNL_FHDRSIZESBLKS 0x0001 /**< Journal file header size in sblks (as defined by JRNL_SBLK_SIZE) */ //#define RHM_CLEAN_CHAR 0xff ///< Char used to clear empty space on disk #define QLS_CLEAN_CHAR 0xff ///< Char used to clear empty space on disk // diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp index 552aa92b9c..126d3f7da3 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp @@ -29,9 +29,11 @@ #include <fstream> #include <iomanip> #include <iostream> +#include <qpid/linearstore/jrnl/EmptyFilePool.h> //#include "qpid/linearstore/jrnl/file_hdr.h" #include "qpid/linearstore/jrnl/jerrno.h" //#include "qpid/linearstore/jrnl/jinf.h" +#include "qpid/linearstore/jrnl/JournalFileController.h" #include <limits> #include <sstream> #include <unistd.h> @@ -62,15 +64,16 @@ bool jcntl::init_statics() // Functions -jcntl::jcntl(const std::string& jid, const std::string& jdir, const std::string& base_filename): +jcntl::jcntl(const std::string& jid, const std::string& jdir/*, const std::string& base_filename*/): _jid(jid), - _jdir(jdir, base_filename), - _base_filename(base_filename), + _jdir(jdir/*, base_filename*/), +// _base_filename(base_filename), _init_flag(false), _stop_flag(false), _readonly_flag(false), - _autostop(true), - _jfsize_sblks(0), +// _autostop(true), + _jfcp(0), +// _jfsize_sblks(0), // _lpmgr(), _emap(), _tmap(), @@ -87,11 +90,16 @@ jcntl::~jcntl() try { stop(true); } catch (const jexception& e) { std::cerr << e << std::endl; } // _lpmgr.finalize(); + if (_jfcp) { + _jfcp->finalize(); + delete _jfcp; + _jfcp = 0; + } } void jcntl::initialize(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_max_jfiles, - const uint32_t jfsize_sblks,*/ const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, + const uint32_t jfsize_sblks,*/ EmptyFilePool* efpp, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, aio_callback* const cbp) { _init_flag = false; @@ -101,6 +109,12 @@ jcntl::initialize(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_ _emap.clear(); _tmap.clear(); + if (_jfcp) { + _jfcp->finalize(); + delete _jfcp; + _jfcp = 0; + } + // _lpmgr.finalize(); // Set new file geometry parameters @@ -117,6 +131,8 @@ jcntl::initialize(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_ _jdir.clear_dir(); // _lpmgr.initialize(num_jfiles, ae, ae_max_jfiles, this, &new_fcntl); + _jfcp = new JournalFileController(_jdir.dirname(), efpp); + _jfcp->pullEmptyFileFromEfp(1, 4096, _jid); // _wrfc.initialize(_jfsize_sblks); // _rrfc.initialize(); // _rrfc.set_findex(0); @@ -159,7 +175,7 @@ jcntl::recover(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_max highest_rid = _rcvdat._h_rid; if (_rcvdat._jfull) throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover"); - this->log(LOG_DEBUG, _rcvdat.to_log(_jid)); + this->log(LOG_DEBUG, _jid, _rcvdat.to_log(_jid)); // _lpmgr.recover(_rcvdat, this, &new_fcntl); @@ -192,6 +208,7 @@ void jcntl::delete_jrnl_files() { stop(true); // wait for AIO to complete + _jfcp->purgeFilesToEfp(); _jdir.delete_dir(); } @@ -407,20 +424,22 @@ jcntl::flush(const bool block_till_aio_cmpl) return res; } +/* void -jcntl::log(log_level ll, const std::string& log_stmt) const +jcntl::log(log_level_t ll, const std::string& log_stmt) const { log(ll, log_stmt.c_str()); } void -jcntl::log(log_level ll, const char* const log_stmt) const +jcntl::log(log_level_t ll, const char* const log_stmt) const { if (ll > LOG_INFO) { std::cout << log_level_str(ll) << ": Journal \"" << _jid << "\": " << log_stmt << std::endl; } } +*/ /* void @@ -523,7 +542,7 @@ jcntl::handle_aio_wait(const iores res, iores& resout, const data_tok* dtp) { std::ostringstream oss; oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: " << _wmgr.status_str(); - this->log(LOG_CRITICAL, oss.str()); + this->log(LOG_CRITICAL, _jid, oss.str()); throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait"); } } @@ -877,8 +896,8 @@ jcntl::jfile_cycle(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, rcvdat& r if (!ifsp->is_open()) { std::ostringstream oss; - oss << _jdir.dirname() << "/" << _base_filename << "."; - oss << std::hex << std::setfill('0') << std::setw(4) << fid << "." << QLS_JRNL_FILE_EXTENSION; + oss << _jdir.dirname() << "/" /*<< _base_filename*/ << "."; // TODO - linear journal name + oss << std::hex << std::setfill('0') << std::setw(4) << fid << QLS_JRNL_FILE_EXTENSION; ifsp->clear(); // clear eof flag, req'd for older versions of c++ ifsp->open(oss.str().c_str(), std::ios_base::in | std::ios_base::binary); if (!ifsp->good()) @@ -946,12 +965,12 @@ jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos, rcv oss << std::hex << "Bad record alignment found at fid=0x" << fid; oss << " offs=0x" << file_pos << " (likely journal overwrite boundary); " << std::dec; oss << (JRNL_SBLK_SIZE_DBLKS - (sblk_offs/JRNL_DBLK_SIZE)) << " filler record(s) required."; - this->log(LOG_WARN, oss.str()); + this->log(LOG_WARN, _jid, oss.str()); } const uint32_t xmagic = QLS_EMPTY_MAGIC; std::ostringstream oss; - oss << _jdir.dirname() << "/" << _base_filename << "."; - oss << std::hex << std::setfill('0') << std::setw(4) << fid << "." << QLS_JRNL_FILE_EXTENSION; + oss << _jdir.dirname() << "/" /*<< _base_filename*/ << "."; // TODO linear journal name + oss << std::hex << std::setfill('0') << std::setw(4) << fid << QLS_JRNL_FILE_EXTENSION; std::ofstream ofsp(oss.str().c_str(), std::ios_base::in | std::ios_base::out | std::ios_base::binary); if (!ofsp.good()) @@ -971,7 +990,7 @@ jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos, rcv assert(!ofsp.fail()); std::ostringstream oss; oss << std::hex << "Recover phase write: Wrote filler record: fid=0x" << fid << " offs=0x" << file_pos; - this->log(LOG_NOTICE, oss.str()); + this->log(LOG_NOTICE, _jid, oss.str()); file_pos = ofsp.tellp(); } ofsp.close(); @@ -979,7 +998,7 @@ jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos, rcv rd._lfid = fid; // if (!rd._frot) // rd._ffid = (fid + 1) % rd._njf; - this->log(LOG_INFO, "Bad record alignment fixed."); + this->log(LOG_INFO, _jid, "Bad record alignment fixed."); } rd._eo = file_pos; } diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h index debcc02efc..8a2c178f67 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h @@ -31,6 +31,7 @@ namespace qls_jrnl #include <cstddef> #include <deque> +#include <qpid/linearstore/jrnl/JournalLog.h> #include "qpid/linearstore/jrnl/jdir.h" //#include "qpid/linearstore/jrnl/fcntl.h" //#include "qpid/linearstore/jrnl/lpmgr.h" @@ -45,6 +46,8 @@ namespace qpid { namespace qls_jrnl { +class EmptyFilePool; +class JournalFileController; /** * \brief Access and control interface for the journal. This is the top-level class for the @@ -56,7 +59,7 @@ namespace qls_jrnl * which is used per data block written to the journal, and is used to track its status through * the AIO enqueue, read and dequeue process. */ - class jcntl + class jcntl : public JournalLog { protected: /** @@ -85,7 +88,7 @@ namespace qls_jrnl * that will be written to disk. No file separator characters should be included here, but * all other legal filename characters are valid. */ - std::string _base_filename; +// std::string _base_filename; /** * \brief Initialized flag @@ -121,10 +124,11 @@ namespace qls_jrnl * marker. If not set, then attempts to write will throw exceptions until the journal * file low water marker moves to the next journal file. */ - bool _autostop; ///< Autostop flag - stops journal when overrun occurs + //bool _autostop; ///< Autostop flag - stops journal when overrun occurs // Journal control structures - uint32_t _jfsize_sblks; ///< Journal file size in sblks + JournalFileController* _jfcp;///< Journal File Controller + //uint32_t _jfsize_sblks; ///< Journal file size in sblks //lpmgr _lpmgr; ///< LFID-PFID manager tracks inserted journal files enq_map _emap; ///< Enqueue map for low water mark management txn_map _tmap; ///< Transaction map open transactions @@ -148,7 +152,7 @@ namespace qls_jrnl * \param jdir The directory which will contain the journal files. * \param base_filename The string which will be used to start all journal filenames. */ - jcntl(const std::string& jid, const std::string& jdir, const std::string& base_filename); + jcntl(const std::string& jid, const std::string& jdir/*, const std::string& base_filename*/); /** * \brief Destructor. @@ -190,7 +194,7 @@ namespace qls_jrnl * \exception TODO */ void initialize(/*const uint16_t num_jfiles, const bool auto_expand, const uint16_t ae_max_jfiles, - const uint32_t jfsize_sblks,*/ const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, + const uint32_t jfsize_sblks,*/EmptyFilePool* efpp, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, aio_callback* const cbp); /** @@ -628,17 +632,17 @@ namespace qls_jrnl * the same directory, their base filenames <b>MUST</b> be different or else the instances * will overwrite one another. */ - inline const std::string& base_filename() const { return _base_filename; } +// inline const std::string& base_filename() const { return _base_filename; } // inline uint16_t num_jfiles() const; { return _lpmgr.num_jfiles(); } // inline fcntl* get_fcntlp(const uint16_t lfid) const { return _lpmgr.get_fcntlp(lfid); } - inline uint32_t jfsize_sblks() const { return _jfsize_sblks; } +// inline uint32_t jfsize_sblks() const { return _jfsize_sblks; } // Logging - virtual void log(log_level level, const std::string& log_stmt) const; - virtual void log(log_level level, const char* const log_stmt) const; +// virtual void log(log_level_t level, const std::string& log_stmt) const; +// virtual void log(log_level_t level, const char* const log_stmt) const; // FIXME these are _rmgr to _wmgr interactions, remove when _rmgr contains ref to _wmgr: //void chk_wr_frot(); diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp index a191ed86d9..89bce926e0 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp @@ -37,9 +37,9 @@ namespace qpid namespace qls_jrnl { -jdir::jdir(const std::string& dirname, const std::string& _base_filename): - _dirname(dirname), - _base_filename(_base_filename) +jdir::jdir(const std::string& dirname/*, const std::string& _base_filename*/): + _dirname(dirname)/*, + _base_filename(_base_filename)*/ {} jdir::~jdir() @@ -88,21 +88,22 @@ jdir::create_dir(const std::string& dirname) void jdir::clear_dir(const bool create_flag) { - clear_dir(_dirname, _base_filename, create_flag); + clear_dir(_dirname/*, _base_filename*/, create_flag); } void -jdir::clear_dir(const char* dirname, const char* base_filename, const bool create_flag) +jdir::clear_dir(const char* dirname/*, const char* base_filename*/, const bool create_flag) { - clear_dir(std::string(dirname), std::string(base_filename), create_flag); + clear_dir(std::string(dirname)/*, std::string(base_filename)*/, create_flag); } void -jdir::clear_dir(const std::string& dirname, const std::string& +jdir::clear_dir(const std::string& dirname/*, const std::string& #ifndef RHM_JOWRITE base_filename #endif +*/ , const bool create_flag) { DIR* dir = ::opendir(dirname.c_str()); @@ -117,7 +118,7 @@ jdir::clear_dir(const std::string& dirname, const std::string& oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(errno); throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", "clear_dir"); } -#ifndef RHM_JOWRITE +//#ifndef RHM_JOWRITE struct dirent* entry; bool found = false; std::string bak_dir; @@ -126,13 +127,13 @@ jdir::clear_dir(const std::string& dirname, const std::string& // Ignore . and .. if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0) { - if (std::strlen(entry->d_name) > base_filename.size()) + if (std::strlen(entry->d_name) >= 3) // 'bak' { - if (std::strncmp(entry->d_name, base_filename.c_str(), base_filename.size()) == 0) + if (std::strncmp(entry->d_name, "bak", 3) == 0) { if (!found) { - bak_dir = create_bak_dir(dirname, base_filename); + bak_dir = create_bak_dir(dirname/*, base_filename*/); found = true; } std::ostringstream oldname; @@ -154,16 +155,16 @@ jdir::clear_dir(const std::string& dirname, const std::string& // FIXME: Find out why this fails with false alarms/errors from time to time... // While commented out, there is no error capture from reading dir entries. // check_err(errno, dir, dirname, "clear_dir"); -#endif +//#endif close_dir(dir, dirname, "clear_dir"); } // === push_down === std::string -jdir::push_down(const std::string& dirname, const std::string& target_dir, const std::string& bak_dir_base) +jdir::push_down(const std::string& dirname, const std::string& target_dir/*, const std::string& bak_dir_base*/) { - std::string bak_dir_name = create_bak_dir(dirname, bak_dir_base); + std::string bak_dir_name = create_bak_dir(dirname/*, bak_dir_base*/); DIR* dir = ::opendir(dirname.c_str()); if (!dir) @@ -202,18 +203,18 @@ jdir::push_down(const std::string& dirname, const std::string& target_dir, const void jdir::verify_dir() { - verify_dir(_dirname, _base_filename); + verify_dir(_dirname/*, _base_filename*/); } void -jdir::verify_dir(const char* dirname, const char* base_filename) +jdir::verify_dir(const char* dirname/*, const char* base_filename*/) { - verify_dir(std::string(dirname), std::string(base_filename)); + verify_dir(std::string(dirname)/*, std::string(base_filename)*/); } void -jdir::verify_dir(const std::string& dirname, const std::string& /*base_filename*/) +jdir::verify_dir(const std::string& dirname/*, const std::string& base_filename*/) { if (!is_dir(dirname)) { @@ -323,7 +324,7 @@ jdir::delete_dir(const std::string& dirname, bool children_only) std::string -jdir::create_bak_dir(const std::string& dirname, const std::string& base_filename) +jdir::create_bak_dir(const std::string& dirname) { DIR* dir = ::opendir(dirname.c_str()); long dir_num = 0L; @@ -339,13 +340,11 @@ jdir::create_bak_dir(const std::string& dirname, const std::string& base_filenam // Ignore . and .. if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0) { - if (std::strlen(entry->d_name) == base_filename.size() + 10) // Format: basename.bak.XXXX + if (std::strlen(entry->d_name) == 9) // Format: _bak.XXXX { - std::ostringstream oss; - oss << "_" << base_filename << ".bak."; - if (std::strncmp(entry->d_name, oss.str().c_str(), base_filename.size() + 6) == 0) + if (std::strncmp(entry->d_name, "_bak.", 5) == 0) { - long this_dir_num = std::strtol(entry->d_name + base_filename.size() + 6, 0, 16); + long this_dir_num = std::strtol(entry->d_name + 5, 0, 16); if (this_dir_num > dir_num) dir_num = this_dir_num; } @@ -358,8 +357,7 @@ jdir::create_bak_dir(const std::string& dirname, const std::string& base_filenam close_dir(dir, dirname, "create_bak_dir"); std::ostringstream dn; - dn << dirname << "/_" << base_filename << ".bak." << std::hex << std::setw(4) << - std::setfill('0') << ++dir_num; + dn << dirname << "/_bak." << std::hex << std::setw(4) << std::setfill('0') << ++dir_num; if (::mkdir(dn.str().c_str(), S_IRWXU | S_IRWXG | S_IROTH)) { std::ostringstream oss; @@ -411,6 +409,32 @@ jdir::exists(const std::string& name) } void +jdir::read_dir(const std::string& name, std::vector<std::string>& dir_list, const bool incl_dirs, const bool incl_files, const bool incl_links) { + struct stat s; + if (is_dir(name)) { + DIR* dir = ::opendir(name.c_str()); + if (dir != 0) { + struct dirent* entry; + while ((entry = ::readdir(dir)) != 0) { + if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0) { // Ignore . and .. + std::string full_name(name + "/" + entry->d_name); + if (::stat(full_name.c_str(), &s)) + { + ::closedir(dir); + std::ostringstream oss; + oss << "stat: file=\"" << full_name << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", "delete_dir"); + } + if ((S_ISREG(s.st_mode) && incl_files) || (S_ISDIR(s.st_mode) && incl_dirs) || (S_ISLNK(s.st_mode) && incl_links)) + dir_list.push_back(entry->d_name); + } + } + } + close_dir(dir, name, "read_dir"); + } +} + +void jdir::check_err(const int err_num, DIR* dir, const std::string& dir_name, const std::string& fn_name) { if (err_num) diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h index af4797f44c..53519b1f2c 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h @@ -19,8 +19,8 @@ * */ -#ifndef QPID_LEGACYSTORE_JRNL_JDIR_H -#define QPID_LEGACYSTORE_JRNL_JDIR_H +#ifndef QPID_LINEARSTORE_JRNL_JDIR_H +#define QPID_LINEARSTORE_JRNL_JDIR_H namespace qpid { @@ -32,6 +32,7 @@ class jdir; //#include "qpid/linearstore/jrnl/jinf.h" #include <dirent.h> #include <string> +#include <vector> namespace qpid { @@ -46,7 +47,7 @@ namespace qls_jrnl { private: std::string _dirname; - std::string _base_filename; + //std::string _base_filename; public: @@ -57,7 +58,7 @@ namespace qls_jrnl * \param base_filename Filename root used in the creation of %journal files * and sub-directories. */ - jdir(const std::string& dirname, const std::string& base_filename); + jdir(const std::string& dirname/*, const std::string& base_filename*/); virtual ~jdir(); @@ -119,7 +120,7 @@ namespace qls_jrnl * directory failed. * \exception jerrno::JERR_JDIR_CLOSEDIR The directory handle could not be closed. */ - static void clear_dir(const char* dirname, const char* base_filename, + static void clear_dir(const char* dirname/*, const char* base_filename*/, const bool create_flag = true); /** @@ -137,7 +138,7 @@ namespace qls_jrnl * directory failed. * \exception jerrno::JERR_JDIR_CLOSEDIR The directory handle could not be closed. */ - static void clear_dir(const std::string& dirname, const std::string& base_filename, + static void clear_dir(const std::string& dirname/*, const std::string& base_filename*/, const bool create_flag = true); @@ -152,7 +153,7 @@ namespace qls_jrnl * \param bak_dir_base Base name for backup directory to be created in dirname, into which target_dir will be moved. * \return Name of backup dir into which target_dir was pushed. */ - static std::string push_down(const std::string& dirname, const std::string& target_dir, const std::string& bak_dir_base); + static std::string push_down(const std::string& dirname, const std::string& target_dir/*, const std::string& bak_dir_base*/); /** @@ -184,7 +185,7 @@ namespace qls_jrnl * \exception jerrno::JERR_JINF_CVALIDFAIL Error validating %jinf file * \exception jerrno::JERR_JDIR_NOSUCHFILE Expected jdat file is missing */ - static void verify_dir(const char* dirname, const char* base_filename); + static void verify_dir(const char* dirname/*, const char* base_filename*/); /** * \brief Verify that dirname is a valid %journal directory. @@ -201,7 +202,7 @@ namespace qls_jrnl * \exception jerrno::JERR_JINF_CVALIDFAIL Error validating %jinf file * \exception jerrno::JERR_JDIR_NOSUCHFILE Expected jdat file is missing */ - static void verify_dir(const std::string& dirname, const std::string& base_filename); + static void verify_dir(const std::string& dirname/*, const std::string& base_filename*/); /** * \brief Delete the %journal directory and all files and sub--directories that it may @@ -273,8 +274,8 @@ namespace qls_jrnl * \exception jerrno::JERR_JDIR_CLOSEDIR The directory handle could not be closed. * \exception jerrno::JERR_JDIR_MKDIR The backup directory could not be deleted. */ - static std::string create_bak_dir(const std::string& dirname, - const std::string& base_filename); + static std::string create_bak_dir(const std::string& dirname/*, + const std::string& base_filename*/); /** * \brief Return the directory name as a string. @@ -284,7 +285,7 @@ namespace qls_jrnl /** * \brief Return the %journal base filename name as a string. */ - inline const std::string& base_filename() const { return _base_filename; } +// inline const std::string& base_filename() const { return _base_filename; } /** * \brief Test whether the named file is a directory. @@ -335,6 +336,8 @@ namespace qls_jrnl */ static bool exists(const std::string& name); + static void read_dir(const std::string& name, std::vector<std::string>& dir_list, const bool incl_dirs, const bool incl_files, const bool incl_links); + /** * \brief Stream operator */ @@ -363,4 +366,4 @@ namespace qls_jrnl }} -#endif // ifndef QPID_LEGACYSTORE_JRNL_JDIR_H +#endif // ifndef QPID_LINEARSTORE_JRNL_JDIR_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp index 718315a638..ecd5469d8b 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp @@ -31,39 +31,39 @@ std::map<uint32_t, const char*>::iterator jerrno::_err_map_itr; bool jerrno::_initialized = jerrno::__init(); // generic errors -const uint32_t jerrno::JERR__MALLOC = 0x0100; -const uint32_t jerrno::JERR__UNDERFLOW = 0x0101; -const uint32_t jerrno::JERR__NINIT = 0x0102; -const uint32_t jerrno::JERR__AIO = 0x0103; -const uint32_t jerrno::JERR__FILEIO = 0x0104; -const uint32_t jerrno::JERR__RTCLOCK = 0x0105; -const uint32_t jerrno::JERR__PTHREAD = 0x0106; -const uint32_t jerrno::JERR__TIMEOUT = 0x0107; -const uint32_t jerrno::JERR__UNEXPRESPONSE = 0x0108; -const uint32_t jerrno::JERR__RECNFOUND = 0x0109; -const uint32_t jerrno::JERR__NOTIMPL = 0x010a; +const uint32_t jerrno::JERR__MALLOC = 0x0100; +const uint32_t jerrno::JERR__UNDERFLOW = 0x0101; +const uint32_t jerrno::JERR__NINIT = 0x0102; +const uint32_t jerrno::JERR__AIO = 0x0103; +const uint32_t jerrno::JERR__FILEIO = 0x0104; +const uint32_t jerrno::JERR__RTCLOCK = 0x0105; +const uint32_t jerrno::JERR__PTHREAD = 0x0106; +const uint32_t jerrno::JERR__TIMEOUT = 0x0107; +const uint32_t jerrno::JERR__UNEXPRESPONSE = 0x0108; +const uint32_t jerrno::JERR__RECNFOUND = 0x0109; +const uint32_t jerrno::JERR__NOTIMPL = 0x010a; // class jcntl -const uint32_t jerrno::JERR_JCNTL_STOPPED = 0x0200; -const uint32_t jerrno::JERR_JCNTL_READONLY = 0x0201; -const uint32_t jerrno::JERR_JCNTL_AIOCMPLWAIT = 0x0202; -const uint32_t jerrno::JERR_JCNTL_UNKNOWNMAGIC = 0x0203; -const uint32_t jerrno::JERR_JCNTL_NOTRECOVERED = 0x0204; -const uint32_t jerrno::JERR_JCNTL_RECOVERJFULL = 0x0205; -const uint32_t jerrno::JERR_JCNTL_OWIMISMATCH = 0x0206; +const uint32_t jerrno::JERR_JCNTL_STOPPED = 0x0200; +const uint32_t jerrno::JERR_JCNTL_READONLY = 0x0201; +const uint32_t jerrno::JERR_JCNTL_AIOCMPLWAIT = 0x0202; +const uint32_t jerrno::JERR_JCNTL_UNKNOWNMAGIC = 0x0203; +const uint32_t jerrno::JERR_JCNTL_NOTRECOVERED = 0x0204; +const uint32_t jerrno::JERR_JCNTL_RECOVERJFULL = 0x0205; +const uint32_t jerrno::JERR_JCNTL_OWIMISMATCH = 0x0206; // class jdir -const uint32_t jerrno::JERR_JDIR_NOTDIR = 0x0300; -const uint32_t jerrno::JERR_JDIR_MKDIR = 0x0301; -const uint32_t jerrno::JERR_JDIR_OPENDIR = 0x0302; -const uint32_t jerrno::JERR_JDIR_READDIR = 0x0303; -const uint32_t jerrno::JERR_JDIR_CLOSEDIR = 0x0304; -const uint32_t jerrno::JERR_JDIR_RMDIR = 0x0305; -const uint32_t jerrno::JERR_JDIR_NOSUCHFILE = 0x0306; -const uint32_t jerrno::JERR_JDIR_FMOVE = 0x0307; -const uint32_t jerrno::JERR_JDIR_STAT = 0x0308; -const uint32_t jerrno::JERR_JDIR_UNLINK = 0x0309; -const uint32_t jerrno::JERR_JDIR_BADFTYPE = 0x030a; +const uint32_t jerrno::JERR_JDIR_NOTDIR = 0x0300; +const uint32_t jerrno::JERR_JDIR_MKDIR = 0x0301; +const uint32_t jerrno::JERR_JDIR_OPENDIR = 0x0302; +const uint32_t jerrno::JERR_JDIR_READDIR = 0x0303; +const uint32_t jerrno::JERR_JDIR_CLOSEDIR = 0x0304; +const uint32_t jerrno::JERR_JDIR_RMDIR = 0x0305; +const uint32_t jerrno::JERR_JDIR_NOSUCHFILE = 0x0306; +const uint32_t jerrno::JERR_JDIR_FMOVE = 0x0307; +const uint32_t jerrno::JERR_JDIR_STAT = 0x0308; +const uint32_t jerrno::JERR_JDIR_UNLINK = 0x0309; +const uint32_t jerrno::JERR_JDIR_BADFTYPE = 0x030a; // class fcntl //const uint32_t jerrno::JERR_FCNTL_OPENWR = 0x0400; @@ -82,31 +82,31 @@ const uint32_t jerrno::JERR_JDIR_BADFTYPE = 0x030a; //const uint32_t jerrno::JERR_RRFC_OPENRD = 0x0600; // class jrec, enq_rec, deq_rec, txn_rec -const uint32_t jerrno::JERR_JREC_BADRECHDR = 0x0700; -const uint32_t jerrno::JERR_JREC_BADRECTAIL = 0x0701; +const uint32_t jerrno::JERR_JREC_BADRECHDR = 0x0700; +const uint32_t jerrno::JERR_JREC_BADRECTAIL = 0x0701; // class wmgr -const uint32_t jerrno::JERR_WMGR_BADPGSTATE = 0x0801; -const uint32_t jerrno::JERR_WMGR_BADDTOKSTATE = 0x0802; -const uint32_t jerrno::JERR_WMGR_ENQDISCONT = 0x0803; -const uint32_t jerrno::JERR_WMGR_DEQDISCONT = 0x0804; -const uint32_t jerrno::JERR_WMGR_DEQRIDNOTENQ = 0x0805; +const uint32_t jerrno::JERR_WMGR_BADPGSTATE = 0x0801; +const uint32_t jerrno::JERR_WMGR_BADDTOKSTATE = 0x0802; +const uint32_t jerrno::JERR_WMGR_ENQDISCONT = 0x0803; +const uint32_t jerrno::JERR_WMGR_DEQDISCONT = 0x0804; +const uint32_t jerrno::JERR_WMGR_DEQRIDNOTENQ = 0x0805; // class rmgr -const uint32_t jerrno::JERR_RMGR_UNKNOWNMAGIC = 0x0900; -const uint32_t jerrno::JERR_RMGR_RIDMISMATCH = 0x0901; +const uint32_t jerrno::JERR_RMGR_UNKNOWNMAGIC = 0x0900; +const uint32_t jerrno::JERR_RMGR_RIDMISMATCH = 0x0901; //const uint32_t jerrno::JERR_RMGR_FIDMISMATCH = 0x0902; -const uint32_t jerrno::JERR_RMGR_ENQSTATE = 0x0903; -const uint32_t jerrno::JERR_RMGR_BADRECTYPE = 0x0904; +const uint32_t jerrno::JERR_RMGR_ENQSTATE = 0x0903; +const uint32_t jerrno::JERR_RMGR_BADRECTYPE = 0x0904; // class data_tok -const uint32_t jerrno::JERR_DTOK_ILLEGALSTATE = 0x0a00; +const uint32_t jerrno::JERR_DTOK_ILLEGALSTATE = 0x0a00; // const uint32_t jerrno::JERR_DTOK_RIDNOTSET = 0x0a01; // class enq_map, txn_map -const uint32_t jerrno::JERR_MAP_DUPLICATE = 0x0b00; -const uint32_t jerrno::JERR_MAP_NOTFOUND = 0x0b01; -const uint32_t jerrno::JERR_MAP_LOCKED = 0x0b02; +const uint32_t jerrno::JERR_MAP_DUPLICATE = 0x0b00; +const uint32_t jerrno::JERR_MAP_NOTFOUND = 0x0b01; +const uint32_t jerrno::JERR_MAP_LOCKED = 0x0b02; // class jinf //const uint32_t jerrno::JERR_JINF_CVALIDFAIL = 0x0c00; @@ -121,6 +121,13 @@ const uint32_t jerrno::JERR_MAP_LOCKED = 0x0b02; //const uint32_t jerrno::JERR_JINF_OWIBAD = 0x0c09; //const uint32_t jerrno::JERR_JINF_ZEROLENFILE = 0x0c0a; +// EFP errors +const uint32_t jerrno::JERR_EFP_BADPARTITIONNAME = 0x0d01; +const uint32_t jerrno::JERR_EFP_BADPARTITIONDIR = 0x0d02; +const uint32_t jerrno::JERR_EFP_BADEFPDIRNAME = 0x0d03; +const uint32_t jerrno::JERR_EFP_NOEFP = 0x0d04; +const uint32_t jerrno::JERR_EFP_EMPTY = 0x0d05; + // Negative returns for some functions const int32_t jerrno::AIO_TIMEOUT = -1; const int32_t jerrno::LOCK_TAKEN = -2; @@ -222,6 +229,13 @@ jerrno::__init() // _err_map[JERR_JINF_OWIBAD] = "JERR_JINF_OWIBAD: Journal data files have inconsistent OWI flags; >1 transition found in non-auto-expand or min-size journal"; // _err_map[JERR_JINF_ZEROLENFILE] = "JERR_JINF_ZEROLENFILE: Journal info file zero length"; + // EFP errors + _err_map[JERR_EFP_BADPARTITIONNAME] = "JERR_EFP_BADPARTITIONNAME: Invalid partition name (must be \'pNNN\' where NNN is a non-zero number)"; + _err_map[JERR_EFP_BADEFPDIRNAME] = "JERR_EFP_BADEFPDIRNAME: Bad Empty File Pool directory name (must be \'NNNk\', where NNN is a number which is a multiple of 4)"; + _err_map[JERR_EFP_BADPARTITIONDIR] = "JERR_EFP_BADPARTITIONDIR: Invalid partition directory"; + _err_map[JERR_EFP_NOEFP] = "JERR_EFP_NOEFP: No Empty File Pool found for given partition and empty file size"; + _err_map[JERR_EFP_EMPTY] = "JERR_EFP_EMPTY: Empty File Pool is empty"; + //_err_map[] = ""; return true; diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h index 19b9954c93..2ebabd84b8 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h @@ -140,6 +140,13 @@ namespace qls_jrnl // static const uint32_t JERR_JINF_OWIBAD; ///< OWI inconsistent (>1 transition in non-ae journal) // static const uint32_t JERR_JINF_ZEROLENFILE; ///< Journal info file is zero length (empty). + // EFP errors + static const uint32_t JERR_EFP_BADPARTITIONNAME; ///< Partition name invalid or of value 0 + static const uint32_t JERR_EFP_BADEFPDIRNAME; ///< Empty File Pool directory name invalid + static const uint32_t JERR_EFP_BADPARTITIONDIR; ///< Invalid partition directory + static const uint32_t JERR_EFP_NOEFP; ///< No EFP found for given partition and file size + static const uint32_t JERR_EFP_EMPTY; ///< EFP empty + // Negative returns for some functions static const int32_t AIO_TIMEOUT; ///< Timeout waiting for AIO return static const int32_t LOCK_TAKEN; ///< Attempted to take lock, but it was taken by another thread diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp index b6d0f0f2ad..9722b78e81 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp @@ -61,7 +61,7 @@ rmgr::initialize(aio_callback* const cbp) throw jexception(jerrno::JERR__MALLOC, oss.str(), "rmgr", "initialize"); } _fhdr_aio_cb_ptr = new aio_cb; - std::memset(_fhdr_aio_cb_ptr, 0, sizeof(aio_cb*)); + std::memset(_fhdr_aio_cb_ptr, 0, sizeof(aio_cb)); } void @@ -638,6 +638,7 @@ rmgr::init_aio_reads(const int16_t /*first_uninit*/, const uint16_t /*num_uninit void rmgr::rotate_page() { +/* _page_cb_arr[_pg_index]._rdblks = 0; _page_cb_arr[_pg_index]._state = UNUSED; if (_pg_offset_dblks >= JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE_DBLKS) @@ -654,6 +655,7 @@ rmgr::rotate_page() // Need to move reset into if (_rrfc.file_rotate()) above. if (_pg_cntr >= (_jc->jfsize_sblks() / JRNL_RMGR_PAGE_SIZE)) _pg_cntr = 0; +*/ } uint32_t diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c index e8263ea281..e8ebe9db94 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c +++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c @@ -20,28 +20,69 @@ */ #include "file_hdr.h" +#include <string.h> -int file_hdr_init(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, - const uint64_t rid, const uint64_t fro, const uint32_t file_count, const uint64_t file_size, - const uint64_t file_number) { - rec_hdr_init(&dest->_rhdr, magic, version, uflag, rid); +void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t fhdr_size_sblks, + const uint16_t efp_partition, const uint64_t file_size) { + rec_hdr_init(&dest->_rhdr, magic, version, 0, 0); + dest->_fhdr_size_sblks = fhdr_size_sblks; + dest->_efp_partition = efp_partition; + dest->_reserved = 0; + dest->_file_size_kib = file_size; + dest->_fro = 0; + dest->_ts_nsec = 0; + dest->_ts_sec = 0; + dest->_file_number = 0; + dest->_queue_name_len = 0; +} + +int file_hdr_init(file_hdr_t* dest, const uint16_t uflag, const uint64_t rid, const uint64_t fro, + const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name) { + dest->_rhdr._uflag = uflag; + dest->_rhdr._rid = rid; dest->_fro = fro; - dest->_file_count = file_count; - dest->_file_size = file_size; dest->_file_number = file_number; + if (sizeof(file_hdr_t) + queue_name_len < MAX_FILE_HDR_LEN) { + dest->_queue_name_len = queue_name_len; + } else { + dest->_queue_name_len = MAX_FILE_HDR_LEN - sizeof(file_hdr_t); + } + dest->_queue_name_len = queue_name_len; + memcpy(dest + sizeof(file_hdr_t), queue_name, queue_name_len); return set_time_now(dest); } void file_hdr_copy(file_hdr_t* dest, const file_hdr_t* src) { rec_hdr_copy(&dest->_rhdr, &src->_rhdr); + dest->_fhdr_size_sblks = src->_fhdr_size_sblks; // Should this be copied? + dest->_efp_partition = src->_efp_partition; // Should this be copied? + dest->_file_size_kib = src->_file_size_kib; dest->_fro = src->_fro; dest->_ts_sec = src->_ts_sec; dest->_ts_nsec = src->_ts_nsec; - dest->_file_count = src->_file_count; - dest->_file_size = src->_file_size; dest->_file_number = src->_file_number; } +void file_hdr_reset(file_hdr_t* target) { + target->_rhdr._uflag = 0; + target->_rhdr._rid = 0; + target->_fro = 0; + target->_ts_sec = 0; + target->_ts_nsec = 0; + target->_file_number = 0; + target->_queue_name_len = 0; + memset(target + sizeof(file_hdr_t), 0, MAX_FILE_HDR_LEN - sizeof(file_hdr_t)); +} + +int is_file_hdr_reset(file_hdr_t* target) { + return target->_rhdr._uflag == 0 && + target->_rhdr._rid == 0 && + target->_ts_sec == 0 && + target->_ts_nsec == 0 && + target->_file_number == 0 && + target->_queue_name_len == 0; +} + int set_time_now(file_hdr_t *fh) { struct timespec ts; diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h index 304abf0700..6a2cae4ba4 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h @@ -28,6 +28,8 @@ extern "C"{ #endif +#define MAX_FILE_HDR_LEN 4096 // Set to 1 sblk + #pragma pack(1) /** @@ -47,44 +49,50 @@ extern "C"{ * +---+---+---+---+---+---+---+---+ | struct rec_hdr_t * | first rid in file | | * +---+---+---+---+---+---+---+---+ -+ + * | fs | partn | reserved | + * +---+---+---+---+---+---+---+---+ + * | file-size | + * +---+---+---+---+---+---+---+---+ * | fro | * +---+---+---+---+---+---+---+---+ * | timestamp (sec) | * +---+---+---+---+---+---+---+---+ * | timestamp (ns) | * +---+---+---+---+---+---+---+---+ - * | file-count | reserved | - * +---+---+---+---+---+---+---+---+ - * | file-size | - * +---+---+---+---+---+---+---+---+ * | file-number | * +---+---+---+---+---+---+---+---+ - * | n-len | Queue Name... | + * | qnl | Queue Name... | * +-------+ | * | | * +---+---+---+---+---+---+---+---+ * - * ver = file version (If the format or encoding of this file changes, then this - * number should be incremented) + * ver = Journal version + * rid = Record ID + * fs = File header size in sblks (defined by JRNL_SBLK_SIZE) + * partn = EFP partition from which this file came * fro = First Record Offset - * n-len = Length of the queue name in octets. + * qnl = Length of the queue name in octets. * </pre> */ typedef struct file_hdr_t { - rec_hdr_t _rhdr; /**< Common record header struct, but rid field is used for rid of first compete record in file */ - uint64_t _fro; /**< First Record Offset (FRO) */ - uint64_t _ts_sec; /**< Time stamp (seconds part) */ - uint64_t _ts_nsec; /**< Time stamp (nanoseconds part) */ - uint32_t _file_count; /**< Total number of files in linear sequence at time of writing this file */ + rec_hdr_t _rhdr; /**< Common record header struct, but rid field is used for rid of first compete record in file */ + uint16_t _fhdr_size_sblks; /**< File header size in sblks (defined by JRNL_SBLK_SIZE) */ + uint16_t _efp_partition; /**< EFP Partition number from which this file was obtained */ uint32_t _reserved; - uint64_t _file_size; /**< Size of this file in octets, including header sblk */ - uint64_t _file_number; /**< The logical number of this file in a monotonically increasing sequence */ - uint16_t _name_length; /**< Length of the queue name in octets, which follows this struct in the header */ + uint64_t _file_size_kib; /**< Size of this file in KiB, excluding header sblk */ + uint64_t _fro; /**< First Record Offset (FRO) */ + uint64_t _ts_sec; /**< Time stamp (seconds part) */ + uint64_t _ts_nsec; /**< Time stamp (nanoseconds part) */ + uint64_t _file_number; /**< The logical number of this file in a monotonically increasing sequence */ + uint16_t _queue_name_len; /**< Length of the queue name in octets, which follows this struct in the header */ } file_hdr_t; -int file_hdr_init(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, - const uint64_t rid, const uint64_t fro, const uint32_t file_count, const uint64_t file_size, - const uint64_t file_number); +void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t fhdr_size_sblks, + const uint16_t efp_partition, const uint64_t file_size); +int file_hdr_init(file_hdr_t* dest, const uint16_t uflag, const uint64_t rid, const uint64_t fro, + const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name); +void file_hdr_reset(file_hdr_t* target); +int is_file_hdr_reset(file_hdr_t* target); void file_hdr_copy(file_hdr_t* dest, const file_hdr_t* src); int set_time_now(file_hdr_t *fh); void set_time(file_hdr_t *fh, struct timespec *ts); diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp index ccc9bf6bab..3062f059cd 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp @@ -45,9 +45,9 @@ wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/): _fhdr_ptr_arr(0), _fhdr_aio_cb_arr(0), _cached_offset_dblks(0), - _jfsize_dblks(0), - _jfsize_pgs(0), - _num_jfiles(0), +// _jfsize_dblks(0), +// _jfsize_pgs(0), +// _num_jfiles(0), _enq_busy(false), _deq_busy(false), _abort_busy(false), @@ -65,9 +65,9 @@ wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/, _fhdr_ptr_arr(0), _fhdr_aio_cb_arr(0), _cached_offset_dblks(0), - _jfsize_dblks(0), - _jfsize_pgs(0), - _num_jfiles(0), +// _jfsize_dblks(0), +// _jfsize_pgs(0), +// _num_jfiles(0), _enq_busy(false), _deq_busy(false), _abort_busy(false), @@ -94,9 +94,9 @@ wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, initialize(cbp, wcache_pgsize_sblks, wcache_num_pages); - _jfsize_dblks = _jc->jfsize_sblks() * JRNL_SBLK_SIZE_DBLKS; - _jfsize_pgs = _jc->jfsize_sblks() / _cache_pgsize_sblks; - assert(_jc->jfsize_sblks() % JRNL_RMGR_PAGE_SIZE == 0); +// _jfsize_dblks = _jc->jfsize_sblks() * JRNL_SBLK_SIZE_DBLKS; +// _jfsize_pgs = _jc->jfsize_sblks() / _cache_pgsize_sblks; +// assert(_jc->jfsize_sblks() % JRNL_RMGR_PAGE_SIZE == 0); if (eo) { @@ -555,7 +555,7 @@ wmgr::file_header_check(const uint64_t /*rid*/, const bool /*cont*/, const uint3 } void -wmgr::flush_check(iores& res, bool& cont, bool& done) +wmgr::flush_check(iores& res, bool& /*cont*/, bool& done) { // Is page is full, flush if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) @@ -569,6 +569,7 @@ wmgr::flush_check(iores& res, bool& cont, bool& done) done = true; } +/* // If file is full, rotate to next file if (_pg_cntr >= _jfsize_pgs) { @@ -583,6 +584,7 @@ wmgr::flush_check(iores& res, bool& cont, bool& done) done = true; } } +*/ } } @@ -590,12 +592,14 @@ iores wmgr::flush() { iores res = write_flush(); +/* if (_pg_cntr >= _jfsize_pgs) { iores rfres = rotate_file(); if (rfres != RHM_IORES_SUCCESS) res = rfres; } +*/ return res; } @@ -836,12 +840,13 @@ wmgr::is_txn_synced(const std::string& xid) } void -wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, const uint16_t wcache_num_pages) +wmgr::initialize(aio_callback* const /*cbp*/, const uint32_t /*wcache_pgsize_sblks*/, const uint16_t /*wcache_num_pages*/) { +/* pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages); wmgr::clean(); - /*_num_jfiles = _jc->num_jfiles();*/ // TODO: replace for linearstore: _jc->num_jfiles() - if (::posix_memalign(&_fhdr_base_ptr, _sblksize, _sblksize /** _num_jfiles*/)) + _num_jfiles = _jc->num_jfiles(); // TODO: replace for linearstore: _jc->num_jfiles() + if (::posix_memalign(&_fhdr_base_ptr, _sblksize, _sblksize * _num_jfiles)) { wmgr::clean(); std::ostringstream oss; @@ -863,6 +868,7 @@ wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, co _ddtokl.clear(); _cached_offset_dblks = 0; _enq_busy = false; +*/ } iores @@ -988,7 +994,7 @@ void wmgr::write_fhdr(uint64_t rid, uint16_t fid, uint16_t /*lid*/, std::size_t fro) { file_hdr_t fhdr/*(QLS_FILE_MAGIC, QLS_JRNL_VERSION, rid, fid, lid, fro, _wrfc.owi(), true)*/; - /*int err =*/ ::file_hdr_init(&fhdr, QLS_FILE_MAGIC, QLS_JRNL_VERSION, 0, rid, fro, 0, 0, 0); + /*int err =*/ ::file_hdr_init(&fhdr, 0, rid, fro, 0, _jc->id().length(), _jc->id().c_str()); std::memcpy(_fhdr_ptr_arr[fid], &fhdr, sizeof(fhdr)); #ifdef RHM_CLEAN std::memset((char*)_fhdr_ptr_arr[fid] + sizeof(fhdr), RHM_CLEAN_CHAR, _sblksize - sizeof(fhdr)); @@ -1030,8 +1036,8 @@ wmgr::clean() if (_fhdr_aio_cb_arr) { - for (uint32_t i=0; i<_num_jfiles; i++) - delete _fhdr_aio_cb_arr[i]; +// for (uint32_t i=0; i<_num_jfiles; i++) +// delete _fhdr_aio_cb_arr[i]; std::free(_fhdr_aio_cb_arr); _fhdr_aio_cb_arr = 0; } diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h index 68de2db58f..a4afd8974c 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h @@ -67,9 +67,9 @@ namespace qls_jrnl aio_cb** _fhdr_aio_cb_arr; ///< Array of iocb pointers for file header writes uint32_t _cached_offset_dblks; ///< Amount of unwritten data in page (dblocks) std::deque<data_tok*> _ddtokl; ///< Deferred dequeue data_tok list - uint32_t _jfsize_dblks; ///< Journal file size in dblks (NOT sblks!) - uint32_t _jfsize_pgs; ///< Journal file size in cache pages - uint16_t _num_jfiles; ///< Number of files used in iocb mallocs +// uint32_t _jfsize_dblks; ///< Journal file size in dblks (NOT sblks!) +// uint32_t _jfsize_pgs; ///< Journal file size in cache pages +// uint16_t _num_jfiles; ///< Number of files used in iocb mallocs // TODO: Convert _enq_busy etc into a proper threadsafe lock // TODO: Convert to enum? Are these encodes mutually exclusive? diff --git a/qpid/cpp/src/qpid/linearstore/management-schema.xml b/qpid/cpp/src/qpid/linearstore/management-schema.xml index a9f7b143ad..dce8f7988a 100644 --- a/qpid/cpp/src/qpid/linearstore/management-schema.xml +++ b/qpid/cpp/src/qpid/linearstore/management-schema.xml @@ -22,15 +22,15 @@ <class name="Store"> <property name="brokerRef" type="objId" access="RO" references="qpid.Broker" index="y" parentRef="y"/> <property name="location" type="sstr" access="RO" desc="Logical directory on disk"/> - <property name="defaultInitialFileCount" type="uint16" access="RO" unit="file" desc="Default number of files initially allocated to each journal"/> - <property name="defaultDataFileSize" type="uint32" access="RO" unit="RdPg" desc="Default size of each journal data file"/> + <!--property name="defaultInitialFileCount" type="uint16" access="RO" unit="file" desc="Default number of files initially allocated to each journal"/--> + <!--property name="defaultDataFileSize" type="uint32" access="RO" unit="RdPg" desc="Default size of each journal data file"/--> <property name="tplIsInitialized" type="bool" access="RO" desc="Transaction prepared list has been initialized by a transactional prepare"/> <property name="tplDirectory" type="sstr" access="RO" desc="Transaction prepared list directory"/> <property name="tplWritePageSize" type="uint32" access="RO" unit="byte" desc="Page size in transaction prepared list write-page-cache"/> <property name="tplWritePages" type="uint32" access="RO" unit="wpage" desc="Number of pages in transaction prepared list write-page-cache"/> - <property name="tplInitialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to transaction prepared list journal"/> - <property name="tplDataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file in transaction prepared list journal"/> - <property name="tplCurrentFileCount" type="uint32" access="RO" unit="file" desc="Number of files currently allocated to transaction prepared list journal"/> + <!--property name="tplInitialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to transaction prepared list journal"/--> + <!--property name="tplDataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file in transaction prepared list journal"/--> + <!--property name="tplCurrentFileCount" type="uint32" access="RO" unit="file" desc="Number of files currently allocated to transaction prepared list journal"/--> <statistic name="tplTransactionDepth" type="hilo32" unit="txn" desc="Number of currently enqueued prepared transactions"/> <statistic name="tplTxnPrepares" type="count64" unit="record" desc="Total transaction prepares on transaction prepared list"/> @@ -48,11 +48,11 @@ <property name="writePages" type="uint32" access="RO" unit="wpage" desc="Number of pages in write-page-cache"/> <property name="readPageSize" type="uint32" access="RO" unit="byte" desc="Page size in read-page-cache"/> <property name="readPages" type="uint32" access="RO" unit="rpage" desc="Number of pages in read-page-cache"/> - <property name="initialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to this journal"/> - <property name="autoExpand" type="bool" access="RO" desc="Auto-expand enabled"/> - <property name="currentFileCount" type="uint16" access="RO" unit="file" desc="Number of files currently allocated to this journal"/> - <property name="maxFileCount" type="uint16" access="RO" unit="file" desc="Max number of files allowed for this journal"/> - <property name="dataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file"/> + <!--property name="initialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to this journal"/--> + <!--property name="autoExpand" type="bool" access="RO" desc="Auto-expand enabled"/--> + <!--property name="currentFileCount" type="uint16" access="RO" unit="file" desc="Number of files currently allocated to this journal"/--> + <!--property name="maxFileCount" type="uint16" access="RO" unit="file" desc="Max number of files allowed for this journal"/--> + <!--property name="dataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file"/--> <statistic name="recordDepth" type="hilo32" unit="record" desc="Number of currently enqueued records (durable messages)"/> <statistic name="enqueues" type="count64" unit="record" desc="Total enqueued records on journal"/> @@ -76,13 +76,13 @@ <statistic name="writePageCacheDepth" type="hilo32" unit="wpage" desc="Current depth of write-page-cache"/> <statistic name="readPageCacheDepth" type="hilo32" unit="rpage" desc="Current depth of read-page-cache"/> - <method name="expand" desc="Increase number of files allocated for this journal"> + <!--method name="expand" desc="Increase number of files allocated for this journal"> <arg name="by" type="uint32" dir="I" desc="Number of files to increase journal size by"/> - </method> + </method--> </class> <eventArguments> - <arg name="autoExpand" type="bool" desc="Journal auto-expand enabled"/> + <!--arg name="autoExpand" type="bool" desc="Journal auto-expand enabled"/--> <arg name="fileSize" type="uint32" desc="Journal file size in bytes"/> <arg name="jrnlId" type="sstr" desc="Journal Id"/> <arg name="numEnq" type="uint32" desc="Number of recovered enqueues"/> diff --git a/qpid/tools/src/py/linearstore/efptool.py b/qpid/tools/src/py/linearstore/efptool.py index 380d244c91..702f496d62 100755 --- a/qpid/tools/src/py/linearstore/efptool.py +++ b/qpid/tools/src/py/linearstore/efptool.py @@ -30,6 +30,8 @@ DEFAULT_SBLK_SIZE = 4096 # 32 dblks DEFAULT_SBLK_SIZE_KB = DEFAULT_SBLK_SIZE / 1024 DEFAULT_EFP_DIR_NAME = 'efp' DEFAULT_JRNL_EXTENTION = '.jrnl' +DEFAULT_FHDR_MAGIC = 'QLSf' +DEFAULT_JRNL_VERSION = 2 def get_directory_size(directory_name): ''' Decode the directory name in the format NNNk to a numeric size, where NNN is a number string ''' @@ -52,7 +54,7 @@ class InvalidPartitionDirectoryError(EfpToolError): class Header: ''' Abstract class for encoding the initial part of the header struct which is common across all journal headers ''' - FORMAT = '=4sBBHQ' + FORMAT = '<4s2HQ' HDR_VER = 2 def __init__(self, magic, hdr_ver, flags, rid): self.magic = magic @@ -64,26 +66,27 @@ class Header: return self.magic == r'\0'*4 def encode(self): ''' Encode the members of this struct to a binary string for writing to disk ''' - return pack(Header.FORMAT, self.magic, self.hdr_ver, 0xff, self.flags, self.rid) + return pack(Header.FORMAT, self.magic, self.hdr_ver, self.flags, self.rid) class FileHeader(Header): ''' Class for encoding journal file headers to disk ''' - FORMAT = '=3Q2L2QH' - def __init__(self, magic, hdr_ver, flags, rid, fro, ts_sec, ts_ns, file_cnt, file_size, file_number, file_name): + FORMAT = '<2HL5QH' + def __init__(self, magic, hdr_ver, flags, rid, fhdr_size_sblks, efp_partition, file_size_kib, fro, ts_sec, ts_ns, file_number, queue_name): Header.__init__(self, magic, hdr_ver, flags, rid) + self.fhdr_size_sblks = fhdr_size_sblks + self.efp_partition = efp_partition + self.file_size_kib = file_size_kib self.fro = fro self.ts_sec = ts_sec self.ts_ns = ts_ns - self.file_cnt = file_cnt - self.file_size = file_size self.file_number = file_number - self.file_name = file_name + self.queue_name = queue_name def encode(self): ''' Encode the members of this struct to a binary string for writing to disk ''' - return Header.encode(self) + pack(FileHeader.FORMAT, self.fro, self.ts_sec, self.ts_ns, self.file_cnt, - 0xffffffff, self.file_size, self.file_number, - len(self.file_name)) + self.file_name + return Header.encode(self) + pack(FileHeader.FORMAT, self.fhdr_size_sblks, self.efp_partition.partition_num, + 0xffffffff, self.file_size_kib, self.fro, self.ts_sec, self.ts_ns, + self.file_number, len(self.queue_name)) + self.queue_name class EfpArgParser(argparse.ArgumentParser): @@ -214,7 +217,7 @@ class EmptyFilePool: def create_new_efp_file(self): ''' Create a single new empty journal file of the prescribed size for this EFP ''' file_name = str(uuid4()) + DEFAULT_JRNL_EXTENTION - file_header = FileHeader(r'\0\0\0\0', 0, 0, 0, 0, 0, 0, 0, self.file_size_kb, 0, file_name) + file_header = FileHeader(DEFAULT_FHDR_MAGIC, DEFAULT_JRNL_VERSION, 0, 0, 1, self.partition, self.file_size_kb, 0, 0, 0, 0, '') efh = file_header.encode() efh_bytes = len(efh) f = open(os.path.join(self.directory, file_name), 'wb') @@ -258,11 +261,14 @@ class Partition: return s def validate_efp_directory(self): ''' Check that the partition directory is valid ''' - if os.path.exists(self.efp_directory): - if not os.path.isdir(self.efp_directory): - raise InvalidPartitionDirectoryError + if os.path.isdir(self.root_path): + if os.path.exists(self.efp_directory): + if not os.path.isdir(self.efp_directory): + raise InvalidPartitionDirectoryError + else: + os.mkdir(self.efp_directory) else: - os.mkdir(self.efp_directory) + raise InvalidPartitionDirectoryError # TODO: Add checks for permissions to write and sufficient space def read(self): ''' Read the partition, identifying EFP directories. Read each EFP directory found. ''' |