summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2013-09-20 16:43:10 +0000
committerKim van der Riet <kpvdr@apache.org>2013-09-20 16:43:10 +0000
commitfcdf1723c7b5cdf0772054a93edb6e7d97c4bb1e (patch)
tree14fcd6191b11450f38cf88caa8dd9e44bbc5555b /qpid/cpp/src/qpid
parent7defd7eb7b11a0ff83ee0a7393477a453f4fb604 (diff)
downloadqpid-python-fcdf1723c7b5cdf0772054a93edb6e7d97c4bb1e.tar.gz
QPID-4984: WIP - compiles, but not functional.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1525050 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid')
-rw-r--r--qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp2
-rw-r--r--qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp59
-rw-r--r--qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.h40
-rw-r--r--qpid/cpp/src/qpid/linearstore/JournalImpl.cpp72
-rw-r--r--qpid/cpp/src/qpid/linearstore/JournalImpl.h47
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp690
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h41
-rw-r--r--qpid/cpp/src/qpid/linearstore/QpidLog.h (renamed from qpid/cpp/src/qpid/linearstore/Log.h)0
-rw-r--r--qpid/cpp/src/qpid/linearstore/StorePlugin.cpp2
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp255
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h85
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp175
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h63
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp134
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h72
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h37
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp54
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h46
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.cpp142
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.h68
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp62
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h56
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/enums.h12
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h5
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp53
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h24
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp76
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jdir.h29
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp102
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h7
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp4
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c57
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h46
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp38
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h6
-rw-r--r--qpid/cpp/src/qpid/linearstore/management-schema.xml26
36 files changed, 2139 insertions, 548 deletions
diff --git a/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp b/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp
index 3a11817d1e..204affd1d1 100644
--- a/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp
+++ b/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp
@@ -72,7 +72,7 @@ rmgr::initialize(aio_callback* const cbp)
throw jexception(jerrno::JERR__MALLOC, oss.str(), "rmgr", "initialize");
}
_fhdr_aio_cb_ptr = new aio_cb;
- std::memset(_fhdr_aio_cb_ptr, 0, sizeof(aio_cb*));
+ std::memset(_fhdr_aio_cb_ptr, 0, sizeof(aio_cb));
}
void
diff --git a/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp b/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp
new file mode 100644
index 0000000000..37e3922846
--- /dev/null
+++ b/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "EmptyFilePoolManagerImpl.h"
+
+#include "QpidLog.h"
+#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
+
+namespace qpid {
+namespace linearstore {
+
+EmptyFilePoolManagerImpl::EmptyFilePoolManagerImpl(const std::string& qlsStorePath) :
+ qpid::qls_jrnl::EmptyFilePoolManager(qlsStorePath)
+{}
+
+EmptyFilePoolManagerImpl::~EmptyFilePoolManagerImpl() {}
+
+void EmptyFilePoolManagerImpl::findEfpPartitions() {
+ qpid::qls_jrnl::EmptyFilePoolManager::findEfpPartitions();
+ QLS_LOG(info, "EFP Manager initialization complete");
+ std::vector<qpid::qls_jrnl::EmptyFilePoolPartition*> partitionList;
+ std::vector<qpid::qls_jrnl::EmptyFilePool*> filePoolList;
+ getEfpPartitions(partitionList);
+ if (partitionList.size() == 0) {
+ QLS_LOG(error, "NO EFP PARTITIONS FOUND! No queue creation is possible.")
+ } else {
+ QLS_LOG(info, "> EFP Partitions found: " << partitionList.size());
+ for (std::vector<qpid::qls_jrnl::EmptyFilePoolPartition*>::const_iterator i=partitionList.begin(); i!= partitionList.end(); ++i) {
+ filePoolList.clear();
+ (*i)->getEmptyFilePools(filePoolList);
+ QLS_LOG(info, " * Partition " << (*i)->partitionNumber() << " containing " << filePoolList.size() << " pool" <<
+ (filePoolList.size()>1 ? "s" : "") << " at \'" << (*i)->partitionDirectory() << "\'");
+ for (std::vector<qpid::qls_jrnl::EmptyFilePool*>::const_iterator j=filePoolList.begin(); j!=filePoolList.end(); ++j) {
+ QLS_LOG(info, " - EFP \'" << (*j)->fileSizeKib() << "k\' containing " << (*j)->numEmptyFiles() <<
+ " files of size " << (*j)->fileSizeKib() << " KiB totaling " << (*j)->cumFileSizeKib() << " KiB");
+ }
+ }
+ }
+}
+
+}} /* namespace qpid::linearstore */
diff --git a/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.h b/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.h
new file mode 100644
index 0000000000..f9087b2fbc
--- /dev/null
+++ b/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.h
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LINEARSTORE_EMPTYFILEPOOLMANAGERIMPL_H_
+#define QPID_LINEARSTORE_EMPTYFILEPOOLMANAGERIMPL_H_
+
+#include "qpid/linearstore/jrnl/EmptyFilePoolManager.h"
+
+namespace qpid {
+namespace linearstore {
+
+class EmptyFilePoolManagerImpl: public qpid::qls_jrnl::EmptyFilePoolManager
+{
+public:
+ EmptyFilePoolManagerImpl(const std::string& qlsStorePath);
+ virtual ~EmptyFilePoolManagerImpl();
+ void findEfpPartitions();
+};
+
+}} /* namespace qpid::linearstore */
+
+#endif /* QPID_LINEARSTORE_EMPTYFILEPOOLMANAGERIMPL_H_ */
diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
index f8b7777269..3fdfb24592 100644
--- a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
@@ -23,16 +23,17 @@
#include "qpid/linearstore/jrnl/jerrno.h"
#include "qpid/linearstore/jrnl/jexception.h"
+#include "qpid/linearstore/jrnl/EmptyFilePool.h"
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"
-#include "qmf/org/apache/qpid/linearstore/ArgsJournalExpand.h"
+//#include "qmf/org/apache/qpid/linearstore/ArgsJournalExpand.h"
#include "qmf/org/apache/qpid/linearstore/EventCreated.h"
#include "qmf/org/apache/qpid/linearstore/EventEnqThresholdExceeded.h"
#include "qmf/org/apache/qpid/linearstore/EventFull.h"
#include "qmf/org/apache/qpid/linearstore/EventRecovered.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Timer.h"
-#include "qpid/linearstore/Log.h"
+#include "qpid/linearstore/QpidLog.h"
#include "qpid/linearstore/StoreException.h"
using namespace qpid::qls_jrnl;
@@ -53,22 +54,23 @@ void GetEventsFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); i
JournalImpl::JournalImpl(qpid::sys::Timer& timer_,
const std::string& journalId,
const std::string& journalDirectory,
- const std::string& journalBaseFilename,
+// const std::string& journalBaseFilename,
const qpid::sys::Duration getEventsTimeout,
const qpid::sys::Duration flushTimeout,
qpid::management::ManagementAgent* a,
DeleteCallback onDelete):
- jcntl(journalId, journalDirectory, journalBaseFilename),
+ jcntl(journalId, journalDirectory/*, journalBaseFilename*/),
timer(timer_),
getEventsTimerSetFlag(false),
- lastReadRid(0),
+ efpp(0),
+// lastReadRid(0),
writeActivityFlag(false),
flushTriggeredFlag(true),
- _xidp(0),
- _datap(0),
- _dlen(0),
- _dtok(),
- _external(false),
+// _xidp(0),
+// _datap(0),
+// _dlen(0),
+// _dtok(),
+// _external(false),
deleteCallback(onDelete)
{
getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
@@ -82,7 +84,7 @@ JournalImpl::JournalImpl(qpid::sys::Timer& timer_,
QLS_LOG2(notice, _jid, "Created");
std::ostringstream oss;
- oss << "Journal directory = \"" << journalDirectory << "\"; Base file name = \"" << journalBaseFilename << "\"";
+ oss << "Journal directory = \"" << journalDirectory << "\"";
QLS_LOG2(debug, _jid, oss.str());
}
@@ -95,7 +97,7 @@ JournalImpl::~JournalImpl()
}
getEventsFireEventsPtr->cancel();
inactivityFireEventPtr->cancel();
- free_read_buffers();
+// free_read_buffers();
if (_mgmtObject.get() != 0) {
_mgmtObject->resourceDestroy();
@@ -116,14 +118,14 @@ JournalImpl::initManagement(qpid::management::ManagementAgent* a)
_mgmtObject->set_name(_jid);
_mgmtObject->set_directory(_jdir.dirname());
- _mgmtObject->set_baseFileName(_base_filename);
+// _mgmtObject->set_baseFileName(_base_filename);
_mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
_mgmtObject->set_readPages(JRNL_RMGR_PAGES);
// The following will be set on initialize(), but being properties, these must be set to 0 in the meantime
- _mgmtObject->set_initialFileCount(0);
- _mgmtObject->set_dataFileSize(0);
- _mgmtObject->set_currentFileCount(0);
+ //_mgmtObject->set_initialFileCount(0);
+ //_mgmtObject->set_dataFileSize(0);
+ //_mgmtObject->set_currentFileCount(0);
_mgmtObject->set_writePageSize(0);
_mgmtObject->set_writePages(0);
@@ -133,22 +135,23 @@ JournalImpl::initManagement(qpid::management::ManagementAgent* a)
void
-JournalImpl::initialize(/*const uint16_t num_jfiles,
- const bool auto_expand,
- const uint16_t ae_max_jfiles,
- const uint32_t jfsize_sblks,*/
+JournalImpl::initialize(qpid::qls_jrnl::EmptyFilePool* efpp_,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
qpid::qls_jrnl::aio_callback* const cbp)
{
- std::ostringstream oss;
-// oss << "Initialize; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
- oss << "Initialize;";
- oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
- oss << " wcache_num_pages=" << wcache_num_pages;
- QLS_LOG2(debug, _jid, oss.str());
- jcntl::initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks, cbp);
- QLS_LOG2(debug, _jid, "Initialization complete");
+ efpp = efpp_;
+// efpp->createJournal(_jdir);
+// QLS_LOG2(notice, _jid, "Initialized");
+// std::ostringstream oss;
+//// oss << "Initialize; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
+// oss << "Initialize; efpPartitionNumber=" << efpp_->getPartitionNumber();
+// oss << " efpFileSizeKb=" << efpp_->fileSizeKib();
+// oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
+// oss << " wcache_num_pages=" << wcache_num_pages;
+// QLS_LOG2(debug, _jid, oss.str());
+ jcntl::initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ efpp, wcache_num_pages, wcache_pgsize_sblks, cbp);
+// QLS_LOG2(debug, _jid, "Initialization complete");
// TODO: replace for linearstore: _lpmgr
/*
if (_mgmtObject.get() != 0)
@@ -261,6 +264,7 @@ JournalImpl::recover_complete()
//#define AIO_SLEEP_TIME_US 10 // 0.01 ms
// Return true if content is recovered from store; false if content is external and must be recovered from an external store.
// Throw exception for all errors.
+/*
bool
JournalImpl::loadMsgContent(uint64_t rid, std::string& data, size_t length, size_t offset)
{
@@ -351,6 +355,7 @@ JournalImpl::loadMsgContent(uint64_t rid, std::string& data, size_t length, size
}
return true;
}
+*/
void
JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
@@ -574,6 +579,7 @@ void
JournalImpl::rd_aio_cb(std::vector<uint16_t>& /*pil*/)
{}
+/*
void
JournalImpl::free_read_buffers()
{
@@ -586,6 +592,12 @@ JournalImpl::free_read_buffers()
_datap = 0;
}
}
+*/
+
+void
+JournalImpl::createStore() {
+
+}
void
JournalImpl::handleIoResult(const iores r)
@@ -624,12 +636,13 @@ JournalImpl::handleIoResult(const iores r)
}
}
-qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t methodId,
+qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t /*methodId*/,
qpid::management::Args& /*args*/,
std::string& /*text*/)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+/*
switch (methodId)
{
case _qmf::Journal::METHOD_EXPAND :
@@ -640,6 +653,7 @@ qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t m
status = Manageable::STATUS_NOT_IMPLEMENTED;
break;
}
+*/
return status;
}
diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.h b/qpid/cpp/src/qpid/linearstore/JournalImpl.h
index 692bccc9b0..264162e5bb 100644
--- a/qpid/cpp/src/qpid/linearstore/JournalImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.h
@@ -19,11 +19,12 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JOURNALIMPL_H
-#define QPID_LEGACYSTORE_JOURNALIMPL_H
+#ifndef QPID_LINEARSTORE_JOURNALIMPL_H
+#define QPID_LINEARSTORE_JOURNALIMPL_H
#include <set>
#include "qpid/linearstore/jrnl/enums.h"
+#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
#include "qpid/linearstore/jrnl/jcntl.h"
#include "qpid/linearstore/DataTokenImpl.h"
#include "qpid/linearstore/PreparedTransaction.h"
@@ -35,12 +36,15 @@
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/linearstore/Journal.h"
-namespace qpid {
+namespace qpid{
+
namespace sys {
class Timer;
-}}
+}
+namespace qls_jrnl {
+class EmptyFilePool;
+}
-namespace qpid{
namespace linearstore{
class JournalImpl;
@@ -83,20 +87,21 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr
boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
qpid::sys::Mutex _getf_lock;
qpid::sys::Mutex _read_lock;
+ qpid::qls_jrnl::EmptyFilePool* efpp;
- uint64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests
- std::vector<uint64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence
+// uint64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests
+// std::vector<uint64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence
bool writeActivityFlag;
bool flushTriggeredFlag;
boost::intrusive_ptr<qpid::sys::TimerTask> inactivityFireEventPtr;
// temp local vars for loadMsgContent below
- void* _xidp;
- void* _datap;
- size_t _dlen;
- qpid::qls_jrnl::data_tok _dtok;
- bool _external;
+// void* _xidp;
+// void* _datap;
+// size_t _dlen;
+// qpid::qls_jrnl::data_tok _dtok;
+// bool _external;
qpid::management::ManagementAgent* _agent;
qmf::org::apache::qpid::linearstore::Journal::shared_ptr _mgmtObject;
@@ -107,7 +112,7 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr
JournalImpl(qpid::sys::Timer& timer,
const std::string& journalId,
const std::string& journalDirectory,
- const std::string& journalBaseFilename,
+// const std::string& journalBaseFilename,
const qpid::sys::Duration getEventsTimeout,
const qpid::sys::Duration flushTimeout,
qpid::management::ManagementAgent* agent,
@@ -121,6 +126,7 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr
const bool auto_expand,
const uint16_t ae_max_jfiles,
const uint32_t jfsize_sblks,*/
+ qpid::qls_jrnl::EmptyFilePool* efp,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
qpid::qls_jrnl::aio_callback* const cbp);
@@ -129,10 +135,10 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr
const bool auto_expand,
const uint16_t ae_max_jfiles,
const uint32_t jfsize_sblks,*/
+ qpid::qls_jrnl::EmptyFilePool* efp,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks) {
- initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks,
- this);
+ initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ efp, wcache_num_pages, wcache_pgsize_sblks, this);
}
void recover(/*const uint16_t num_jfiles,
@@ -164,7 +170,7 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr
// Temporary fn to read and save last msg read from journal so it can be assigned
// in chunks. To be replaced when coding to do this direct from the journal is ready.
// Returns true if the record is extern, false if local.
- bool loadMsgContent(uint64_t rid, std::string& data, size_t length, size_t offset = 0);
+// bool loadMsgContent(uint64_t rid, std::string& data, size_t length, size_t offset = 0);
// Overrides for write inactivity timer
void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
@@ -216,7 +222,8 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr
void resetDeleteCallback() { deleteCallback = DeleteCallback(); }
private:
- void free_read_buffers();
+// void free_read_buffers();
+ void createStore();
inline void setGetEventTimer()
{
@@ -242,11 +249,11 @@ class TplJournalImpl : public JournalImpl
TplJournalImpl(qpid::sys::Timer& timer,
const std::string& journalId,
const std::string& journalDirectory,
- const std::string& journalBaseFilename,
+// const std::string& journalBaseFilename,
const qpid::sys::Duration getEventsTimeout,
const qpid::sys::Duration flushTimeout,
qpid::management::ManagementAgent* agent) :
- JournalImpl(timer, journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent)
+ JournalImpl(timer, journalId, journalDirectory/*, journalBaseFilename*/, getEventsTimeout, flushTimeout, agent)
{}
virtual ~TplJournalImpl() {}
@@ -263,4 +270,4 @@ class TplJournalImpl : public JournalImpl
} // namespace msgstore
} // namespace mrg
-#endif // ifndef QPID_LEGACYSTORE_JOURNALIMPL_H
+#endif // ifndef QPID_LINEARSTORE_JOURNALIMPL_H
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
index f4b9a40455..5372963bc2 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
@@ -21,17 +21,19 @@
#include "qpid/linearstore/MessageStoreImpl.h"
+#include "db-inc.h"
#include "qpid/broker/QueueSettings.h"
#include "qpid/linearstore/BindingDbt.h"
#include "qpid/linearstore/BufferValue.h"
#include "qpid/linearstore/IdDbt.h"
+#include "qpid/linearstore/jrnl/EmptyFilePoolManager.h"
#include "qpid/linearstore/jrnl/txn_map.h"
-#include "qpid/linearstore/Log.h"
+#include "qpid/linearstore/QpidLog.h"
#include "qpid/framing/FieldValue.h"
#include "qmf/org/apache/qpid/linearstore/Package.h"
#include "qpid/linearstore/StoreException.h"
#include <dirent.h>
-#include <db.h>
+
#define MAX_AIO_SLEEPS 100000 // tot: ~1 sec
#define AIO_SLEEP_TIME_US 10 // 0.01 ms
@@ -43,53 +45,46 @@ namespace linearstore{
const std::string MessageStoreImpl::storeTopLevelDir("qls"); // Sets the top-level store dir name
+
// FIXME aconway 2010-03-09: was 10
qpid::sys::Duration MessageStoreImpl::defJournalGetEventsTimeout(1 * qpid::sys::TIME_MSEC); // 10ms
qpid::sys::Duration MessageStoreImpl::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
qpid::sys::Mutex TxnCtxt::globalSerialiser;
-MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const uint64_t _rid,
- const bool _deq_flag,
- const bool _commit_flag,
- const bool _tpc_flag) :
- rid(_rid),
- deq_flag(_deq_flag),
- commit_flag(_commit_flag),
- tpc_flag(_tpc_flag)
+MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const uint64_t rid_,
+ const bool deq_flag_,
+ const bool commit_flag_,
+ const bool tpc_flag_) :
+ rid(rid_),
+ deq_flag(deq_flag_),
+ commit_flag(commit_flag_),
+ tpc_flag(tpc_flag_)
{}
-MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath) :
- numJrnlFiles(0),
- autoJrnlExpand(false),
- autoJrnlExpandMaxFiles(0),
- jrnlFsizeSblks(0),
+MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath_) :
+ defaultEfpPartitionNumber(0),
+ defaultEfpFileSizeKib(0),
truncateFlag(false),
wCachePgSizeSblks(0),
wCacheNumPages(0),
- tplNumJrnlFiles(0),
- tplJrnlFsizeSblks(0),
tplWCachePgSizeSblks(0),
tplWCacheNumPages(0),
highestRid(0),
isInit(false),
- envPath(envpath),
+ envPath(envpath_),
broker(broker_),
mgmtObject(),
agent(0)
{}
-uint32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const uint32_t param, const std::string paramName/*, const uint16_t jrnlFsizePgs*/)
+uint32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const uint32_t param_, const std::string& paramName_)
{
- uint32_t p = param;
+ uint32_t p = param_;
-/* if (jrnlFsizePgs == 1 && p > 64 ) {
- p = 64;
- QLS_LOG(warning, "parameter " << paramName << " (" << param << ") cannot set a page size greater than the journal file size; changing this parameter to the journal file size (" << p << ")");
- }
- else*/ if (p == 0) {
+ if (p == 0) {
// For zero value, use default
p = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_SBLK_SIZE / 1024;
- QLS_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")");
+ QLS_LOG(warning, "parameter " << paramName_ << " (" << param_ << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")");
} else if ( p > 128 || (p & (p-1)) ) {
// For any positive value that is not a power of 2, use closest value
if (p < 6) p = 4;
@@ -98,16 +93,16 @@ uint32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const uint32_t param, const st
else if (p < 48) p = 32;
else if (p < 96) p = 64;
else p = 128;
- QLS_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << p << ")");
+ QLS_LOG(warning, "parameter " << paramName_ << " (" << param_ << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << p << ")");
}
return p;
}
-uint16_t MessageStoreImpl::getJrnlWrNumPages(const uint32_t wrPageSizeKib)
+uint16_t MessageStoreImpl::getJrnlWrNumPages(const uint32_t wrPageSizeKib_)
{
- uint32_t wrPageSizeSblks = wrPageSizeKib * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks
+ uint32_t wrPageSizeSblks = wrPageSizeKib_ * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks
uint32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
- switch (wrPageSizeKib)
+ switch (wrPageSizeKib_)
{
case 1:
case 2:
@@ -124,6 +119,29 @@ uint16_t MessageStoreImpl::getJrnlWrNumPages(const uint32_t wrPageSizeKib)
}
}
+qpid::qls_jrnl::efpPartitionNumber_t MessageStoreImpl::chkEfpPartition(const qpid::qls_jrnl::efpPartitionNumber_t partition_,
+ const std::string& /*paramName_*/) {
+ // TODO: check against list of existing partitions, throw if not found
+ return partition_;
+}
+
+qpid::qls_jrnl::efpFileSizeKib_t MessageStoreImpl::chkEfpFileSizeKiB(const qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib_,
+ const std::string& paramName_) {
+ uint8_t rem = efpFileSizeKib_ % uint64_t(JRNL_SBLK_SIZE_KIB);
+ if (rem != 0) {
+ uint64_t newVal = efpFileSizeKib_ - rem;
+ if (rem >= (JRNL_SBLK_SIZE_KIB / 2))
+ newVal += JRNL_SBLK_SIZE_KIB;
+ QLS_LOG(warning, "Parameter " << paramName_ << " (" << efpFileSizeKib_ << ") must be a multiple of " <<
+ JRNL_SBLK_SIZE_KIB << "; changing this parameter to the closest allowable value (" <<
+ newVal << ")");
+ return newVal;
+ }
+ return efpFileSizeKib_;
+
+ // TODO: check against list of existing pools in the given partition
+}
+
void MessageStoreImpl::initManagement ()
{
if (broker != 0) {
@@ -134,15 +152,10 @@ void MessageStoreImpl::initManagement ()
new _qmf::Store(agent, this, broker));
mgmtObject->set_location(storeDir);
- mgmtObject->set_defaultInitialFileCount(numJrnlFiles);
- mgmtObject->set_defaultDataFileSize(jrnlFsizeSblks / JRNL_RMGR_PAGE_SIZE);
mgmtObject->set_tplIsInitialized(false);
mgmtObject->set_tplDirectory(getTplBaseDir());
mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * JRNL_SBLK_SIZE);
mgmtObject->set_tplWritePages(tplWCacheNumPages);
- mgmtObject->set_tplInitialFileCount(tplNumJrnlFiles);
- mgmtObject->set_tplDataFileSize(tplJrnlFsizeSblks * JRNL_SBLK_SIZE);
- mgmtObject->set_tplCurrentFileCount(tplNumJrnlFiles);
agent->addObject(mgmtObject, 0, true);
@@ -154,61 +167,58 @@ void MessageStoreImpl::initManagement ()
}
}
-bool MessageStoreImpl::init(const qpid::Options* options)
+bool MessageStoreImpl::init(const qpid::Options* options_)
{
// Extract and check options
- const StoreOptions* opts = static_cast<const StoreOptions*>(options);
+ const StoreOptions* opts = static_cast<const StoreOptions*>(options_);
+ qpid::qls_jrnl::efpPartitionNumber_t efpPartition = chkEfpPartition(opts->efpPartition, "efp-partition");
+ qpid::qls_jrnl::efpFileSizeKib_t efpFilePoolSize = chkEfpFileSizeKiB(opts->efpFileSizeKib, "efp-file-size");
uint32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size");
uint32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size");
// Pass option values to init()
- return init(opts->storeDir, opts->truncateFlag, jrnlWrCachePageSizeKib, tplJrnlWrCachePageSizeKib);
+ return init(opts->storeDir, efpPartition, efpFilePoolSize, opts->truncateFlag, jrnlWrCachePageSizeKib, tplJrnlWrCachePageSizeKib);
}
// These params, taken from options, are assumed to be correct and verified
-bool MessageStoreImpl::init(const std::string& dir,
+bool MessageStoreImpl::init(const std::string& storeDir_,
/*uint16_t jfiles,
uint32_t jfileSizePgs,*/
- const bool truncateFlag,
- uint32_t wCachePageSizeKib,
+ qpid::qls_jrnl::efpPartitionNumber_t efpPartition_,
+ qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib_,
+ const bool truncateFlag_,
+ uint32_t wCachePageSizeKib_,
/*uint16_t tplJfiles,
uint32_t tplJfileSizePgs,*/
- uint32_t tplWCachePageSizeKib/*,
- bool autoJExpand,
- uint16_t autoJExpandMaxFiles*/)
+ uint32_t tplWCachePageSizeKib_)
+ /*bool autoJExpand,
+ uint16_t autoJExpandMaxFiles)*/
{
if (isInit) return true;
// Set geometry members (converting to correct units where req'd)
-// numJrnlFiles = jfiles;
-// jrnlFsizeSblks = jfileSizePgs * JRNL_RMGR_PAGE_SIZE;
- wCachePgSizeSblks = wCachePageSizeKib * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks
- wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib);
-// tplNumJrnlFiles = tplJfiles;
-// tplJrnlFsizeSblks = tplJfileSizePgs * JRNL_RMGR_PAGE_SIZE;
- tplWCachePgSizeSblks = tplWCachePageSizeKib * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks
- tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib);
-// autoJrnlExpand = autoJExpand;
-// autoJrnlExpandMaxFiles = autoJExpandMaxFiles;
- if (dir.size()>0) storeDir = dir;
-
- if (truncateFlag)
- truncateInit(false);
+ defaultEfpPartitionNumber = efpPartition_;
+ defaultEfpFileSizeKib = efpFileSizeKib_;
+ wCachePgSizeSblks = wCachePageSizeKib_ * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks
+ wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib_);
+ tplWCachePgSizeSblks = tplWCachePageSizeKib_ * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks
+ tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib_);
+ if (storeDir_.size()>0) storeDir = storeDir_;
+
+ if (truncateFlag_)
+ truncateInit();
else
init();
- QLS_LOG(notice, "Store module initialized; store-dir=" << dir);
-// QLS_LOG(info, "> Default files per journal: " << jfiles);
-// TODO: Uncomment these lines when auto-expand is enabled.
-// QLS_LOG(info, "> Auto-expand " << (autoJrnlExpand ? "enabled" : "disabled"));
-// if (autoJrnlExpand) QLS_LOG(info, "> Max auto-expand journal files: " << autoJrnlExpandMaxFiles);
-// QLS_LOG(info, "> Default journal file size: " << jfileSizePgs << " (wpgs)");
- QLS_LOG(info, "> Default write cache page size: " << wCachePageSizeKib << " (KiB)");
+ QLS_LOG(notice, "Store module initialized; store-dir=" << storeDir_);
+ QLS_LOG(info, "> Default EFP partition: " << defaultEfpPartitionNumber);
+ QLS_LOG(info, "> Default EFP file size: " << defaultEfpFileSizeKib << " (KiB)");
+ QLS_LOG(info, "> Default write cache page size: " << wCachePageSizeKib_ << " (KiB)");
QLS_LOG(info, "> Default number of write cache pages: " << wCacheNumPages);
-// QLS_LOG(info, "> TPL files per journal: " << tplNumJrnlFiles);
-// QLS_LOG(info, "> TPL journal file size: " << tplJfileSizePgs << " (wpgs)");
- QLS_LOG(info, "> TPL write cache page size: " << tplWCachePageSizeKib << " (KiB)");
+ QLS_LOG(info, "> TPL write cache page size: " << tplWCachePageSizeKib_ << " (KiB)");
QLS_LOG(info, "> TPL number of write cache pages: " << tplWCacheNumPages);
+ QLS_LOG(info, "> EFP partition: " << defaultEfpPartitionNumber);
+ QLS_LOG(info, "> EFP file size pool: " << defaultEfpFileSizeKib << " (KiB)");
return isInit;
}
@@ -263,7 +273,7 @@ void MessageStoreImpl::init()
// NOTE: during normal initialization, agent == 0 because the store is initialized before the management infrastructure.
// However during a truncated initialization in a cluster, agent != 0. We always pass 0 as the agent for the
// TplStore to keep things consistent in a cluster. See https://bugzilla.redhat.com/show_bug.cgi?id=681026
- tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, 0));
+ tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), /*"tpl",*/ defJournalGetEventsTimeout, defJournalFlushTimeout, 0));
isInit = true;
} catch (const DbException& e) {
if (e.get_errno() == DB_VERSION_MISMATCH)
@@ -286,6 +296,9 @@ void MessageStoreImpl::init()
throw;
}
} while (!isInit);
+
+ efpMgr.reset(new EmptyFilePoolManagerImpl(getStoreTopLevelDir()));
+ efpMgr->findEfpPartitions();
}
void MessageStoreImpl::finalize()
@@ -307,7 +320,7 @@ void MessageStoreImpl::finalize()
}
}
-void MessageStoreImpl::truncateInit(const bool saveStoreContent)
+void MessageStoreImpl::truncateInit()
{
if (isInit) {
{
@@ -324,15 +337,10 @@ void MessageStoreImpl::truncateInit(const bool saveStoreContent)
dbenv->close(0);
isInit = false;
}
- std::ostringstream oss;
- oss << storeDir << "/" << storeTopLevelDir;
- if (saveStoreContent) {
- std::string dir = qpid::qls_jrnl::jdir::push_down(storeDir, storeTopLevelDir, "cluster");
- QLS_LOG(notice, "Store directory " << oss.str() << " was pushed down (saved) into directory " << dir << ".");
- } else {
- qpid::qls_jrnl::jdir::delete_dir(oss.str().c_str());
- QLS_LOG(notice, "Store directory " << oss.str() << " was truncated.");
- }
+ qpid::qls_jrnl::jdir::delete_dir(getBdbBaseDir());
+ qpid::qls_jrnl::jdir::delete_dir(getJrnlBaseDir());
+ qpid::qls_jrnl::jdir::delete_dir(getTplBaseDir());
+ QLS_LOG(notice, "Store directory " << getStoreTopLevelDir() << " was truncated.");
init();
}
@@ -342,18 +350,18 @@ void MessageStoreImpl::chkTplStoreInit()
qpid::sys::Mutex::ScopedLock sl(tplInitLock);
if (!tplStorePtr->is_ready()) {
qpid::qls_jrnl::jdir::create_dir(getTplBaseDir());
- tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ tplWCacheNumPages, tplWCachePgSizeSblks);
+ tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSizeKib), tplWCacheNumPages, tplWCachePgSizeSblks);
if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true);
}
}
-void MessageStoreImpl::open(db_ptr db,
- DbTxn* txn,
- const char* file,
- bool dupKey)
+void MessageStoreImpl::open(db_ptr db_,
+ DbTxn* txn_,
+ const char* file_,
+ bool dupKey_)
{
- if(dupKey) db->set_flags(DB_DUPSORT);
- db->open(txn, file, 0, DB_BTREE, DB_CREATE | DB_THREAD, 0);
+ if(dupKey_) db_->set_flags(DB_DUPSORT);
+ db_->open(txn_, file_, 0, DB_BTREE, DB_CREATE | DB_THREAD, 0);
}
void MessageStoreImpl::closeDbs()
@@ -385,15 +393,15 @@ MessageStoreImpl::~MessageStoreImpl()
}
}
-void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue,
- const qpid::framing::FieldTable& /*args*/)
+void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_,
+ const qpid::framing::FieldTable& args_)
{
+ QLS_LOG(info, "*** MessageStoreImpl::create() queue=\"" << queue_.getName() << "\""); // DEBUG
checkInit();
- if (queue.getPersistenceId()) {
- THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
+ if (queue_.getPersistenceId()) {
+ THROW_STORE_EXCEPTION("Queue already created: " + queue_.getName());
}
JournalImpl* jQueue = 0;
- qpid::framing::FieldTable::ValuePtr value;
// uint16_t localFileCount = numJrnlFiles;
// bool localAutoExpandFlag = autoJrnlExpand;
@@ -408,18 +416,18 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue,
// if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
// localFileSizeSblks = chkJrnlFileSizeParam((uint32_t) value->get<int>(), "qpid.file_size", wCachePgSizeSblks) * JRNL_RMGR_PAGE_SIZE;
- if (queue.getName().size() == 0)
+ if (queue_.getName().size() == 0)
{
QLS_LOG(error, "Cannot create store for empty (null) queue name - ignoring and attempting to continue.");
return;
}
- jQueue = new JournalImpl(broker->getTimer(), queue.getName(), getJrnlDir(queue), std::string("JournalData"),
+ jQueue = new JournalImpl(broker->getTimer(), queue_.getName(), getJrnlDir(queue_), /*std::string("JournalData"),*/
defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
{
qpid::sys::Mutex::ScopedLock sl(journalListLock);
- journalList[queue.getName()]=jQueue;
+ journalList[queue_.getName()]=jQueue;
}
// value = args.get("qpid.auto_expand");
@@ -429,25 +437,71 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue,
// value = args.get("qpid.auto_expand_max_jfiles");
// if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
// localAutoExpandMaxFileCount = (uint16_t) value->get<int>();
+/*
+ qpid::framing::FieldTable::ValuePtr value;
+ qpid::qls_jrnl::efpPartitionNumber_t localEfpPartition = efpPartition;
+ value = args_.get("qpid.efp_partition");
+ if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
+ localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition");
+ }
+
+ qpid::qls_jrnl::efpFileSizeKib_t localEfpFileSizeKib = efpFileSizeKib;
+ value = args_.get("qpid.efp_file_size");
+ if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
+ localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(),"qpid.efp_file_size" );
+ }
+*/
- queue.setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue));
+ queue_.setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue));
try {
// init will create the deque's for the init...
- jQueue->initialize(/*localFileCount, localAutoExpandFlag, localAutoExpandMaxFileCount, localFileSizeSblks,*/ wCacheNumPages, wCachePgSizeSblks);
+// jQueue->initialize(localFileCount, localAutoExpandFlag, localAutoExpandMaxFileCount, localFileSizeSblks, wCacheNumPages, wCachePgSizeSblks);
+ jQueue->initialize(getEmptyFilePool(args_), wCacheNumPages, wCachePgSizeSblks);
} catch (const qpid::qls_jrnl::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": create() failed: " + e.what());
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue_.getName() + ": create() failed: " + e.what());
}
try {
- if (!create(queueDb, queueIdSequence, queue)) {
- THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName());
+ if (!create(queueDb, queueIdSequence, queue_)) {
+ THROW_STORE_EXCEPTION("Queue already exists: " + queue_.getName());
}
} catch (const DbException& e) {
- THROW_STORE_EXCEPTION_2("Error creating queue named " + queue.getName(), e);
+ THROW_STORE_EXCEPTION_2("Error creating queue named " + queue_.getName(), e);
+ }
+}
+
+qpid::qls_jrnl::EmptyFilePool*
+MessageStoreImpl::getEmptyFilePool(const qpid::qls_jrnl::efpPartitionNumber_t efpPartitionNumber_,
+ const qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib_) {
+ qpid::qls_jrnl::EmptyFilePool* efpp = efpMgr->getEmptyFilePool(efpPartitionNumber_, efpFileSizeKib_);
+ if (efpp == 0) {
+ std::ostringstream oss;
+ oss << "Partition=" << efpPartitionNumber_ << "; EfpFileSize=" << efpFileSizeKib_ << " KiB";
+ throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR_EFP_NOEFP, oss.str(), "MessageStoreImpl", "getEmptyFilePool");
}
+ return efpp;
}
-void MessageStoreImpl::destroy(qpid::broker::PersistableQueue& queue)
+qpid::qls_jrnl::EmptyFilePool*
+MessageStoreImpl::getEmptyFilePool(const qpid::framing::FieldTable& args_) {
+ qpid::framing::FieldTable::ValuePtr value;
+ qpid::qls_jrnl::efpPartitionNumber_t localEfpPartition = defaultEfpPartitionNumber;
+ value = args_.get("qpid.efp_partition");
+ if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
+ localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition");
+ }
+
+ qpid::qls_jrnl::efpFileSizeKib_t localEfpFileSizeKib = defaultEfpFileSizeKib;
+ value = args_.get("qpid.efp_file_size");
+ if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
+ localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(),"qpid.efp_file_size" );
+ }
+ return getEmptyFilePool(localEfpPartition, localEfpFileSizeKib);
+}
+
+void MessageStoreImpl::destroy(qpid::broker::PersistableQueue& queue_)
{
+ QLS_LOG(info, "*** MessageStoreImpl::destroy() queue=\"" << queue_.getName() << "\"");
+/*
checkInit();
destroy(queueDb, queue);
deleteBindingsForQueue(queue);
@@ -461,21 +515,22 @@ void MessageStoreImpl::destroy(qpid::broker::PersistableQueue& queue)
journalList.erase(queue.getName());
}
}
+*/
}
-void MessageStoreImpl::create(const qpid::broker::PersistableExchange& exchange,
- const qpid::framing::FieldTable& /*args*/)
+void MessageStoreImpl::create(const qpid::broker::PersistableExchange& exchange_,
+ const qpid::framing::FieldTable& /*args_*/)
{
checkInit();
- if (exchange.getPersistenceId()) {
- THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName());
+ if (exchange_.getPersistenceId()) {
+ THROW_STORE_EXCEPTION("Exchange already created: " + exchange_.getName());
}
try {
- if (!create(exchangeDb, exchangeIdSequence, exchange)) {
- THROW_STORE_EXCEPTION("Exchange already exists: " + exchange.getName());
+ if (!create(exchangeDb, exchangeIdSequence, exchange_)) {
+ THROW_STORE_EXCEPTION("Exchange already exists: " + exchange_.getName());
}
} catch (const DbException& e) {
- THROW_STORE_EXCEPTION_2("Error creating exchange named " + exchange.getName(), e);
+ THROW_STORE_EXCEPTION_2("Error creating exchange named " + exchange_.getName(), e);
}
}
@@ -488,14 +543,14 @@ void MessageStoreImpl::destroy(const qpid::broker::PersistableExchange& exchange
bindingDb->del(0, &key, DB_AUTO_COMMIT);
}
-void MessageStoreImpl::create(const qpid::broker::PersistableConfig& general)
+void MessageStoreImpl::create(const qpid::broker::PersistableConfig& general_)
{
checkInit();
- if (general.getPersistenceId()) {
+ if (general_.getPersistenceId()) {
THROW_STORE_EXCEPTION("General configuration item already created");
}
try {
- if (!create(generalDb, generalIdSequence, general)) {
+ if (!create(generalDb, generalIdSequence, general_)) {
THROW_STORE_EXCEPTION("General configuration already exists");
}
} catch (const DbException& e) {
@@ -503,25 +558,25 @@ void MessageStoreImpl::create(const qpid::broker::PersistableConfig& general)
}
}
-void MessageStoreImpl::destroy(const qpid::broker::PersistableConfig& general)
+void MessageStoreImpl::destroy(const qpid::broker::PersistableConfig& general_)
{
checkInit();
- destroy(generalDb, general);
+ destroy(generalDb, general_);
}
-bool MessageStoreImpl::create(db_ptr db,
- IdSequence& seq,
- const qpid::broker::Persistable& p)
+bool MessageStoreImpl::create(db_ptr db_,
+ IdSequence& seq_,
+ const qpid::broker::Persistable& p_)
{
- uint64_t id (seq.next());
+ uint64_t id (seq_.next());
Dbt key(&id, sizeof(id));
- BufferValue value (p);
+ BufferValue value (p_);
int status;
TxnCtxt txn;
txn.begin(dbenv.get(), true);
try {
- status = db->put(txn.get(), &key, &value, DB_NOOVERWRITE);
+ status = db_->put(txn.get(), &key, &value, DB_NOOVERWRITE);
txn.commit();
} catch (...) {
txn.abort();
@@ -530,27 +585,27 @@ bool MessageStoreImpl::create(db_ptr db,
if (status == DB_KEYEXIST) {
return false;
} else {
- p.setPersistenceId(id);
+ p_.setPersistenceId(id);
return true;
}
}
-void MessageStoreImpl::destroy(db_ptr db, const qpid::broker::Persistable& p)
+void MessageStoreImpl::destroy(db_ptr db, const qpid::broker::Persistable& p_)
{
qpid::sys::Mutex::ScopedLock sl(bdbLock);
- IdDbt key(p.getPersistenceId());
+ IdDbt key(p_.getPersistenceId());
db->del(0, &key, DB_AUTO_COMMIT);
}
-void MessageStoreImpl::bind(const qpid::broker::PersistableExchange& e,
- const qpid::broker::PersistableQueue& q,
- const std::string& k,
- const qpid::framing::FieldTable& a)
+void MessageStoreImpl::bind(const qpid::broker::PersistableExchange& e_,
+ const qpid::broker::PersistableQueue& q_,
+ const std::string& k_,
+ const qpid::framing::FieldTable& a_)
{
checkInit();
- IdDbt key(e.getPersistenceId());
- BindingDbt value(e, q, k, a);
+ IdDbt key(e_.getPersistenceId());
+ BindingDbt value(e_, q_, k_, a_);
TxnCtxt txn;
txn.begin(dbenv.get(), true);
try {
@@ -562,16 +617,16 @@ void MessageStoreImpl::bind(const qpid::broker::PersistableExchange& e,
}
}
-void MessageStoreImpl::unbind(const qpid::broker::PersistableExchange& e,
- const qpid::broker::PersistableQueue& q,
- const std::string& k,
- const qpid::framing::FieldTable&)
+void MessageStoreImpl::unbind(const qpid::broker::PersistableExchange& e_,
+ const qpid::broker::PersistableQueue& q_,
+ const std::string& k_,
+ const qpid::framing::FieldTable& /*a_*/)
{
checkInit();
- deleteBinding(e, q, k);
+ deleteBinding(e_, q_, k_);
}
-void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry)
+void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_)
{
checkInit();
txn_list prepared;
@@ -585,14 +640,14 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry)
txn.begin(dbenv.get(), false);
try {
//read all queues, calls recoversMessages
- recoverQueues(txn, registry, queues, prepared, messages);
+ recoverQueues(txn, registry_, queues, prepared, messages);
//recover exchange & bindings:
- recoverExchanges(txn, registry, exchanges);
+ recoverExchanges(txn, registry_, exchanges);
recoverBindings(txn, exchanges, queues);
//recover general-purpose configuration
- recoverGeneral(txn, registry);
+ recoverGeneral(txn, registry_);
txn.commit();
} catch (const DbException& e) {
@@ -629,7 +684,7 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry)
tpcc->prepare(tplStorePtr.get());
qpid::broker::RecoverableTransaction::shared_ptr dtx;
- if (!incomplTplTxnFlag) dtx = registry.recoverTransaction(xid, txn);
+ if (!incomplTplTxnFlag) dtx = registry_.recoverTransaction(xid, txn);
if (pt.enqueues.get()) {
for (LockedMappings::iterator j = pt.enqueues->begin(); j != pt.enqueues->end(); j++) {
tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
@@ -669,15 +724,17 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry)
}
}
}
- registry.recoveryComplete();
+ registry_.recoveryComplete();
}
-void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
- qpid::broker::RecoveryManager& registry,
- queue_index& queue_index,
- txn_list& prepared,
- message_index& messages)
+void MessageStoreImpl::recoverQueues(TxnCtxt& /*txn*/,
+ qpid::broker::RecoveryManager& /*registry*/,
+ queue_index& /*queue_index*/,
+ txn_list& /*prepared*/,
+ message_index& /*messages*/)
{
+ QLS_LOG(info, "*** MessageStoreImpl::recoverQueues()");
+/*
Cursor queues;
queues.open(queueDb, txn.get());
@@ -714,24 +771,22 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
long rcnt = 0L; // recovered msg count
long idcnt = 0L; // in-doubt msg count
uint64_t thisHighestRid = 0ULL;
- jQueue->recover(/*numJrnlFiles, autoJrnlExpand, autoJrnlExpandMaxFiles, jrnlFsizeSblks,*/ wCacheNumPages,
+ jQueue->recover(numJrnlFiles, autoJrnlExpand, autoJrnlExpandMaxFiles, jrnlFsizeSblks, wCacheNumPages,
wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery
-/*
- // Check for changes to queue store settings qpid.file_count and qpid.file_size resulting
- // from recovery of a store that has had its size changed externally by the resize utility.
- // If so, update the queue store settings so that QMF queries will reflect the new values.
- const qpid::framing::FieldTable& storeargs = queue->getSettings().storeSettings;
- qpid::framing::FieldTable::ValuePtr value;
- value = storeargs.get("qpid.file_count");
- if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint16_t)value->get<int>() != jQueue->num_jfiles()) {
- queue->addArgument("qpid.file_count", jQueue->num_jfiles());
- }
- value = storeargs.get("qpid.file_size");
- if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint32_t)value->get<int>() != jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE) {
- queue->addArgument("qpid.file_size", jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE);
- }
-*/
+// // Check for changes to queue store settings qpid.file_count and qpid.file_size resulting
+// // from recovery of a store that has had its size changed externally by the resize utility.
+// // If so, update the queue store settings so that QMF queries will reflect the new values.
+// const qpid::framing::FieldTable& storeargs = queue->getSettings().storeSettings;
+// qpid::framing::FieldTable::ValuePtr value;
+// value = storeargs.get("qpid.file_count");
+// if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint16_t)value->get<int>() != jQueue->num_jfiles()) {
+// queue->addArgument("qpid.file_count", jQueue->num_jfiles());
+// }
+// value = storeargs.get("qpid.file_size");
+// if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint32_t)value->get<int>() != jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE) {
+// queue->addArgument("qpid.file_size", jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE);
+// }
if (highestRid == 0ULL)
highestRid = thisHighestRid;
@@ -755,16 +810,17 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
QLS_LOG(info, "Most recent persistence id found: 0x" << std::hex << highestRid << std::dec);
queueIdSequence.reset(maxQueueId + 1);
+*/
}
-void MessageStoreImpl::recoverExchanges(TxnCtxt& txn,
- qpid::broker::RecoveryManager& registry,
- exchange_index& index)
+void MessageStoreImpl::recoverExchanges(TxnCtxt& txn_,
+ qpid::broker::RecoveryManager& registry_,
+ exchange_index& index_)
{
//TODO: this is a copy&paste from recoverQueues - refactor!
Cursor exchanges;
- exchanges.open(exchangeDb, txn.get());
+ exchanges.open(exchangeDb, txn_.get());
uint64_t maxExchangeId(1);
IdDbt key;
@@ -773,11 +829,11 @@ void MessageStoreImpl::recoverExchanges(TxnCtxt& txn,
while (exchanges.next(key, value)) {
qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
//create a Exchange instance
- qpid::broker::RecoverableExchange::shared_ptr exchange = registry.recoverExchange(buffer);
+ qpid::broker::RecoverableExchange::shared_ptr exchange = registry_.recoverExchange(buffer);
if (exchange) {
//set the persistenceId and update max as required
exchange->setPersistenceId(key.id);
- index[key.id] = exchange;
+ index_[key.id] = exchange;
QLS_LOG(info, "Recovered exchange \"" << exchange->getName() << '"');
}
maxExchangeId = std::max(key.id, maxExchangeId);
@@ -785,12 +841,12 @@ void MessageStoreImpl::recoverExchanges(TxnCtxt& txn,
exchangeIdSequence.reset(maxExchangeId + 1);
}
-void MessageStoreImpl::recoverBindings(TxnCtxt& txn,
- exchange_index& exchanges,
- queue_index& queues)
+void MessageStoreImpl::recoverBindings(TxnCtxt& txn_,
+ exchange_index& exchanges_,
+ queue_index& queues_)
{
Cursor bindings;
- bindings.open(bindingDb, txn.get());
+ bindings.open(bindingDb, txn_.get());
IdDbt key;
Dbt value;
@@ -807,9 +863,9 @@ void MessageStoreImpl::recoverBindings(TxnCtxt& txn,
buffer.getShortString(queueName);
buffer.getShortString(routingkey);
buffer.get(args);
- exchange_index::iterator exchange = exchanges.find(key.id);
- queue_index::iterator queue = queues.find(queueId);
- if (exchange != exchanges.end() && queue != queues.end()) {
+ exchange_index::iterator exchange = exchanges_.find(key.id);
+ queue_index::iterator queue = queues_.find(queueId);
+ if (exchange != exchanges_.end() && queue != queues_.end()) {
//could use the recoverable queue here rather than the name...
exchange->second->bind(queueName, routingkey, args);
QLS_LOG(info, "Recovered binding exchange=" << exchange->second->getName()
@@ -823,11 +879,11 @@ void MessageStoreImpl::recoverBindings(TxnCtxt& txn,
}
}
-void MessageStoreImpl::recoverGeneral(TxnCtxt& txn,
- qpid::broker::RecoveryManager& registry)
+void MessageStoreImpl::recoverGeneral(TxnCtxt& txn_,
+ qpid::broker::RecoveryManager& registry_)
{
Cursor items;
- items.open(generalDb, txn.get());
+ items.open(generalDb, txn_.get());
uint64_t maxGeneralId(1);
IdDbt key;
@@ -836,7 +892,7 @@ void MessageStoreImpl::recoverGeneral(TxnCtxt& txn,
while (items.next(key, value)) {
qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
//create instance
- qpid::broker::RecoverableConfig::shared_ptr config = registry.recoverConfig(buffer);
+ qpid::broker::RecoverableConfig::shared_ptr config = registry_.recoverConfig(buffer);
//set the persistenceId and update max as required
config->setPersistenceId(key.id);
maxGeneralId = std::max(key.id, maxGeneralId);
@@ -845,14 +901,16 @@ void MessageStoreImpl::recoverGeneral(TxnCtxt& txn,
}
void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
- qpid::broker::RecoveryManager& recovery,
- qpid::broker::RecoverableQueue::shared_ptr& queue,
- txn_list& prepared,
- message_index& messages,
- long& rcnt,
- long& idcnt)
+ qpid::broker::RecoveryManager& /*recovery*/,
+ qpid::broker::RecoverableQueue::shared_ptr& queue_,
+ txn_list& /*prepared*/,
+ message_index& /*messages*/,
+ long& /*rcnt*/,
+ long& /*idcnt*/)
{
- size_t preambleLength = sizeof(uint32_t)/*header size*/;
+ QLS_LOG(info, "*** MessageStoreImpl::recoverMessages() queue=\"" << queue_->getName() << "\"");
+/*
+ size_t preambleLength = sizeof(uint32_t)header size;
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
DataTokenImpl dtok;
@@ -977,38 +1035,39 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
} catch (const qpid::qls_jrnl::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": recoverMessages() failed: " + e.what());
}
+*/
}
qpid::broker::RecoverableMessage::shared_ptr MessageStoreImpl::getExternMessage(qpid::broker::RecoveryManager& /*recovery*/,
- uint64_t /*messageId*/,
- unsigned& /*headerSize*/)
+ uint64_t /*messageId*/,
+ unsigned& /*headerSize*/)
{
throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "getExternMessage");
}
-int MessageStoreImpl::enqueueMessage(TxnCtxt& txn,
- IdDbt& msgId,
- qpid::broker::RecoverableMessage::shared_ptr& msg,
- queue_index& index,
- txn_list& prepared,
- message_index& messages)
+int MessageStoreImpl::enqueueMessage(TxnCtxt& txn_,
+ IdDbt& msgId_,
+ qpid::broker::RecoverableMessage::shared_ptr& msg_,
+ queue_index& index_,
+ txn_list& prepared_,
+ message_index& messages_)
{
Cursor mappings;
- mappings.open(mappingDb, txn.get());
+ mappings.open(mappingDb, txn_.get());
IdDbt value;
int count(0);
- for (int status = mappings->get(&msgId, &value, DB_SET); status == 0; status = mappings->get(&msgId, &value, DB_NEXT_DUP)) {
- if (index.find(value.id) == index.end()) {
+ for (int status = mappings->get(&msgId_, &value, DB_SET); status == 0; status = mappings->get(&msgId_, &value, DB_NEXT_DUP)) {
+ if (index_.find(value.id) == index_.end()) {
QLS_LOG(warning, "Recovered message for queue that no longer exists");
mappings->del(0);
} else {
- qpid::broker::RecoverableQueue::shared_ptr queue = index[value.id];
- if (PreparedTransaction::isLocked(prepared, value.id, msgId.id)) {
- messages[msgId.id] = msg;
+ qpid::broker::RecoverableQueue::shared_ptr queue = index_[value.id];
+ if (PreparedTransaction::isLocked(prepared_, value.id, msgId_.id)) {
+ messages_[msgId_.id] = msg_;
} else {
- queue->recover(msg);
+ queue->recover(msg_);
}
count++;
}
@@ -1019,6 +1078,8 @@ int MessageStoreImpl::enqueueMessage(TxnCtxt& txn,
void MessageStoreImpl::readTplStore()
{
+ QLS_LOG(info, "*** MessageStoreImpl::readTplStore()");
+/*
tplRecoverMap.clear();
qpid::qls_jrnl::txn_map& tmap = tplStorePtr->get_txn_map();
DataTokenImpl dtok;
@@ -1087,13 +1148,16 @@ void MessageStoreImpl::readTplStore()
} catch (const qpid::qls_jrnl::jexception& e) {
THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what());
}
+*/
}
void MessageStoreImpl::recoverTplStore()
{
+ QLS_LOG(info, "*** MessageStoreImpl::recoverTplStore()");
+/*
if (qpid::qls_jrnl::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) {
uint64_t thisHighestRid = 0ULL;
- tplStorePtr->recover(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
+ tplStorePtr->recover(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
if (highestRid == 0ULL)
highestRid = thisHighestRid;
else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
@@ -1104,10 +1168,13 @@ void MessageStoreImpl::recoverTplStore()
tplStorePtr->recover_complete(); // start journal.
}
+*/
}
-void MessageStoreImpl::recoverLockedMappings(txn_list& txns)
+void MessageStoreImpl::recoverLockedMappings(txn_list& /*txns*/)
{
+ QLS_LOG(info, "*** MessageStoreImpl::recoverLockedMappings()");
+/*
if (!tplStorePtr->is_ready())
recoverTplStore();
@@ -1119,10 +1186,13 @@ void MessageStoreImpl::recoverLockedMappings(txn_list& txns)
deq_ptr.reset(new LockedMappings);
txns.push_back(new PreparedTransaction(i->first, enq_ptr, deq_ptr));
}
+*/
}
-void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids)
+void MessageStoreImpl::collectPreparedXids(std::set<std::string>& /*xids*/)
{
+ QLS_LOG(info, "*** MessageStoreImpl::collectPreparedXids()");
+/*
if (tplStorePtr->is_ready()) {
tplStorePtr->read_reset();
readTplStore();
@@ -1134,6 +1204,7 @@ void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids)
if (!i->second.deq_flag && i->second.tpc_flag)
xids.insert(i->first);
}
+*/
}
void MessageStoreImpl::stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*msg*/)
@@ -1147,17 +1218,19 @@ void MessageStoreImpl::destroy(qpid::broker::PersistableMessage& /*msg*/)
}
void MessageStoreImpl::appendContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& /*msg*/,
- const std::string& /*data*/)
+ const std::string& /*data*/)
{
throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "appendContent");
}
-void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& queue,
- const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
- std::string& data,
- uint64_t offset,
- uint32_t length)
+void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& /*queue*/,
+ const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& /*msg*/,
+ std::string& /*data*/,
+ uint64_t /*offset*/,
+ uint32_t /*length*/)
{
+ throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "loadContent");
+/*
checkInit();
uint64_t messageId (msg->getPersistenceId());
@@ -1181,10 +1254,13 @@ void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& queue,
} else {
THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
}
+*/
}
-void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue)
+void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_)
{
+ QLS_LOG(info, "*** MessageStoreImpl::flush() queue=\"" << queue_.getName() << "\"");
+/*
if (queue.getExternalQueueStore() == 0) return;
checkInit();
std::string qn = queue.getName();
@@ -1192,17 +1268,20 @@ void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue)
JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
if (jc) {
// TODO: check if this result should be used...
- /*mrg::journal::iores res =*/ jc->flush();
+ mrg::journal::iores res = jc->flush();
}
} catch (const qpid::qls_jrnl::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() );
}
+*/
}
-void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* ctxt,
- const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
- const qpid::broker::PersistableQueue& queue)
+void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* /*ctxt*/,
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg_,
+ const qpid::broker::PersistableQueue& /*queue*/)
{
+// QLS_LOG(info, "*** MessageStoreImpl::enqueue() queue=\"" << queue.getName() << "\"");
+/*
checkInit();
uint64_t queueId (queue.getPersistenceId());
uint64_t messageId (msg->getPersistenceId());
@@ -1228,29 +1307,34 @@ void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* ctxt,
// add queue* to the txn map..
if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
+*/
+ msg_->enqueueComplete();// DEBUG: only while null fns in use
}
-uint64_t MessageStoreImpl::msgEncode(std::vector<char>& buff, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message)
+uint64_t MessageStoreImpl::msgEncode(std::vector<char>& buff_,
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message_)
{
- uint32_t headerSize = message->encodedHeaderSize();
- uint64_t size = message->encodedSize() + sizeof(uint32_t);
- try { buff = std::vector<char>(size); } // long + headers + content
+ uint32_t headerSize = message_->encodedHeaderSize();
+ uint64_t size = message_->encodedSize() + sizeof(uint32_t);
+ try { buff_ = std::vector<char>(size); } // long + headers + content
catch (const std::exception& e) {
std::ostringstream oss;
oss << "Unable to allocate memory for encoding message; requested size: " << size << "; error: " << e.what();
THROW_STORE_EXCEPTION(oss.str());
}
- qpid::framing::Buffer buffer(&buff[0],size);
+ qpid::framing::Buffer buffer(&buff_[0],size);
buffer.putLong(headerSize);
- message->encode(buffer);
+ message_->encode(buffer);
return size;
}
-void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue,
- TxnCtxt* txn,
- const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
- bool /*newId*/)
+void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue_,
+ TxnCtxt* /*txn*/,
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*message*/,
+ bool /*newId*/)
{
+ QLS_LOG(info, "*** MessageStoreImpl::store() queue=\"" << queue_->getName() << "\"");
+/*
std::vector<char> buff;
uint64_t size = msgEncode(buff, message);
@@ -1275,12 +1359,15 @@ void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue,
THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": MessageStoreImpl::store() failed: " +
e.what());
}
+*/
}
-void MessageStoreImpl::dequeue(qpid::broker::TransactionContext* ctxt,
- const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
- const qpid::broker::PersistableQueue& queue)
+void MessageStoreImpl::dequeue(qpid::broker::TransactionContext* /*ctxt*/,
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg_,
+ const qpid::broker::PersistableQueue& /*queue*/)
{
+// QLS_LOG(info, "*** MessageStoreImpl::dequeue() queue=\"" << queue.getName() << "\"");
+/*
checkInit();
uint64_t queueId (queue.getPersistenceId());
uint64_t messageId (msg->getPersistenceId());
@@ -1302,14 +1389,16 @@ void MessageStoreImpl::dequeue(qpid::broker::TransactionContext* ctxt,
// add queue* to the txn map..
if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
async_dequeue(ctxt, msg, queue);
-
- msg->dequeueComplete();
+*/
+ msg_->dequeueComplete();
}
-void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt,
- const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
- const qpid::broker::PersistableQueue& queue)
+void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* /*ctxt*/,
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*msg*/,
+ const qpid::broker::PersistableQueue& queue_)
{
+ QLS_LOG(info, "*** MessageStoreImpl::async_dequeue() queue=\"" << queue_.getName() << "\"");
+/*
boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
ddtokp->setSourceMessage(msg);
ddtokp->set_external_rid(true);
@@ -1334,38 +1423,39 @@ void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt,
ddtokp->release();
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
}
+*/
}
-uint32_t MessageStoreImpl::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/)
+uint32_t MessageStoreImpl::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue_*/)
{
- checkInit();
+/* checkInit();*/
return 0;
}
-void MessageStoreImpl::completed(TxnCtxt& txn,
- bool commit)
+void MessageStoreImpl::completed(TxnCtxt& txn_,
+ bool commit_)
{
try {
chkTplStoreInit(); // Late initialize (if needed)
// Nothing to do if not prepared
- if (txn.getDtok()->is_enqueued()) {
- txn.incrDtokRef();
- DataTokenImpl* dtokp = txn.getDtok();
+ if (txn_.getDtok()->is_enqueued()) {
+ txn_.incrDtokRef();
+ DataTokenImpl* dtokp = txn_.getDtok();
dtokp->set_dequeue_rid(dtokp->rid());
dtokp->set_rid(messageIdSequence.next());
- tplStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit);
+ tplStorePtr->dequeue_txn_data_record(txn_.getDtok(), txn_.getXid(), commit_);
}
- txn.complete(commit);
+ txn_.complete(commit_);
if (mgmtObject.get() != 0) {
mgmtObject->dec_tplTransactionDepth();
- if (commit)
+ if (commit_)
mgmtObject->inc_tplTxnCommits();
else
mgmtObject->inc_tplTxnAborts();
}
} catch (const std::exception& e) {
- QLS_LOG(error, "Error completing xid " << txn.getXid() << ": " << e.what());
+ QLS_LOG(error, "Error completing xid " << txn_.getXid() << ": " << e.what());
throw;
}
}
@@ -1377,54 +1467,54 @@ std::auto_ptr<qpid::broker::TransactionContext> MessageStoreImpl::begin()
return std::auto_ptr<qpid::broker::TransactionContext>(new TxnCtxt(&messageIdSequence));
}
-std::auto_ptr<qpid::broker::TPCTransactionContext> MessageStoreImpl::begin(const std::string& xid)
+std::auto_ptr<qpid::broker::TPCTransactionContext> MessageStoreImpl::begin(const std::string& xid_)
{
checkInit();
IdSequence* jtx = &messageIdSequence;
// pass sequence number for c/a
- return std::auto_ptr<qpid::broker::TPCTransactionContext>(new TPCTxnCtxt(xid, jtx));
+ return std::auto_ptr<qpid::broker::TPCTransactionContext>(new TPCTxnCtxt(xid_, jtx));
}
-void MessageStoreImpl::prepare(qpid::broker::TPCTransactionContext& ctxt)
+void MessageStoreImpl::prepare(qpid::broker::TPCTransactionContext& ctxt_)
{
checkInit();
- TxnCtxt* txn = dynamic_cast<TxnCtxt*>(&ctxt);
+ TxnCtxt* txn = dynamic_cast<TxnCtxt*>(&ctxt_);
if(!txn) throw qpid::broker::InvalidTransactionContextException();
localPrepare(txn);
}
-void MessageStoreImpl::localPrepare(TxnCtxt* ctxt)
+void MessageStoreImpl::localPrepare(TxnCtxt* ctxt_)
{
try {
chkTplStoreInit(); // Late initialize (if needed)
// This sync is required to ensure multi-queue atomicity - ie all txn data
// must hit the disk on *all* queues before the TPL prepare (enq) is written.
- ctxt->sync();
+ ctxt_->sync();
- ctxt->incrDtokRef();
- DataTokenImpl* dtokp = ctxt->getDtok();
+ ctxt_->incrDtokRef();
+ DataTokenImpl* dtokp = ctxt_->getDtok();
dtokp->set_external_rid(true);
dtokp->set_rid(messageIdSequence.next());
- char tpcFlag = static_cast<char>(ctxt->isTPC());
- tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt->getXid(), false);
- ctxt->prepare(tplStorePtr.get());
+ char tpcFlag = static_cast<char>(ctxt_->isTPC());
+ tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt_->getXid(), false);
+ ctxt_->prepare(tplStorePtr.get());
// make sure all the data is written to disk before returning
- ctxt->sync();
+ ctxt_->sync();
if (mgmtObject.get() != 0) {
mgmtObject->inc_tplTransactionDepth();
mgmtObject->inc_tplTxnPrepares();
}
} catch (const std::exception& e) {
- QLS_LOG(error, "Error preparing xid " << ctxt->getXid() << ": " << e.what());
+ QLS_LOG(error, "Error preparing xid " << ctxt_->getXid() << ": " << e.what());
throw;
}
}
-void MessageStoreImpl::commit(qpid::broker::TransactionContext& ctxt)
+void MessageStoreImpl::commit(qpid::broker::TransactionContext& ctxt_)
{
checkInit();
- TxnCtxt* txn(check(&ctxt));
+ TxnCtxt* txn(check(&ctxt_));
if (!txn->isTPC()) {
if (txn->impactedQueuesEmpty()) return;
localPrepare(dynamic_cast<TxnCtxt*>(txn));
@@ -1432,10 +1522,10 @@ void MessageStoreImpl::commit(qpid::broker::TransactionContext& ctxt)
completed(*dynamic_cast<TxnCtxt*>(txn), true);
}
-void MessageStoreImpl::abort(qpid::broker::TransactionContext& ctxt)
+void MessageStoreImpl::abort(qpid::broker::TransactionContext& ctxt_)
{
checkInit();
- TxnCtxt* txn(check(&ctxt));
+ TxnCtxt* txn(check(&ctxt_));
if (!txn->isTPC()) {
if (txn->impactedQueuesEmpty()) return;
localPrepare(dynamic_cast<TxnCtxt*>(txn));
@@ -1443,20 +1533,20 @@ void MessageStoreImpl::abort(qpid::broker::TransactionContext& ctxt)
completed(*dynamic_cast<TxnCtxt*>(txn), false);
}
-TxnCtxt* MessageStoreImpl::check(qpid::broker::TransactionContext* ctxt)
+TxnCtxt* MessageStoreImpl::check(qpid::broker::TransactionContext* ctxt_)
{
- TxnCtxt* txn = dynamic_cast<TxnCtxt*>(ctxt);
+ TxnCtxt* txn = dynamic_cast<TxnCtxt*>(ctxt_);
if(!txn) throw qpid::broker::InvalidTransactionContextException();
return txn;
}
-void MessageStoreImpl::put(db_ptr db,
- DbTxn* txn,
- Dbt& key,
- Dbt& value)
+void MessageStoreImpl::put(db_ptr db_,
+ DbTxn* txn_,
+ Dbt& key_,
+ Dbt& value_)
{
try {
- int status = db->put(txn, &key, &value, DB_NODUPDATA);
+ int status = db_->put(txn_, &key_, &value_, DB_NODUPDATA);
if (status == DB_KEYEXIST) {
THROW_STORE_EXCEPTION("duplicate data");
} else if (status) {
@@ -1467,7 +1557,7 @@ void MessageStoreImpl::put(db_ptr db,
}
}
-void MessageStoreImpl::deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue)
+void MessageStoreImpl::deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue_)
{
TxnCtxt txn;
txn.begin(dbenv.get(), true);
@@ -1484,9 +1574,9 @@ void MessageStoreImpl::deleteBindingsForQueue(const qpid::broker::PersistableQue
THROW_STORE_EXCEPTION("Not enough data for binding");
}
uint64_t queueId = buffer.getLongLong();
- if (queue.getPersistenceId() == queueId) {
+ if (queue_.getPersistenceId() == queueId) {
bindings->del(0);
- QLS_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId);
+ QLS_LOG(debug, "Deleting binding for " << queue_.getName() << " " << key.id << "->" << queueId);
}
}
}
@@ -1498,12 +1588,12 @@ void MessageStoreImpl::deleteBindingsForQueue(const qpid::broker::PersistableQue
txn.abort();
throw;
}
- QLS_LOG(debug, "Deleted all bindings for " << queue.getName() << ":" << queue.getPersistenceId());
+ QLS_LOG(debug, "Deleted all bindings for " << queue_.getName() << ":" << queue_.getPersistenceId());
}
-void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
- const std::string& bkey)
+void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& exchange_,
+ const qpid::broker::PersistableQueue& queue_,
+ const std::string& bkey_)
{
TxnCtxt txn;
txn.begin(dbenv.get(), true);
@@ -1512,7 +1602,7 @@ void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& ex
Cursor bindings;
bindings.open(bindingDb, txn.get());
- IdDbt key(exchange.getPersistenceId());
+ IdDbt key(exchange_.getPersistenceId());
Dbt value;
for (int status = bindings->get(&key, &value, DB_SET); status == 0; status = bindings->get(&key, &value, DB_NEXT_DUP)) {
@@ -1521,14 +1611,14 @@ void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& ex
THROW_STORE_EXCEPTION("Not enough data for binding");
}
uint64_t queueId = buffer.getLongLong();
- if (queue.getPersistenceId() == queueId) {
+ if (queue_.getPersistenceId() == queueId) {
std::string q;
std::string k;
buffer.getShortString(q);
buffer.getShortString(k);
- if (bkey == k) {
+ if (bkey_ == k) {
bindings->del(0);
- QLS_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId);
+ QLS_LOG(debug, "Deleting binding for " << queue_.getName() << " " << key.id << "->" << queueId);
}
}
}
@@ -1543,6 +1633,13 @@ void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& ex
}
}
+std::string MessageStoreImpl::getStoreTopLevelDir() {
+ std::ostringstream dir;
+ dir << storeDir << "/" << storeTopLevelDir;
+ return dir.str();
+}
+
+
std::string MessageStoreImpl::getJrnlBaseDir()
{
std::ostringstream dir;
@@ -1564,43 +1661,28 @@ std::string MessageStoreImpl::getTplBaseDir()
return dir.str();
}
-std::string MessageStoreImpl::getJrnlDir(const qpid::broker::PersistableQueue& queue) //for exmaple /var/rhm/ + queueDir/
+std::string MessageStoreImpl::getJrnlDir(const qpid::broker::PersistableQueue& queue_) //for exmaple /var/rhm/ + queueDir/
{
- return getJrnlHashDir(queue.getName().c_str());
-}
-
-uint32_t MessageStoreImpl::bHash(const std::string str)
-{
- // Daniel Bernstein hash fn
- uint32_t h = 0;
- for (std::string::const_iterator i = str.begin(); i < str.end(); i++)
- h = 33*h + *i;
- return h;
-}
-
-std::string MessageStoreImpl::getJrnlHashDir(const std::string& queueName) //for exmaple /var/rhm/ + queueDir/
-{
- std::stringstream dir;
- dir << getJrnlBaseDir() << std::hex << std::setfill('0') << std::setw(4);
- dir << (bHash(queueName.c_str()) % 29); // Use a prime number for better distribution across dirs
- dir << "/" << queueName << "/";
- return dir.str();
+ /*return getJrnlHashDir(queue_.getName().c_str());*/
+ std::ostringstream oss;
+ oss << getJrnlBaseDir() << queue_.getName();
+ return oss.str();
}
std::string MessageStoreImpl::getStoreDir() const { return storeDir; }
-void MessageStoreImpl::journalDeleted(JournalImpl& j) {
+void MessageStoreImpl::journalDeleted(JournalImpl& j_) {
qpid::sys::Mutex::ScopedLock sl(journalListLock);
- journalList.erase(j.id());
+ journalList.erase(j_.id());
}
-MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) :
- qpid::Options(name),
+MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) :
+ qpid::Options(name_),
truncateFlag(defTruncateFlag),
wCachePageSizeKib(defWCachePageSize),
tplWCachePageSizeKib(defTplWCachePageSize),
efpPartition(defEfpPartition),
- efpFileSize(defEfpFileSize)
+ efpFileSizeKib(defEfpFileSizeKib)
{
addOptions()
("store-dir", qpid::optValue(storeDir, "DIR"),
@@ -1619,8 +1701,8 @@ MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) :
"Lower values decrease latency at the expense of throughput.")
("efp-partition", qpid::optValue(efpPartition, "N"),
"Empty File Pool partition to use for finding empty journal files")
- ("efp-file-size", qpid::optValue(efpFileSize, "N"),
- "Empty File Pool file size to use for journal files")
+ ("efp-file-size", qpid::optValue(efpFileSizeKib, "N"),
+ "Empty File Pool file size in KiB to use for journal files. Must be a multiple of 4 KiB.")
;
}
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
index 5d5fa28ff8..8d88e9da97 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
@@ -26,10 +26,12 @@
#include "db-inc.h"
#include "qpid/linearstore/Cursor.h"
+#include "qpid/linearstore/EmptyFilePoolManagerImpl.h"
#include "qpid/linearstore/IdDbt.h"
#include "qpid/linearstore/IdSequence.h"
#include "qpid/linearstore/JournalImpl.h"
#include "qpid/linearstore/jrnl/jcfg.h"
+#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
#include "qpid/linearstore/PreparedTransaction.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/MessageStore.h"
@@ -48,6 +50,9 @@ class Timer;
}}
namespace qpid{
+namespace qls_jrnl {
+class EmptyFilePoolManager;
+}
namespace linearstore{
/**
@@ -67,7 +72,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
uint32_t wCachePageSizeKib;
uint32_t tplWCachePageSizeKib;
uint16_t efpPartition;
- uint64_t efpFileSize;
+ uint64_t efpFileSizeKib;
};
protected:
@@ -98,10 +103,10 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
static const bool defTruncateFlag = false;
static const uint32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_SBLK_SIZE / 1024;
static const uint32_t defTplWCachePageSize = defWCachePageSize / 8;
- static const uint16_t defEfpPartition = 0;
- static const uint64_t defEfpFileSize = 512 * JRNL_SBLK_SIZE;
-
+ static const uint16_t defEfpPartition = 1;
+ static const uint64_t defEfpFileSizeKib = 512 * JRNL_SBLK_SIZE / 1024;
static const std::string storeTopLevelDir;
+
static qpid::sys::Duration defJournalGetEventsTimeout;
static qpid::sys::Duration defJournalFlushTimeout;
@@ -127,21 +132,18 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
IdSequence generalIdSequence;
IdSequence messageIdSequence;
std::string storeDir;
- uint16_t numJrnlFiles;
- bool autoJrnlExpand;
- uint16_t autoJrnlExpandMaxFiles;
- uint32_t jrnlFsizeSblks;
+ qpid::qls_jrnl::efpPartitionNumber_t defaultEfpPartitionNumber;
+ qpid::qls_jrnl::efpFileSizeKib_t defaultEfpFileSizeKib;
bool truncateFlag;
uint32_t wCachePgSizeSblks;
uint16_t wCacheNumPages;
- uint16_t tplNumJrnlFiles;
- uint32_t tplJrnlFsizeSblks;
uint32_t tplWCachePgSizeSblks;
uint16_t tplWCacheNumPages;
uint64_t highestRid;
bool isInit;
const char* envPath;
qpid::broker::Broker* broker;
+ boost::shared_ptr<EmptyFilePoolManagerImpl> efpMgr;
qmf::org::apache::qpid::linearstore::Store::shared_ptr mgmtObject;
qpid::management::ManagementAgent* agent;
@@ -149,9 +151,13 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
// Parameter validation and calculation
static uint32_t chkJrnlWrPageCacheSize(const uint32_t param,
- const std::string paramName/*,
- const uint16_t jrnlFsizePgs*/);
- static uint16_t getJrnlWrNumPages(const uint32_t wrPageSizeKib);
+ const std::string& paramName/*,
+ const uint16_t jrnlFsizePgs*/);
+ static uint16_t getJrnlWrNumPages(const uint32_t wrPageSizeKiB);
+ static qpid::qls_jrnl::efpPartitionNumber_t chkEfpPartition(const qpid::qls_jrnl::efpPartitionNumber_t partition,
+ const std::string& paramName);
+ static qpid::qls_jrnl::efpFileSizeKib_t chkEfpFileSizeKiB(const qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKiB,
+ const std::string& paramName);
void init();
@@ -225,9 +231,10 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
// journal functions
void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
- uint32_t bHash(const std::string str);
std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
- std::string getJrnlHashDir(const std::string& queueName);
+ qpid::qls_jrnl::EmptyFilePool* getEmptyFilePool(const qpid::qls_jrnl::efpPartitionNumber_t p, const qpid::qls_jrnl::efpFileSizeKib_t s);
+ qpid::qls_jrnl::EmptyFilePool* getEmptyFilePool(const qpid::framing::FieldTable& args);
+ std::string getStoreTopLevelDir();
std::string getJrnlBaseDir();
std::string getBdbBaseDir();
std::string getTplBaseDir();
@@ -260,11 +267,13 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
bool init(const qpid::Options* options);
bool init(const std::string& dir,
+ qpid::qls_jrnl::efpPartitionNumber_t efpPartition = defEfpPartition,
+ qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib = defEfpFileSizeKib,
const bool truncateFlag = false,
uint32_t wCachePageSize = defWCachePageSize,
uint32_t tplWCachePageSize = defTplWCachePageSize);
- void truncateInit(const bool saveStoreContent = false);
+ void truncateInit();
void initManagement ();
diff --git a/qpid/cpp/src/qpid/linearstore/Log.h b/qpid/cpp/src/qpid/linearstore/QpidLog.h
index b03ea7ac9d..b03ea7ac9d 100644
--- a/qpid/cpp/src/qpid/linearstore/Log.h
+++ b/qpid/cpp/src/qpid/linearstore/QpidLog.h
diff --git a/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp b/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp
index 840468d8bd..bf574d9740 100644
--- a/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp
+++ b/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp
@@ -23,7 +23,7 @@
#include "qpid/Plugin.h"
#include "qpid/Options.h"
#include "qpid/DataDir.h"
-#include "qpid/linearstore/Log.h"
+#include "qpid/linearstore/QpidLog.h"
#include "qpid/linearstore/MessageStoreImpl.h"
using qpid::linearstore::MessageStoreImpl;
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp
new file mode 100644
index 0000000000..90e04df7ed
--- /dev/null
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp
@@ -0,0 +1,255 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "EmptyFilePool.h"
+
+#include <cctype>
+#include <fstream>
+#include "qpid/linearstore/jrnl/jcfg.h"
+#include "qpid/linearstore/jrnl/jdir.h"
+#include "qpid/linearstore/jrnl/JournalFile.h"
+#include "qpid/linearstore/jrnl/slock.h"
+#include "qpid/linearstore/jrnl/utils/file_hdr.h"
+#include <sys/stat.h>
+#include <uuid/uuid.h>
+#include <vector>
+
+#include <iostream> // DEBUG
+
+namespace qpid {
+namespace qls_jrnl {
+
+EmptyFilePool::EmptyFilePool(const std::string& efpDirectory_,
+ const EmptyFilePoolPartition* partitionPtr_) :
+ efpDirectory(efpDirectory_),
+ efpFileSizeKib(fileSizeKbFromDirName(efpDirectory_, partitionPtr_->partitionNumber())),
+ partitionPtr(partitionPtr_)
+{}
+
+EmptyFilePool::~EmptyFilePool() {}
+
+void
+EmptyFilePool::initialize() {
+ //std::cout << "Reading " << efpDirectory << std::endl; // DEBUG
+ std::vector<std::string> dirList;
+ jdir::read_dir(efpDirectory, dirList, false, true, false);
+ for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
+ size_t dotPos = i->rfind(".");
+ if (dotPos != std::string::npos) {
+ if (i->substr(dotPos).compare(".jrnl") == 0 && i->length() == 41) {
+ std::string emptyFile(efpDirectory + "/" + (*i));
+ if (validateEmptyFile(emptyFile)) {
+ pushEmptyFile(emptyFile);
+ }
+ }
+ }
+ }
+ //std::cout << "Found " << emptyFileList.size() << " files" << std::endl; // DEBUG
+}
+
+efpFileSizeKib_t
+EmptyFilePool::fileSizeKib() const {
+ return efpFileSizeKib;
+}
+
+efpFileCount_t
+EmptyFilePool::numEmptyFiles() const {
+ slock l(emptyFileListMutex);
+ return efpFileCount_t(emptyFileList.size());
+}
+
+efpFileSizeKib_t
+EmptyFilePool::cumFileSizeKib() const {
+ slock l(emptyFileListMutex);
+ return efpFileSizeKib_t(emptyFileList.size()) * efpFileSizeKib;
+}
+
+efpPartitionNumber_t
+EmptyFilePool::getPartitionNumber() const {
+ return partitionPtr->partitionNumber();
+}
+
+const EmptyFilePoolPartition*
+EmptyFilePool::getPartition() const {
+ return partitionPtr;
+}
+
+const efpIdentity_t
+EmptyFilePool::getIdentity() const {
+ return efpIdentity_t(partitionPtr->partitionNumber(), efpFileSizeKib);
+}
+
+std::string
+EmptyFilePool::takeEmptyFile(const std::string& destDirectory) {
+ std::string emptyFileName = popEmptyFile();
+ std::string newFileName = destDirectory + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
+ if (::rename(emptyFileName.c_str(), newFileName.c_str())) {
+ pushEmptyFile(emptyFileName);
+ std::ostringstream oss;
+ oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "takeEmptyFile");
+ }
+ return newFileName;
+}
+
+bool
+EmptyFilePool::returnEmptyFile(const JournalFile* srcFile) {
+ std::string emptyFileName(efpDirectory + srcFile->fileName());
+ // TODO: reset file here
+ if (::rename(srcFile->fqFileName().c_str(), emptyFileName.c_str())) {
+ std::ostringstream oss;
+ oss << "file=\"" << srcFile << "\" dest=\"" << emptyFileName << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile");
+ }
+ pushEmptyFile(emptyFileName);
+ return true;
+}
+
+// protected
+
+void
+EmptyFilePool::pushEmptyFile(const std::string fqFileName_) {
+ slock l(emptyFileListMutex);
+ emptyFileList.push_back(fqFileName_);
+}
+
+std::string
+EmptyFilePool::popEmptyFile() {
+ std::string emptyFileName;
+ bool isEmpty = false;
+ {
+ slock l(emptyFileListMutex);
+ isEmpty = emptyFileList.empty();
+ }
+ if (isEmpty) {
+ createEmptyFile();
+ }
+ {
+ slock l(emptyFileListMutex);
+ emptyFileName = emptyFileList.front();
+ emptyFileList.pop_front();
+ }
+ return emptyFileName;
+}
+
+void
+EmptyFilePool::createEmptyFile() {
+ file_hdr_t fh;
+ ::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDRSIZESBLKS, partitionPtr->partitionNumber(),
+ efpFileSizeKib);
+ std::string efpfn = getEfpFileName();
+ std::ofstream ofs(efpfn.c_str(), std::ofstream::out | std::ofstream::binary);
+ if (ofs.good()) {
+ ofs.write((char*)&fh, sizeof(file_hdr_t));
+ uint64_t rem = ((efpFileSizeKib + (QLS_JRNL_FHDRSIZESBLKS * JRNL_SBLK_SIZE_KIB)) * 1024) - sizeof(file_hdr_t);
+ while (rem--)
+ ofs.put('\0');
+ ofs.close();
+ pushEmptyFile(efpfn);
+ std::cout << "WARNING: EFP " << efpDirectory << " is empty - created new journal file " <<
+ efpfn.substr(efpfn.rfind('/') + 1) << " on the fly" << std::endl;
+ } else {
+ std::cerr << "ERROR: Unable to open file \"" << efpfn << "\"" << std::endl; // DEBUG
+ }
+}
+
+bool
+EmptyFilePool::validateEmptyFile(const std::string& emptyFileName_) const {
+ struct stat s;
+ if (::stat(emptyFileName_.c_str(), &s))
+ {
+ std::ostringstream oss;
+ oss << "stat: file=\"" << emptyFileName_ << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "EmptyFilePool", "validateEmptyFile");
+ }
+ efpFileSizeKib_t expectedSize = (JRNL_SBLK_SIZE_KIB + efpFileSizeKib) * 1024;
+ if ((efpFileSizeKib_t)s.st_size != expectedSize) {
+ //std::cout << "ERROR: File " << emptyFileName << ": Incorrect size: Expected=" << expectedSize << "; actual=" << s.st_size << std::endl; // DEBUG
+ return false;
+ }
+
+ std::ifstream ifs(emptyFileName_.c_str(), std::ifstream::in | std::ifstream::binary);
+ if (!ifs) {
+ //std::cout << "ERROR: File " << emptyFileName << ": Unable to open for reading" << std::endl;
+ return false;
+ }
+
+ const uint8_t fhFileNameBuffLen = 50;
+ char fhFileNameBuff[fhFileNameBuffLen];
+ file_hdr_t fh;
+ ifs.read((char*)&fh, sizeof(file_hdr_t));
+ uint16_t fhFileNameLen = fh._queue_name_len > fhFileNameBuffLen ? fhFileNameBuffLen : fh._queue_name_len;
+ ifs.read(fhFileNameBuff, fhFileNameLen);
+ std::string fhFileName(fhFileNameBuff, fhFileNameLen);
+ ifs.close();
+
+ if (fh._rhdr._magic != QLS_FILE_MAGIC ||
+ fh._rhdr._version != QLS_JRNL_VERSION ||
+ fh._efp_partition != partitionPtr->partitionNumber() ||
+ fh._file_size_kib != efpFileSizeKib ||
+ !::is_file_hdr_reset(&fh))
+ {
+ //std::cout << "ERROR: File " << emptyFileName << ": Invalid file header" << std::endl;
+ return false;
+ }
+
+ return true;
+}
+
+std::string
+EmptyFilePool::getEfpFileName() {
+ uuid_t uuid;
+ ::uuid_generate(uuid); // NOTE: NOT THREAD SAFE
+ char uuid_str[37]; // 36 char uuid + trailing \0
+ ::uuid_unparse(uuid, uuid_str);
+ std::ostringstream oss;
+ oss << efpDirectory << "/" << uuid_str << QLS_JRNL_FILE_EXTENSION;
+ return oss.str();
+}
+
+// protected
+// static
+efpFileSizeKib_t
+EmptyFilePool::fileSizeKbFromDirName(const std::string& dirName_,
+ const efpPartitionNumber_t partitionNumber_) {
+ // Check for dirName format 'NNNk', where NNN is a number, convert NNN into an integer. NNN cannot be 0.
+ std::string n(dirName_.substr(dirName_.rfind('/')+1));
+ bool valid = true;
+ for (uint16_t charNum = 0; charNum < n.length(); ++charNum) {
+ if (charNum < n.length()-1) {
+ if (!::isdigit((int)n[charNum])) {
+ valid = false;
+ break;
+ }
+ } else {
+ valid = n[charNum] == 'k';
+ }
+ }
+ efpFileSizeKib_t s = ::atol(n.c_str());
+ if (!valid || s == 0 || s % JRNL_SBLK_SIZE_KIB != 0) {
+ std::ostringstream oss;
+ oss << "Partition: " << partitionNumber_ << "; EFP directory: \'" << n << "\'";
+ throw jexception(jerrno::JERR_EFP_BADEFPDIRNAME, oss.str(), "EmptyFilePool", "fileSizeKbFromDirName");
+ }
+ return s;
+}
+
+}} // namespace qpid::qls_jrnl
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h
new file mode 100644
index 0000000000..1958bb7647
--- /dev/null
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_QLS_JRNL_EMPTYFILEPOOL_H_
+#define QPID_QLS_JRNL_EMPTYFILEPOOL_H_
+
+namespace qpid {
+namespace qls_jrnl {
+
+ class EmptyFilePool;
+
+}} // namespace qpid::qls_jrnl
+
+#include <deque>
+#include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h"
+#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
+#include "qpid/linearstore/jrnl/smutex.h"
+#include <string>
+
+namespace qpid {
+namespace qls_jrnl {
+class jdir;
+class JournalFile;
+
+class EmptyFilePool
+{
+protected:
+ typedef std::deque<std::string> emptyFileList_t;
+ typedef emptyFileList_t::iterator emptyFileListItr_t;
+
+ const std::string efpDirectory;
+ const efpFileSizeKib_t efpFileSizeKib;
+ const EmptyFilePoolPartition* partitionPtr;
+
+private:
+ emptyFileList_t emptyFileList;
+ smutex emptyFileListMutex;
+
+public:
+ EmptyFilePool(const std::string& efpDirectory_,
+ const EmptyFilePoolPartition* partitionPtr_);
+ virtual ~EmptyFilePool();
+
+ void initialize();
+ efpFileSizeKib_t fileSizeKib() const;
+ efpFileCount_t numEmptyFiles() const;
+ efpFileSizeKib_t cumFileSizeKib() const;
+ efpPartitionNumber_t getPartitionNumber() const;
+ const EmptyFilePoolPartition* getPartition() const;
+ const efpIdentity_t getIdentity() const;
+
+ std::string takeEmptyFile(const std::string& destDirectory_);
+ bool returnEmptyFile(const JournalFile* srcFile_);
+
+protected:
+ void pushEmptyFile(const std::string fqFileName_);
+ std::string popEmptyFile();
+ void createEmptyFile();
+ bool validateEmptyFile(const std::string& emptyFileName_) const;
+ std::string getEfpFileName();
+ static efpFileSizeKib_t fileSizeKbFromDirName(const std::string& dirName_,
+ const efpPartitionNumber_t partitionNumber_);
+};
+
+}} // namespace qpid::qls_jrnl
+
+#endif /* QPID_QLS_JRNL_EMPTYFILEPOOL_H_ */
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
new file mode 100644
index 0000000000..1bc716f912
--- /dev/null
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
@@ -0,0 +1,175 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "EmptyFilePoolManager.h"
+
+#include <dirent.h>
+#include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h"
+#include "qpid/linearstore/jrnl/jdir.h"
+#include "qpid/linearstore/jrnl/slock.h"
+#include <vector>
+
+// DEBUG
+//#include <iostream>
+
+namespace qpid {
+namespace qls_jrnl {
+
+EmptyFilePoolManager::EmptyFilePoolManager(const std::string& qlsStorePath_) :
+ qlsStorePath(qlsStorePath_)
+{}
+
+EmptyFilePoolManager::~EmptyFilePoolManager() {
+ slock l(partitionMapMutex);
+ for (partitionMapItr_t i = partitionMap.begin(); i != partitionMap.end(); ++i) {
+ delete i->second;
+ }
+ partitionMap.clear();
+}
+
+void
+EmptyFilePoolManager::findEfpPartitions() {
+ //std::cout << "*** Reading " << qlsStorePath << std::endl; // DEBUG
+ std::vector<std::string> dirList;
+ jdir::read_dir(qlsStorePath, dirList, true, false, true);
+ for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
+ if ((*i)[0] == 'p' && i->length() == 4) { // Filter: look only at names pNNN
+ efpPartitionNumber_t pn = ::atoi(i->c_str() + 1);
+ std::string fullDirPath(qlsStorePath + "/" + (*i));
+ EmptyFilePoolPartition* efppp = 0;
+ try {
+ efppp = new EmptyFilePoolPartition(pn, fullDirPath);
+ {
+ slock l(partitionMapMutex);
+ partitionMap[pn] = efppp;
+ }
+ } catch (const std::exception& e) {
+ if (efppp != 0) {
+ delete efppp;
+ efppp = 0;
+ }
+ //std::cerr << "Unable to initialize partition " << pn << " (\'" << fullDirPath << "\'): " << e.what() << std::endl;
+ }
+ if (efppp != 0)
+ efppp->findEmptyFilePools();
+ }
+ }
+}
+
+uint16_t
+EmptyFilePoolManager::getNumEfpPartitions() const {
+ return partitionMap.size();
+}
+
+EmptyFilePoolPartition*
+EmptyFilePoolManager::getEfpPartition(const efpPartitionNumber_t partitionNumber) {
+ partitionMapItr_t i = partitionMap.find(partitionNumber);
+ if (i == partitionMap.end())
+ return 0;
+ else
+ return i->second;
+}
+
+void
+EmptyFilePoolManager::getEfpPartitionNumbers(std::vector<efpPartitionNumber_t>& partitionNumberList,
+ const efpFileSizeKib_t efpFileSizeKb) const {
+ slock l(partitionMapMutex);
+ for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) {
+ if (efpFileSizeKb == 0) {
+ partitionNumberList.push_back(i->first);
+ } else {
+ std::vector<efpFileSizeKib_t> efpFileSizeList;
+ i->second->getEmptyFilePoolSizesKb(efpFileSizeList);
+ for (std::vector<efpFileSizeKib_t>::iterator j=efpFileSizeList.begin(); j!=efpFileSizeList.end(); ++j) {
+ if (*j == efpFileSizeKb) {
+ partitionNumberList.push_back(i->first);
+ break;
+ }
+ }
+ }
+ }
+}
+
+void
+EmptyFilePoolManager::getEfpPartitions(std::vector<EmptyFilePoolPartition*>& partitionList,
+ const efpFileSizeKib_t efpFileSizeKb) {
+ slock l(partitionMapMutex);
+ for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) {
+ if (efpFileSizeKb == 0) {
+ partitionList.push_back(i->second);
+ } else {
+ std::vector<efpFileSizeKib_t> efpFileSizeList;
+ i->second->getEmptyFilePoolSizesKb(efpFileSizeList);
+ for (std::vector<efpFileSizeKib_t>::iterator j=efpFileSizeList.begin(); j!=efpFileSizeList.end(); ++j) {
+ if (*j == efpFileSizeKb) {
+ partitionList.push_back(i->second);
+ break;
+ }
+ }
+ }
+ }
+}
+
+void
+EmptyFilePoolManager::getEfpFileSizes(std::vector<efpFileSizeKib_t>& efpFileSizeList,
+ const efpPartitionNumber_t efpPartitionNumber) const {
+ if (efpPartitionNumber == 0) {
+ for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) {
+ i->second->getEmptyFilePoolSizesKb(efpFileSizeList);
+ }
+ } else {
+ partitionMapConstItr_t i = partitionMap.find(efpPartitionNumber);
+ if (i != partitionMap.end()) {
+ i->second->getEmptyFilePoolSizesKb(efpFileSizeList);
+ }
+ }
+}
+
+void
+EmptyFilePoolManager::getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList,
+ const efpPartitionNumber_t efpPartitionNumber) {
+ if (efpPartitionNumber == 0) {
+ for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) {
+ i->second->getEmptyFilePools(emptyFilePoolList);
+ }
+ } else {
+ partitionMapConstItr_t i = partitionMap.find(efpPartitionNumber);
+ if (i != partitionMap.end()) {
+ i->second->getEmptyFilePools(emptyFilePoolList);
+ }
+ }
+}
+
+EmptyFilePool*
+EmptyFilePoolManager::getEmptyFilePool(const efpPartitionNumber_t partitionNumber,
+ const efpFileSizeKib_t efpFileSizeKib) {
+ EmptyFilePoolPartition* efppp = getEfpPartition(partitionNumber);
+ if (efppp != 0)
+ return efppp->getEmptyFilePool(efpFileSizeKib);
+ return 0;
+}
+
+EmptyFilePool*
+EmptyFilePoolManager::getEmptyFilePool(const efpIdentity_t efpIdentity) {
+ return getEmptyFilePool(efpIdentity.first, efpIdentity.second);
+}
+
+}} // namespace qpid::qls_jrnl
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h
new file mode 100644
index 0000000000..e34eb2c0f3
--- /dev/null
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_QLS_JRNL_EMPTYFILEPOOLMANAGER_H_
+#define QPID_QLS_JRNL_EMPTYFILEPOOLMANAGER_H_
+
+#include <map>
+#include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h"
+#include "qpid/linearstore/jrnl/smutex.h"
+#include <string>
+
+namespace qpid {
+namespace qls_jrnl {
+
+class EmptyFilePoolManager
+{
+protected:
+ typedef std::map<efpPartitionNumber_t, EmptyFilePoolPartition*> partitionMap_t;
+ typedef partitionMap_t::iterator partitionMapItr_t;
+ typedef partitionMap_t::const_iterator partitionMapConstItr_t;
+
+ std::string qlsStorePath;
+ partitionMap_t partitionMap;
+ smutex partitionMapMutex;
+
+public:
+ EmptyFilePoolManager(const std::string& qlsStorePath_);
+ virtual ~EmptyFilePoolManager();
+ void findEfpPartitions();
+
+ uint16_t getNumEfpPartitions() const;
+ EmptyFilePoolPartition* getEfpPartition(const efpPartitionNumber_t partitionNumber);
+ void getEfpPartitionNumbers(std::vector<efpPartitionNumber_t>& partitionNumberList, const efpFileSizeKib_t efpFileSizeKb = 0) const;
+ void getEfpPartitions(std::vector<EmptyFilePoolPartition*>& partitionList, const efpFileSizeKib_t efpFileSizeKb = 0);
+
+ void getEfpFileSizes(std::vector<efpFileSizeKib_t>& efpFileSizeList, const efpPartitionNumber_t efpPartitionNumber = 0) const;
+ void getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList, const efpPartitionNumber_t efpPartitionNumber = 0);
+
+ EmptyFilePool* getEmptyFilePool(const efpPartitionNumber_t partitionNumber, const efpFileSizeKib_t efpFileSizeKb);
+ EmptyFilePool* getEmptyFilePool(const efpIdentity_t efpIdentity);
+};
+
+}} // namespace qpid::qls_jrnl
+
+#endif /* QPID_QLS_JRNL_EMPTYFILEPOOLMANAGER_H_ */
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
new file mode 100644
index 0000000000..6e6bbc4abd
--- /dev/null
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
@@ -0,0 +1,134 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h"
+
+#include <dirent.h>
+#include "qpid/linearstore/jrnl/jdir.h"
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/jexception.h"
+#include "qpid/linearstore/jrnl/slock.h"
+
+//#include <iostream> // DEBUG
+
+namespace qpid {
+namespace qls_jrnl {
+
+const std::string EmptyFilePoolPartition::efpTopLevelDir("efp"); // Sets the top-level efp dir within a partition
+
+EmptyFilePoolPartition::EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum_, const std::string& partitionDir_) :
+ partitionNum(partitionNum_),
+ partitionDir(partitionDir_)
+{
+ validatePartitionDir();
+}
+
+EmptyFilePoolPartition::~EmptyFilePoolPartition() {
+ slock l(efpMapMutex);
+ for (efpMapItr_t i = efpMap.begin(); i != efpMap.end(); ++i) {
+ delete i->second;
+ }
+ efpMap.clear();
+}
+
+void
+EmptyFilePoolPartition::validatePartitionDir() {
+ if (!jdir::is_dir(partitionDir)) {
+ std::ostringstream ss;
+ ss << "Invalid partition directory: \'" << partitionDir << "\' is not a directory";
+ throw jexception(jerrno::JERR_EFP_BADPARTITIONDIR, ss.str(), "EmptyFilePoolPartition", "validatePartitionDir");
+ }
+ // TODO: other validity checks here
+}
+
+void
+EmptyFilePoolPartition::findEmptyFilePools() {
+ //std::cout << "Reading " << partitionDir << std::endl; // DEBUG
+ std::vector<std::string> dirList;
+ jdir::read_dir(partitionDir, dirList, true, false, false);
+ bool foundEfpDir = false;
+ for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
+ if (i->compare(efpTopLevelDir) == 0) {
+ foundEfpDir = true;
+ break;
+ }
+ }
+ if (foundEfpDir) {
+ std::string efpDir(partitionDir + "/" + efpTopLevelDir);
+ //std::cout << "Reading " << efpDir << std::endl; // DEBUG
+ dirList.clear();
+ jdir::read_dir(efpDir, dirList, true, false, false);
+ for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
+ std::string efpSizeDir(efpDir + "/" + (*i));
+ EmptyFilePool* efpp = 0;
+ try {
+ efpp = new EmptyFilePool(efpSizeDir, this);
+ {
+ slock l(efpMapMutex);
+ efpMap[efpp->fileSizeKib()] = efpp;
+ }
+ }
+ catch (const std::exception& e) {
+ if (efpp != 0) {
+ delete efpp;
+ efpp = 0;
+ }
+ //std::cerr << "WARNING: " << e.what() << std::endl;
+ }
+ if (efpp != 0)
+ efpp->initialize();
+ }
+ }
+}
+
+efpPartitionNumber_t
+EmptyFilePoolPartition::partitionNumber() const {
+ return partitionNum;
+}
+
+std::string
+EmptyFilePoolPartition::partitionDirectory() const {
+ return partitionDir;
+}
+
+EmptyFilePool*
+EmptyFilePoolPartition::getEmptyFilePool(const efpFileSizeKib_t efpFileSizeKb) {
+ efpMapItr_t i = efpMap.find(efpFileSizeKb);
+ if (i == efpMap.end())
+ return 0;
+ return i->second;
+}
+
+void
+EmptyFilePoolPartition::getEmptyFilePoolSizesKb(std::vector<efpFileSizeKib_t>& efpFileSizesKbList) const {
+ for (efpMapConstItr_t i=efpMap.begin(); i!=efpMap.end(); ++i) {
+ efpFileSizesKbList.push_back(i->first);
+ }
+}
+
+void
+EmptyFilePoolPartition::getEmptyFilePools(std::vector<EmptyFilePool*>& efpList) {
+ for (efpMapItr_t i=efpMap.begin(); i!=efpMap.end(); ++i) {
+ efpList.push_back(i->second);
+ }
+}
+
+}} // namespace qpid::qls_jrnl
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h
new file mode 100644
index 0000000000..2013884b38
--- /dev/null
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_QLS_JRNL_EMPTYFILEPOOLPARTITION_H_
+#define QPID_QLS_JRNL_EMPTYFILEPOOLPARTITION_H_
+
+namespace qpid {
+namespace qls_jrnl {
+
+ class EmptyFilePoolPartition;
+
+}} // namespace qpid::qls_jrnl
+
+#include "qpid/linearstore/jrnl/EmptyFilePool.h"
+#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
+#include "qpid/linearstore/jrnl/smutex.h"
+#include <string>
+#include <map>
+#include <vector>
+
+namespace qpid {
+namespace qls_jrnl {
+
+class EmptyFilePoolPartition
+{
+public:
+ static const std::string efpTopLevelDir;
+protected:
+ typedef std::map<efpFileSizeKib_t, EmptyFilePool*> efpMap_t;
+ typedef efpMap_t::iterator efpMapItr_t;
+ typedef efpMap_t::const_iterator efpMapConstItr_t;
+
+ const efpPartitionNumber_t partitionNum;
+ const std::string partitionDir;
+ efpMap_t efpMap;
+ smutex efpMapMutex;
+
+ void validatePartitionDir();
+
+public:
+ EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum_, const std::string& partitionDir_);
+ virtual ~EmptyFilePoolPartition();
+ void findEmptyFilePools();
+ efpPartitionNumber_t partitionNumber() const;
+ std::string partitionDirectory() const;
+
+ EmptyFilePool* getEmptyFilePool(const efpFileSizeKib_t efpFileSizeKb);
+ void getEmptyFilePoolSizesKb(std::vector<efpFileSizeKib_t>& efpFileSizesKbList) const;
+ void getEmptyFilePools(std::vector<EmptyFilePool*>& efpList);
+};
+
+}} // namespace qpid::qls_jrnl
+
+#endif /* QPID_QLS_JRNL_EMPTYFILEPOOLPARTITION_H_ */
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h
new file mode 100644
index 0000000000..de91bdc06a
--- /dev/null
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h
@@ -0,0 +1,37 @@
+ /*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_
+#define QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_
+
+#include <stdint.h>
+
+namespace qpid {
+namespace qls_jrnl {
+
+ typedef uint64_t efpFileSizeKib_t;
+ typedef uint32_t efpFileCount_t;
+ typedef uint16_t efpPartitionNumber_t;
+ typedef std::pair<efpPartitionNumber_t, efpFileSizeKib_t> efpIdentity_t;
+
+}} // namespace qpid::qls_jrnl
+
+#endif /* QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_ */
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp
new file mode 100644
index 0000000000..87ecdfb7a8
--- /dev/null
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/JournalFile.h"
+
+namespace qpid {
+namespace qls_jrnl {
+
+JournalFile::JournalFile(const std::string& fqFileName_) :
+ fqfn(fqFileName_)
+{}
+
+JournalFile::~JournalFile() {}
+
+const std::string
+JournalFile::directory() const {
+ return fqfn.substr(0, fqfn.rfind('/'));
+}
+
+const std::string
+JournalFile::fileName() const {
+ return fqfn.substr(fqfn.rfind('/'));
+}
+
+const std::string
+JournalFile::fqFileName() const {
+ return fqfn;
+}
+
+bool
+JournalFile::empty() const {
+ // TODO: return true if no still-enqueued records (or parts of records) exist in this file
+ return true;
+}
+
+}} // namespace qpid::qls_jrnl
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h
new file mode 100644
index 0000000000..de587d94e3
--- /dev/null
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LINEARSTORE_JOURNALFILE_H_
+#define QPID_LINEARSTORE_JOURNALFILE_H_
+
+#include <string>
+
+namespace qpid {
+namespace qls_jrnl {
+
+class JournalFile
+{
+protected:
+ const std::string fqfn;
+public:
+ JournalFile(const std::string& fqFileName_);
+ virtual ~JournalFile();
+
+ const std::string directory() const;
+ const std::string fileName() const;
+ const std::string fqFileName() const;
+ bool empty() const;
+};
+
+}} // namespace qpid::qls_jrnl
+
+#endif // QPID_LINEARSTORE_JOURNALFILE_H_
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.cpp
new file mode 100644
index 0000000000..6167a641c9
--- /dev/null
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.cpp
@@ -0,0 +1,142 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/JournalFileController.h"
+
+#include <fstream>
+#include "qpid/linearstore/jrnl/EmptyFilePool.h"
+#include "qpid/linearstore/jrnl/jcfg.h"
+#include "qpid/linearstore/jrnl/JournalFile.h"
+#include "qpid/linearstore/jrnl/slock.h"
+#include "qpid/linearstore/jrnl/utils/file_hdr.h"
+
+#include <iostream> // DEBUG
+
+namespace qpid {
+namespace qls_jrnl {
+
+JournalFileController::JournalFileController(const std::string& dir_,
+ EmptyFilePool* efpp_) :
+ dir(dir_),
+ efpp(efpp_),
+ fileSeqCounter(0)
+{
+ //std::cout << "*** JournalFileController::JournalFileController() dir=" << dir << std::endl;
+}
+
+JournalFileController::~JournalFileController() {}
+
+void
+JournalFileController::pullEmptyFileFromEfp(const uint64_t recId_,
+ const uint64_t firstRecOffs_,
+ const std::string& queueName_) {
+ std::string ef = efpp->takeEmptyFile(dir);
+ //std::cout << "*** JournalFileController::pullEmptyFileFromEfp() qn=" << queueName_ << " ef=" << ef << std::endl;
+ const JournalFile* jfp = new JournalFile(ef/*efpp->takeEmptyFile(dir)*/);
+ initialzeFileHeader(jfp->fqFileName(), recId_, firstRecOffs_, getNextFileSeqNum(), queueName_);
+ {
+ slock l(journalFileListMutex);
+ journalFileList.push_back(jfp);
+ }
+}
+
+void
+JournalFileController::purgeFilesToEfp() {
+ slock l(journalFileListMutex);
+ while (journalFileList.front()->empty()) {
+
+ efpp->returnEmptyFile(journalFileList.front());
+ delete journalFileList.front();
+ journalFileList.pop_front();
+ }
+}
+
+void
+JournalFileController::finalize() {}
+
+void
+JournalFileController::setFileSeqNum(const uint64_t fileSeqNum) {
+ fileSeqCounter = fileSeqNum;
+}
+
+// protected
+
+std::string
+JournalFileController::readFileHeader(file_hdr_t* fhdr_,
+ const std::string& fileName_) {
+ //std::cout << "*** JournalFileController::readFileHeader() fn=" << fileName_ << std::endl;
+ char buff[JRNL_SBLK_SIZE];
+ std::ifstream ifs(fileName_.c_str(), std::ifstream::in | std::ifstream::binary);
+ if (ifs.good()) {
+ ifs.read(buff, JRNL_SBLK_SIZE);
+ ifs.close();
+ std::memcpy(fhdr_, buff, sizeof(file_hdr_t));
+ return std::string(buff + sizeof(file_hdr_t), fhdr_->_queue_name_len);
+ } else {
+ std::cerr << "ERROR: Could not open file \"" << fileName_ << "\" for reading." << std::endl;
+ }
+ return std::string("");
+}
+
+void
+JournalFileController::writeFileHeader(const file_hdr_t* fhdr_,
+ const std::string& queueName_,
+ const std::string& fileName_) {
+ //std::cout << "*** JournalFileController::writeFileHeader() qn=" << queueName_ << " fn=" << fileName_ << std::endl;
+ std::fstream fs(fileName_.c_str(), std::fstream::in | std::fstream::out | std::fstream::binary);
+ if (fs.good()) {
+ fs.seekp(0);
+ fs.write((const char*)fhdr_, sizeof(file_hdr_t));
+ fs.write(queueName_.data(), fhdr_->_queue_name_len);
+ fs.close();
+ } else {
+ std::cerr << "ERROR: Could not open file \"" << fileName_ << "\" for writing." << std::endl;
+ }
+}
+
+void
+JournalFileController::resetFileHeader(const std::string& fileName_) {
+ //std::cout << "*** JournalFileController::resetFileHeader() fn=" << fileName_ << std::endl;
+ file_hdr_t fhdr;
+ readFileHeader(&fhdr, fileName_);
+ ::file_hdr_reset(&fhdr);
+ writeFileHeader(&fhdr, std::string(""), fileName_);
+}
+
+void
+JournalFileController::initialzeFileHeader(const std::string& fileName_,
+ const uint64_t recId_,
+ const uint64_t firstRecOffs_,
+ const uint64_t fileSeqNum_,
+ const std::string& queueName_) {
+ //std::cout << "*** JournalFileController::initialzeFileHeader() fn=" << fileName_ << " sn=" << fileSeqNum_ << " qn=" << queueName_ << std::endl;
+ file_hdr_t fhdr;
+ readFileHeader(&fhdr, fileName_);
+ ::file_hdr_init(&fhdr, 0, recId_, firstRecOffs_, fileSeqNum_, queueName_.length(), queueName_.data());
+ writeFileHeader(&fhdr, queueName_, fileName_);
+}
+
+uint64_t
+JournalFileController::getNextFileSeqNum() {
+ return __sync_add_and_fetch(&fileSeqCounter, 1); // GCC atomic increment, not portable
+}
+
+}} // namespace qpid::qls_jrnl
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.h b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.h
new file mode 100644
index 0000000000..cd23afd959
--- /dev/null
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.h
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LINEARSTORE_JOURNALFILECONTROLLER_H_
+#define QPID_LINEARSTORE_JOURNALFILECONTROLLER_H_
+
+#include <deque>
+#include "qpid/linearstore/jrnl/smutex.h"
+
+struct file_hdr_t;
+namespace qpid {
+namespace qls_jrnl {
+class EmptyFilePool;
+class JournalFile;
+//typedef struct file_hdr_t file_hdr_t;
+
+class JournalFileController
+{
+protected:
+ typedef std::deque<const JournalFile*> JournalFileList_t;
+ typedef JournalFileList_t::iterator JournalFileListItr_t;
+
+ const std::string dir;
+ EmptyFilePool* efpp;
+ uint64_t fileSeqCounter;
+ JournalFileList_t journalFileList;
+ smutex journalFileListMutex;
+
+public:
+ JournalFileController(const std::string& dir,
+ EmptyFilePool* efpp);
+ virtual ~JournalFileController();
+
+ void pullEmptyFileFromEfp(const uint64_t recId, const uint64_t firstRecOffs, const std::string& queueName);
+ void purgeFilesToEfp();
+ void finalize();
+ void setFileSeqNum(const uint64_t fileSeqNum);
+
+protected:
+ std::string readFileHeader(file_hdr_t* fhdr, const std::string& fileName);
+ void writeFileHeader(const file_hdr_t* fhdr, const std::string& queueName, const std::string& fileName);
+ void resetFileHeader(const std::string& fileName);
+ void initialzeFileHeader(const std::string& fileName, const uint64_t recId, const uint64_t firstRecOffs,
+ const uint64_t fileSeqNum, const std::string& queueName);
+ uint64_t getNextFileSeqNum();
+};
+
+}} // namespace qpid::qls_jrnl
+
+#endif // QPID_LINEARSTORE_JOURNALFILECONTROLLER_H_
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp
new file mode 100644
index 0000000000..22a6412c21
--- /dev/null
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "JournalLog.h"
+#include <iostream>
+
+namespace qpid {
+namespace qls_jrnl {
+
+JournalLog::JournalLog() {}
+
+JournalLog::~JournalLog() {}
+
+void
+JournalLog::log(log_level_t ll, const std::string& jid, const std::string& log_stmt) const {
+ log(ll, jid.c_str(), log_stmt.c_str());
+}
+
+void
+JournalLog::log(log_level_t ll, const char* jid, const char* const log_stmt) const {
+ if (ll > LOG_ERROR) {
+ std::cerr << log_level_str(ll) << ": Journal \"" << jid << "\": " << log_stmt << std::endl;
+ } else if (ll > LOG_INFO) {
+ std::cout << log_level_str(ll) << ": Journal \"" << jid << "\": " << log_stmt << std::endl;
+ }
+
+}
+
+const char*
+JournalLog::log_level_str(log_level_t ll) {
+ switch (ll)
+ {
+ case LOG_TRACE: return "TRACE";
+ case LOG_DEBUG: return "DEBUG";
+ case LOG_INFO: return "INFO";
+ case LOG_NOTICE: return "NOTICE";
+ case LOG_WARN: return "WARN";
+ case LOG_ERROR: return "ERROR";
+ case LOG_CRITICAL: return "CRITICAL";
+ }
+ return "<log level unknown>";
+}
+
+}} // namespace qpid::qls_jrnl
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h b/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h
new file mode 100644
index 0000000000..83ba80d0b5
--- /dev/null
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LINEARSTORE_JOURNALLOG_H_
+#define QPID_LINEARSTORE_JOURNALLOG_H_
+
+#include <string>
+
+namespace qpid {
+namespace qls_jrnl {
+
+class JournalLog
+{
+public:
+ typedef enum _log_level
+ {
+ LOG_TRACE = 0,
+ LOG_DEBUG,
+ LOG_INFO,
+ LOG_NOTICE,
+ LOG_WARN,
+ LOG_ERROR,
+ LOG_CRITICAL
+ } log_level_t;
+
+protected:
+ JournalLog();
+ virtual ~JournalLog();
+
+public:
+ virtual void log(log_level_t level, const std::string& jid, const std::string& log_stmt) const;
+ virtual void log(log_level_t level, const char* jid, const char* const log_stmt) const;
+ static const char* log_level_str(log_level_t ll);
+};
+
+}} // namespace qpid::qls_jrnl
+
+#endif // QPID_LINEARSTORE_JOURNALLOG_H_
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/enums.h b/qpid/cpp/src/qpid/linearstore/jrnl/enums.h
index fffb8c07d7..31fa4e6ba3 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/enums.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/enums.h
@@ -19,8 +19,8 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_ENUMS_H
-#define QPID_LEGACYSTORE_JRNL_ENUMS_H
+#ifndef QPID_LINEARSTORE_JRNL_ENUMS_H
+#define QPID_LINEARSTORE_JRNL_ENUMS_H
namespace qpid
{
@@ -64,6 +64,7 @@ namespace qls_jrnl
return "<iores unknown>";
}
+/*
enum _log_level
{
LOG_TRACE = 0,
@@ -74,9 +75,9 @@ namespace qls_jrnl
LOG_ERROR,
LOG_CRITICAL
};
- typedef _log_level log_level;
+ typedef _log_level log_level_t;
- static inline const char* log_level_str(log_level ll)
+ static inline const char* log_level_str(log_level_t ll)
{
switch (ll)
{
@@ -90,8 +91,9 @@ namespace qls_jrnl
}
return "<log level unknown>";
}
+*/
}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_ENUMS_H
+#endif // ifndef QPID_LINEARSTORE_JRNL_ENUMS_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h b/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
index fc44e35331..110fee1334 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
@@ -40,7 +40,6 @@
#endif
*/
-
/**
* <b>Rule:</b> Data block size (JRNL_DBLK_SIZE) MUST be a power of 2 such that
* <pre>
@@ -51,6 +50,7 @@
#define JRNL_DBLK_SIZE 128 /**< Data block size in bytes (CANNOT BE LESS THAN 32!) */
#define JRNL_SBLK_SIZE_DBLKS 32 /**< Disk softblock size in multiples of JRNL_DBLK_SIZE */
#define JRNL_SBLK_SIZE JRNL_SBLK_SIZE_DBLKS * JRNL_DBLK_SIZE /**< Disk softblock size in bytes */
+#define JRNL_SBLK_SIZE_KIB JRNL_SBLK_SIZE / 1024 /**< Disk softblock size in KiB */
//#define JRNL_MIN_FILE_SIZE 128 ///< Min. jrnl file size in sblks (excl. file_hdr)
//#define JRNL_MAX_FILE_SIZE 4194176 ///< Max. jrnl file size in sblks (excl. file_hdr)
//#define JRNL_MIN_NUM_FILES 4 ///< Min. number of journal files
@@ -68,7 +68,7 @@
//
//#define JRNL_INFO_EXTENSION "jinf" ///< Extension for journal info files
//#define JRNL_DATA_EXTENSION "jdat" ///< Extension for journal data files
-#define QLS_JRNL_FILE_EXTENSION "jdat" /**< Extension for journal data files */
+#define QLS_JRNL_FILE_EXTENSION ".jrnl" /**< Extension for journal data files */
//#define RHM_JDAT_TXA_MAGIC 0x614d4852 ///< ("RHMa" in little endian) Magic for dtx abort hdrs
#define QLS_TXA_MAGIC 0x61534c51 /**< ("RHMa" in little endian) Magic for dtx abort hdrs */
//#define RHM_JDAT_TXC_MAGIC 0x634d4852 ///< ("RHMc" in little endian) Magic for dtx commit hdrs
@@ -83,6 +83,7 @@
#define QLS_EMPTY_MAGIC 0x78534c51 /**< ("QLSx" in little endian) Magic for empty dblk */
//#define RHM_JDAT_VERSION 0x01 ///< Version (of file layout)
#define QLS_JRNL_VERSION 0x0002 /**< Version (of file layout) */
+#define QLS_JRNL_FHDRSIZESBLKS 0x0001 /**< Journal file header size in sblks (as defined by JRNL_SBLK_SIZE) */
//#define RHM_CLEAN_CHAR 0xff ///< Char used to clear empty space on disk
#define QLS_CLEAN_CHAR 0xff ///< Char used to clear empty space on disk
//
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
index 552aa92b9c..126d3f7da3 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
@@ -29,9 +29,11 @@
#include <fstream>
#include <iomanip>
#include <iostream>
+#include <qpid/linearstore/jrnl/EmptyFilePool.h>
//#include "qpid/linearstore/jrnl/file_hdr.h"
#include "qpid/linearstore/jrnl/jerrno.h"
//#include "qpid/linearstore/jrnl/jinf.h"
+#include "qpid/linearstore/jrnl/JournalFileController.h"
#include <limits>
#include <sstream>
#include <unistd.h>
@@ -62,15 +64,16 @@ bool jcntl::init_statics()
// Functions
-jcntl::jcntl(const std::string& jid, const std::string& jdir, const std::string& base_filename):
+jcntl::jcntl(const std::string& jid, const std::string& jdir/*, const std::string& base_filename*/):
_jid(jid),
- _jdir(jdir, base_filename),
- _base_filename(base_filename),
+ _jdir(jdir/*, base_filename*/),
+// _base_filename(base_filename),
_init_flag(false),
_stop_flag(false),
_readonly_flag(false),
- _autostop(true),
- _jfsize_sblks(0),
+// _autostop(true),
+ _jfcp(0),
+// _jfsize_sblks(0),
// _lpmgr(),
_emap(),
_tmap(),
@@ -87,11 +90,16 @@ jcntl::~jcntl()
try { stop(true); }
catch (const jexception& e) { std::cerr << e << std::endl; }
// _lpmgr.finalize();
+ if (_jfcp) {
+ _jfcp->finalize();
+ delete _jfcp;
+ _jfcp = 0;
+ }
}
void
jcntl::initialize(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_max_jfiles,
- const uint32_t jfsize_sblks,*/ const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
+ const uint32_t jfsize_sblks,*/ EmptyFilePool* efpp, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
aio_callback* const cbp)
{
_init_flag = false;
@@ -101,6 +109,12 @@ jcntl::initialize(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_
_emap.clear();
_tmap.clear();
+ if (_jfcp) {
+ _jfcp->finalize();
+ delete _jfcp;
+ _jfcp = 0;
+ }
+
// _lpmgr.finalize();
// Set new file geometry parameters
@@ -117,6 +131,8 @@ jcntl::initialize(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_
_jdir.clear_dir();
// _lpmgr.initialize(num_jfiles, ae, ae_max_jfiles, this, &new_fcntl);
+ _jfcp = new JournalFileController(_jdir.dirname(), efpp);
+ _jfcp->pullEmptyFileFromEfp(1, 4096, _jid);
// _wrfc.initialize(_jfsize_sblks);
// _rrfc.initialize();
// _rrfc.set_findex(0);
@@ -159,7 +175,7 @@ jcntl::recover(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_max
highest_rid = _rcvdat._h_rid;
if (_rcvdat._jfull)
throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover");
- this->log(LOG_DEBUG, _rcvdat.to_log(_jid));
+ this->log(LOG_DEBUG, _jid, _rcvdat.to_log(_jid));
// _lpmgr.recover(_rcvdat, this, &new_fcntl);
@@ -192,6 +208,7 @@ void
jcntl::delete_jrnl_files()
{
stop(true); // wait for AIO to complete
+ _jfcp->purgeFilesToEfp();
_jdir.delete_dir();
}
@@ -407,20 +424,22 @@ jcntl::flush(const bool block_till_aio_cmpl)
return res;
}
+/*
void
-jcntl::log(log_level ll, const std::string& log_stmt) const
+jcntl::log(log_level_t ll, const std::string& log_stmt) const
{
log(ll, log_stmt.c_str());
}
void
-jcntl::log(log_level ll, const char* const log_stmt) const
+jcntl::log(log_level_t ll, const char* const log_stmt) const
{
if (ll > LOG_INFO)
{
std::cout << log_level_str(ll) << ": Journal \"" << _jid << "\": " << log_stmt << std::endl;
}
}
+*/
/*
void
@@ -523,7 +542,7 @@ jcntl::handle_aio_wait(const iores res, iores& resout, const data_tok* dtp)
{
std::ostringstream oss;
oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: " << _wmgr.status_str();
- this->log(LOG_CRITICAL, oss.str());
+ this->log(LOG_CRITICAL, _jid, oss.str());
throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait");
}
}
@@ -877,8 +896,8 @@ jcntl::jfile_cycle(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, rcvdat& r
if (!ifsp->is_open())
{
std::ostringstream oss;
- oss << _jdir.dirname() << "/" << _base_filename << ".";
- oss << std::hex << std::setfill('0') << std::setw(4) << fid << "." << QLS_JRNL_FILE_EXTENSION;
+ oss << _jdir.dirname() << "/" /*<< _base_filename*/ << "."; // TODO - linear journal name
+ oss << std::hex << std::setfill('0') << std::setw(4) << fid << QLS_JRNL_FILE_EXTENSION;
ifsp->clear(); // clear eof flag, req'd for older versions of c++
ifsp->open(oss.str().c_str(), std::ios_base::in | std::ios_base::binary);
if (!ifsp->good())
@@ -946,12 +965,12 @@ jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos, rcv
oss << std::hex << "Bad record alignment found at fid=0x" << fid;
oss << " offs=0x" << file_pos << " (likely journal overwrite boundary); " << std::dec;
oss << (JRNL_SBLK_SIZE_DBLKS - (sblk_offs/JRNL_DBLK_SIZE)) << " filler record(s) required.";
- this->log(LOG_WARN, oss.str());
+ this->log(LOG_WARN, _jid, oss.str());
}
const uint32_t xmagic = QLS_EMPTY_MAGIC;
std::ostringstream oss;
- oss << _jdir.dirname() << "/" << _base_filename << ".";
- oss << std::hex << std::setfill('0') << std::setw(4) << fid << "." << QLS_JRNL_FILE_EXTENSION;
+ oss << _jdir.dirname() << "/" /*<< _base_filename*/ << "."; // TODO linear journal name
+ oss << std::hex << std::setfill('0') << std::setw(4) << fid << QLS_JRNL_FILE_EXTENSION;
std::ofstream ofsp(oss.str().c_str(),
std::ios_base::in | std::ios_base::out | std::ios_base::binary);
if (!ofsp.good())
@@ -971,7 +990,7 @@ jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos, rcv
assert(!ofsp.fail());
std::ostringstream oss;
oss << std::hex << "Recover phase write: Wrote filler record: fid=0x" << fid << " offs=0x" << file_pos;
- this->log(LOG_NOTICE, oss.str());
+ this->log(LOG_NOTICE, _jid, oss.str());
file_pos = ofsp.tellp();
}
ofsp.close();
@@ -979,7 +998,7 @@ jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos, rcv
rd._lfid = fid;
// if (!rd._frot)
// rd._ffid = (fid + 1) % rd._njf;
- this->log(LOG_INFO, "Bad record alignment fixed.");
+ this->log(LOG_INFO, _jid, "Bad record alignment fixed.");
}
rd._eo = file_pos;
}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
index debcc02efc..8a2c178f67 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
@@ -31,6 +31,7 @@ namespace qls_jrnl
#include <cstddef>
#include <deque>
+#include <qpid/linearstore/jrnl/JournalLog.h>
#include "qpid/linearstore/jrnl/jdir.h"
//#include "qpid/linearstore/jrnl/fcntl.h"
//#include "qpid/linearstore/jrnl/lpmgr.h"
@@ -45,6 +46,8 @@ namespace qpid
{
namespace qls_jrnl
{
+class EmptyFilePool;
+class JournalFileController;
/**
* \brief Access and control interface for the journal. This is the top-level class for the
@@ -56,7 +59,7 @@ namespace qls_jrnl
* which is used per data block written to the journal, and is used to track its status through
* the AIO enqueue, read and dequeue process.
*/
- class jcntl
+ class jcntl : public JournalLog
{
protected:
/**
@@ -85,7 +88,7 @@ namespace qls_jrnl
* that will be written to disk. No file separator characters should be included here, but
* all other legal filename characters are valid.
*/
- std::string _base_filename;
+// std::string _base_filename;
/**
* \brief Initialized flag
@@ -121,10 +124,11 @@ namespace qls_jrnl
* marker. If not set, then attempts to write will throw exceptions until the journal
* file low water marker moves to the next journal file.
*/
- bool _autostop; ///< Autostop flag - stops journal when overrun occurs
+ //bool _autostop; ///< Autostop flag - stops journal when overrun occurs
// Journal control structures
- uint32_t _jfsize_sblks; ///< Journal file size in sblks
+ JournalFileController* _jfcp;///< Journal File Controller
+ //uint32_t _jfsize_sblks; ///< Journal file size in sblks
//lpmgr _lpmgr; ///< LFID-PFID manager tracks inserted journal files
enq_map _emap; ///< Enqueue map for low water mark management
txn_map _tmap; ///< Transaction map open transactions
@@ -148,7 +152,7 @@ namespace qls_jrnl
* \param jdir The directory which will contain the journal files.
* \param base_filename The string which will be used to start all journal filenames.
*/
- jcntl(const std::string& jid, const std::string& jdir, const std::string& base_filename);
+ jcntl(const std::string& jid, const std::string& jdir/*, const std::string& base_filename*/);
/**
* \brief Destructor.
@@ -190,7 +194,7 @@ namespace qls_jrnl
* \exception TODO
*/
void initialize(/*const uint16_t num_jfiles, const bool auto_expand, const uint16_t ae_max_jfiles,
- const uint32_t jfsize_sblks,*/ const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
+ const uint32_t jfsize_sblks,*/EmptyFilePool* efpp, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
aio_callback* const cbp);
/**
@@ -628,17 +632,17 @@ namespace qls_jrnl
* the same directory, their base filenames <b>MUST</b> be different or else the instances
* will overwrite one another.
*/
- inline const std::string& base_filename() const { return _base_filename; }
+// inline const std::string& base_filename() const { return _base_filename; }
// inline uint16_t num_jfiles() const; { return _lpmgr.num_jfiles(); }
// inline fcntl* get_fcntlp(const uint16_t lfid) const { return _lpmgr.get_fcntlp(lfid); }
- inline uint32_t jfsize_sblks() const { return _jfsize_sblks; }
+// inline uint32_t jfsize_sblks() const { return _jfsize_sblks; }
// Logging
- virtual void log(log_level level, const std::string& log_stmt) const;
- virtual void log(log_level level, const char* const log_stmt) const;
+// virtual void log(log_level_t level, const std::string& log_stmt) const;
+// virtual void log(log_level_t level, const char* const log_stmt) const;
// FIXME these are _rmgr to _wmgr interactions, remove when _rmgr contains ref to _wmgr:
//void chk_wr_frot();
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp
index a191ed86d9..89bce926e0 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp
@@ -37,9 +37,9 @@ namespace qpid
namespace qls_jrnl
{
-jdir::jdir(const std::string& dirname, const std::string& _base_filename):
- _dirname(dirname),
- _base_filename(_base_filename)
+jdir::jdir(const std::string& dirname/*, const std::string& _base_filename*/):
+ _dirname(dirname)/*,
+ _base_filename(_base_filename)*/
{}
jdir::~jdir()
@@ -88,21 +88,22 @@ jdir::create_dir(const std::string& dirname)
void
jdir::clear_dir(const bool create_flag)
{
- clear_dir(_dirname, _base_filename, create_flag);
+ clear_dir(_dirname/*, _base_filename*/, create_flag);
}
void
-jdir::clear_dir(const char* dirname, const char* base_filename, const bool create_flag)
+jdir::clear_dir(const char* dirname/*, const char* base_filename*/, const bool create_flag)
{
- clear_dir(std::string(dirname), std::string(base_filename), create_flag);
+ clear_dir(std::string(dirname)/*, std::string(base_filename)*/, create_flag);
}
void
-jdir::clear_dir(const std::string& dirname, const std::string&
+jdir::clear_dir(const std::string& dirname/*, const std::string&
#ifndef RHM_JOWRITE
base_filename
#endif
+*/
, const bool create_flag)
{
DIR* dir = ::opendir(dirname.c_str());
@@ -117,7 +118,7 @@ jdir::clear_dir(const std::string& dirname, const std::string&
oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", "clear_dir");
}
-#ifndef RHM_JOWRITE
+//#ifndef RHM_JOWRITE
struct dirent* entry;
bool found = false;
std::string bak_dir;
@@ -126,13 +127,13 @@ jdir::clear_dir(const std::string& dirname, const std::string&
// Ignore . and ..
if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0)
{
- if (std::strlen(entry->d_name) > base_filename.size())
+ if (std::strlen(entry->d_name) >= 3) // 'bak'
{
- if (std::strncmp(entry->d_name, base_filename.c_str(), base_filename.size()) == 0)
+ if (std::strncmp(entry->d_name, "bak", 3) == 0)
{
if (!found)
{
- bak_dir = create_bak_dir(dirname, base_filename);
+ bak_dir = create_bak_dir(dirname/*, base_filename*/);
found = true;
}
std::ostringstream oldname;
@@ -154,16 +155,16 @@ jdir::clear_dir(const std::string& dirname, const std::string&
// FIXME: Find out why this fails with false alarms/errors from time to time...
// While commented out, there is no error capture from reading dir entries.
// check_err(errno, dir, dirname, "clear_dir");
-#endif
+//#endif
close_dir(dir, dirname, "clear_dir");
}
// === push_down ===
std::string
-jdir::push_down(const std::string& dirname, const std::string& target_dir, const std::string& bak_dir_base)
+jdir::push_down(const std::string& dirname, const std::string& target_dir/*, const std::string& bak_dir_base*/)
{
- std::string bak_dir_name = create_bak_dir(dirname, bak_dir_base);
+ std::string bak_dir_name = create_bak_dir(dirname/*, bak_dir_base*/);
DIR* dir = ::opendir(dirname.c_str());
if (!dir)
@@ -202,18 +203,18 @@ jdir::push_down(const std::string& dirname, const std::string& target_dir, const
void
jdir::verify_dir()
{
- verify_dir(_dirname, _base_filename);
+ verify_dir(_dirname/*, _base_filename*/);
}
void
-jdir::verify_dir(const char* dirname, const char* base_filename)
+jdir::verify_dir(const char* dirname/*, const char* base_filename*/)
{
- verify_dir(std::string(dirname), std::string(base_filename));
+ verify_dir(std::string(dirname)/*, std::string(base_filename)*/);
}
void
-jdir::verify_dir(const std::string& dirname, const std::string& /*base_filename*/)
+jdir::verify_dir(const std::string& dirname/*, const std::string& base_filename*/)
{
if (!is_dir(dirname))
{
@@ -323,7 +324,7 @@ jdir::delete_dir(const std::string& dirname, bool children_only)
std::string
-jdir::create_bak_dir(const std::string& dirname, const std::string& base_filename)
+jdir::create_bak_dir(const std::string& dirname)
{
DIR* dir = ::opendir(dirname.c_str());
long dir_num = 0L;
@@ -339,13 +340,11 @@ jdir::create_bak_dir(const std::string& dirname, const std::string& base_filenam
// Ignore . and ..
if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0)
{
- if (std::strlen(entry->d_name) == base_filename.size() + 10) // Format: basename.bak.XXXX
+ if (std::strlen(entry->d_name) == 9) // Format: _bak.XXXX
{
- std::ostringstream oss;
- oss << "_" << base_filename << ".bak.";
- if (std::strncmp(entry->d_name, oss.str().c_str(), base_filename.size() + 6) == 0)
+ if (std::strncmp(entry->d_name, "_bak.", 5) == 0)
{
- long this_dir_num = std::strtol(entry->d_name + base_filename.size() + 6, 0, 16);
+ long this_dir_num = std::strtol(entry->d_name + 5, 0, 16);
if (this_dir_num > dir_num)
dir_num = this_dir_num;
}
@@ -358,8 +357,7 @@ jdir::create_bak_dir(const std::string& dirname, const std::string& base_filenam
close_dir(dir, dirname, "create_bak_dir");
std::ostringstream dn;
- dn << dirname << "/_" << base_filename << ".bak." << std::hex << std::setw(4) <<
- std::setfill('0') << ++dir_num;
+ dn << dirname << "/_bak." << std::hex << std::setw(4) << std::setfill('0') << ++dir_num;
if (::mkdir(dn.str().c_str(), S_IRWXU | S_IRWXG | S_IROTH))
{
std::ostringstream oss;
@@ -411,6 +409,32 @@ jdir::exists(const std::string& name)
}
void
+jdir::read_dir(const std::string& name, std::vector<std::string>& dir_list, const bool incl_dirs, const bool incl_files, const bool incl_links) {
+ struct stat s;
+ if (is_dir(name)) {
+ DIR* dir = ::opendir(name.c_str());
+ if (dir != 0) {
+ struct dirent* entry;
+ while ((entry = ::readdir(dir)) != 0) {
+ if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0) { // Ignore . and ..
+ std::string full_name(name + "/" + entry->d_name);
+ if (::stat(full_name.c_str(), &s))
+ {
+ ::closedir(dir);
+ std::ostringstream oss;
+ oss << "stat: file=\"" << full_name << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", "delete_dir");
+ }
+ if ((S_ISREG(s.st_mode) && incl_files) || (S_ISDIR(s.st_mode) && incl_dirs) || (S_ISLNK(s.st_mode) && incl_links))
+ dir_list.push_back(entry->d_name);
+ }
+ }
+ }
+ close_dir(dir, name, "read_dir");
+ }
+}
+
+void
jdir::check_err(const int err_num, DIR* dir, const std::string& dir_name, const std::string& fn_name)
{
if (err_num)
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h
index af4797f44c..53519b1f2c 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h
@@ -19,8 +19,8 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_JDIR_H
-#define QPID_LEGACYSTORE_JRNL_JDIR_H
+#ifndef QPID_LINEARSTORE_JRNL_JDIR_H
+#define QPID_LINEARSTORE_JRNL_JDIR_H
namespace qpid
{
@@ -32,6 +32,7 @@ class jdir;
//#include "qpid/linearstore/jrnl/jinf.h"
#include <dirent.h>
#include <string>
+#include <vector>
namespace qpid
{
@@ -46,7 +47,7 @@ namespace qls_jrnl
{
private:
std::string _dirname;
- std::string _base_filename;
+ //std::string _base_filename;
public:
@@ -57,7 +58,7 @@ namespace qls_jrnl
* \param base_filename Filename root used in the creation of %journal files
* and sub-directories.
*/
- jdir(const std::string& dirname, const std::string& base_filename);
+ jdir(const std::string& dirname/*, const std::string& base_filename*/);
virtual ~jdir();
@@ -119,7 +120,7 @@ namespace qls_jrnl
* directory failed.
* \exception jerrno::JERR_JDIR_CLOSEDIR The directory handle could not be closed.
*/
- static void clear_dir(const char* dirname, const char* base_filename,
+ static void clear_dir(const char* dirname/*, const char* base_filename*/,
const bool create_flag = true);
/**
@@ -137,7 +138,7 @@ namespace qls_jrnl
* directory failed.
* \exception jerrno::JERR_JDIR_CLOSEDIR The directory handle could not be closed.
*/
- static void clear_dir(const std::string& dirname, const std::string& base_filename,
+ static void clear_dir(const std::string& dirname/*, const std::string& base_filename*/,
const bool create_flag = true);
@@ -152,7 +153,7 @@ namespace qls_jrnl
* \param bak_dir_base Base name for backup directory to be created in dirname, into which target_dir will be moved.
* \return Name of backup dir into which target_dir was pushed.
*/
- static std::string push_down(const std::string& dirname, const std::string& target_dir, const std::string& bak_dir_base);
+ static std::string push_down(const std::string& dirname, const std::string& target_dir/*, const std::string& bak_dir_base*/);
/**
@@ -184,7 +185,7 @@ namespace qls_jrnl
* \exception jerrno::JERR_JINF_CVALIDFAIL Error validating %jinf file
* \exception jerrno::JERR_JDIR_NOSUCHFILE Expected jdat file is missing
*/
- static void verify_dir(const char* dirname, const char* base_filename);
+ static void verify_dir(const char* dirname/*, const char* base_filename*/);
/**
* \brief Verify that dirname is a valid %journal directory.
@@ -201,7 +202,7 @@ namespace qls_jrnl
* \exception jerrno::JERR_JINF_CVALIDFAIL Error validating %jinf file
* \exception jerrno::JERR_JDIR_NOSUCHFILE Expected jdat file is missing
*/
- static void verify_dir(const std::string& dirname, const std::string& base_filename);
+ static void verify_dir(const std::string& dirname/*, const std::string& base_filename*/);
/**
* \brief Delete the %journal directory and all files and sub--directories that it may
@@ -273,8 +274,8 @@ namespace qls_jrnl
* \exception jerrno::JERR_JDIR_CLOSEDIR The directory handle could not be closed.
* \exception jerrno::JERR_JDIR_MKDIR The backup directory could not be deleted.
*/
- static std::string create_bak_dir(const std::string& dirname,
- const std::string& base_filename);
+ static std::string create_bak_dir(const std::string& dirname/*,
+ const std::string& base_filename*/);
/**
* \brief Return the directory name as a string.
@@ -284,7 +285,7 @@ namespace qls_jrnl
/**
* \brief Return the %journal base filename name as a string.
*/
- inline const std::string& base_filename() const { return _base_filename; }
+// inline const std::string& base_filename() const { return _base_filename; }
/**
* \brief Test whether the named file is a directory.
@@ -335,6 +336,8 @@ namespace qls_jrnl
*/
static bool exists(const std::string& name);
+ static void read_dir(const std::string& name, std::vector<std::string>& dir_list, const bool incl_dirs, const bool incl_files, const bool incl_links);
+
/**
* \brief Stream operator
*/
@@ -363,4 +366,4 @@ namespace qls_jrnl
}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_JDIR_H
+#endif // ifndef QPID_LINEARSTORE_JRNL_JDIR_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
index 718315a638..ecd5469d8b 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
@@ -31,39 +31,39 @@ std::map<uint32_t, const char*>::iterator jerrno::_err_map_itr;
bool jerrno::_initialized = jerrno::__init();
// generic errors
-const uint32_t jerrno::JERR__MALLOC = 0x0100;
-const uint32_t jerrno::JERR__UNDERFLOW = 0x0101;
-const uint32_t jerrno::JERR__NINIT = 0x0102;
-const uint32_t jerrno::JERR__AIO = 0x0103;
-const uint32_t jerrno::JERR__FILEIO = 0x0104;
-const uint32_t jerrno::JERR__RTCLOCK = 0x0105;
-const uint32_t jerrno::JERR__PTHREAD = 0x0106;
-const uint32_t jerrno::JERR__TIMEOUT = 0x0107;
-const uint32_t jerrno::JERR__UNEXPRESPONSE = 0x0108;
-const uint32_t jerrno::JERR__RECNFOUND = 0x0109;
-const uint32_t jerrno::JERR__NOTIMPL = 0x010a;
+const uint32_t jerrno::JERR__MALLOC = 0x0100;
+const uint32_t jerrno::JERR__UNDERFLOW = 0x0101;
+const uint32_t jerrno::JERR__NINIT = 0x0102;
+const uint32_t jerrno::JERR__AIO = 0x0103;
+const uint32_t jerrno::JERR__FILEIO = 0x0104;
+const uint32_t jerrno::JERR__RTCLOCK = 0x0105;
+const uint32_t jerrno::JERR__PTHREAD = 0x0106;
+const uint32_t jerrno::JERR__TIMEOUT = 0x0107;
+const uint32_t jerrno::JERR__UNEXPRESPONSE = 0x0108;
+const uint32_t jerrno::JERR__RECNFOUND = 0x0109;
+const uint32_t jerrno::JERR__NOTIMPL = 0x010a;
// class jcntl
-const uint32_t jerrno::JERR_JCNTL_STOPPED = 0x0200;
-const uint32_t jerrno::JERR_JCNTL_READONLY = 0x0201;
-const uint32_t jerrno::JERR_JCNTL_AIOCMPLWAIT = 0x0202;
-const uint32_t jerrno::JERR_JCNTL_UNKNOWNMAGIC = 0x0203;
-const uint32_t jerrno::JERR_JCNTL_NOTRECOVERED = 0x0204;
-const uint32_t jerrno::JERR_JCNTL_RECOVERJFULL = 0x0205;
-const uint32_t jerrno::JERR_JCNTL_OWIMISMATCH = 0x0206;
+const uint32_t jerrno::JERR_JCNTL_STOPPED = 0x0200;
+const uint32_t jerrno::JERR_JCNTL_READONLY = 0x0201;
+const uint32_t jerrno::JERR_JCNTL_AIOCMPLWAIT = 0x0202;
+const uint32_t jerrno::JERR_JCNTL_UNKNOWNMAGIC = 0x0203;
+const uint32_t jerrno::JERR_JCNTL_NOTRECOVERED = 0x0204;
+const uint32_t jerrno::JERR_JCNTL_RECOVERJFULL = 0x0205;
+const uint32_t jerrno::JERR_JCNTL_OWIMISMATCH = 0x0206;
// class jdir
-const uint32_t jerrno::JERR_JDIR_NOTDIR = 0x0300;
-const uint32_t jerrno::JERR_JDIR_MKDIR = 0x0301;
-const uint32_t jerrno::JERR_JDIR_OPENDIR = 0x0302;
-const uint32_t jerrno::JERR_JDIR_READDIR = 0x0303;
-const uint32_t jerrno::JERR_JDIR_CLOSEDIR = 0x0304;
-const uint32_t jerrno::JERR_JDIR_RMDIR = 0x0305;
-const uint32_t jerrno::JERR_JDIR_NOSUCHFILE = 0x0306;
-const uint32_t jerrno::JERR_JDIR_FMOVE = 0x0307;
-const uint32_t jerrno::JERR_JDIR_STAT = 0x0308;
-const uint32_t jerrno::JERR_JDIR_UNLINK = 0x0309;
-const uint32_t jerrno::JERR_JDIR_BADFTYPE = 0x030a;
+const uint32_t jerrno::JERR_JDIR_NOTDIR = 0x0300;
+const uint32_t jerrno::JERR_JDIR_MKDIR = 0x0301;
+const uint32_t jerrno::JERR_JDIR_OPENDIR = 0x0302;
+const uint32_t jerrno::JERR_JDIR_READDIR = 0x0303;
+const uint32_t jerrno::JERR_JDIR_CLOSEDIR = 0x0304;
+const uint32_t jerrno::JERR_JDIR_RMDIR = 0x0305;
+const uint32_t jerrno::JERR_JDIR_NOSUCHFILE = 0x0306;
+const uint32_t jerrno::JERR_JDIR_FMOVE = 0x0307;
+const uint32_t jerrno::JERR_JDIR_STAT = 0x0308;
+const uint32_t jerrno::JERR_JDIR_UNLINK = 0x0309;
+const uint32_t jerrno::JERR_JDIR_BADFTYPE = 0x030a;
// class fcntl
//const uint32_t jerrno::JERR_FCNTL_OPENWR = 0x0400;
@@ -82,31 +82,31 @@ const uint32_t jerrno::JERR_JDIR_BADFTYPE = 0x030a;
//const uint32_t jerrno::JERR_RRFC_OPENRD = 0x0600;
// class jrec, enq_rec, deq_rec, txn_rec
-const uint32_t jerrno::JERR_JREC_BADRECHDR = 0x0700;
-const uint32_t jerrno::JERR_JREC_BADRECTAIL = 0x0701;
+const uint32_t jerrno::JERR_JREC_BADRECHDR = 0x0700;
+const uint32_t jerrno::JERR_JREC_BADRECTAIL = 0x0701;
// class wmgr
-const uint32_t jerrno::JERR_WMGR_BADPGSTATE = 0x0801;
-const uint32_t jerrno::JERR_WMGR_BADDTOKSTATE = 0x0802;
-const uint32_t jerrno::JERR_WMGR_ENQDISCONT = 0x0803;
-const uint32_t jerrno::JERR_WMGR_DEQDISCONT = 0x0804;
-const uint32_t jerrno::JERR_WMGR_DEQRIDNOTENQ = 0x0805;
+const uint32_t jerrno::JERR_WMGR_BADPGSTATE = 0x0801;
+const uint32_t jerrno::JERR_WMGR_BADDTOKSTATE = 0x0802;
+const uint32_t jerrno::JERR_WMGR_ENQDISCONT = 0x0803;
+const uint32_t jerrno::JERR_WMGR_DEQDISCONT = 0x0804;
+const uint32_t jerrno::JERR_WMGR_DEQRIDNOTENQ = 0x0805;
// class rmgr
-const uint32_t jerrno::JERR_RMGR_UNKNOWNMAGIC = 0x0900;
-const uint32_t jerrno::JERR_RMGR_RIDMISMATCH = 0x0901;
+const uint32_t jerrno::JERR_RMGR_UNKNOWNMAGIC = 0x0900;
+const uint32_t jerrno::JERR_RMGR_RIDMISMATCH = 0x0901;
//const uint32_t jerrno::JERR_RMGR_FIDMISMATCH = 0x0902;
-const uint32_t jerrno::JERR_RMGR_ENQSTATE = 0x0903;
-const uint32_t jerrno::JERR_RMGR_BADRECTYPE = 0x0904;
+const uint32_t jerrno::JERR_RMGR_ENQSTATE = 0x0903;
+const uint32_t jerrno::JERR_RMGR_BADRECTYPE = 0x0904;
// class data_tok
-const uint32_t jerrno::JERR_DTOK_ILLEGALSTATE = 0x0a00;
+const uint32_t jerrno::JERR_DTOK_ILLEGALSTATE = 0x0a00;
// const uint32_t jerrno::JERR_DTOK_RIDNOTSET = 0x0a01;
// class enq_map, txn_map
-const uint32_t jerrno::JERR_MAP_DUPLICATE = 0x0b00;
-const uint32_t jerrno::JERR_MAP_NOTFOUND = 0x0b01;
-const uint32_t jerrno::JERR_MAP_LOCKED = 0x0b02;
+const uint32_t jerrno::JERR_MAP_DUPLICATE = 0x0b00;
+const uint32_t jerrno::JERR_MAP_NOTFOUND = 0x0b01;
+const uint32_t jerrno::JERR_MAP_LOCKED = 0x0b02;
// class jinf
//const uint32_t jerrno::JERR_JINF_CVALIDFAIL = 0x0c00;
@@ -121,6 +121,13 @@ const uint32_t jerrno::JERR_MAP_LOCKED = 0x0b02;
//const uint32_t jerrno::JERR_JINF_OWIBAD = 0x0c09;
//const uint32_t jerrno::JERR_JINF_ZEROLENFILE = 0x0c0a;
+// EFP errors
+const uint32_t jerrno::JERR_EFP_BADPARTITIONNAME = 0x0d01;
+const uint32_t jerrno::JERR_EFP_BADPARTITIONDIR = 0x0d02;
+const uint32_t jerrno::JERR_EFP_BADEFPDIRNAME = 0x0d03;
+const uint32_t jerrno::JERR_EFP_NOEFP = 0x0d04;
+const uint32_t jerrno::JERR_EFP_EMPTY = 0x0d05;
+
// Negative returns for some functions
const int32_t jerrno::AIO_TIMEOUT = -1;
const int32_t jerrno::LOCK_TAKEN = -2;
@@ -222,6 +229,13 @@ jerrno::__init()
// _err_map[JERR_JINF_OWIBAD] = "JERR_JINF_OWIBAD: Journal data files have inconsistent OWI flags; >1 transition found in non-auto-expand or min-size journal";
// _err_map[JERR_JINF_ZEROLENFILE] = "JERR_JINF_ZEROLENFILE: Journal info file zero length";
+ // EFP errors
+ _err_map[JERR_EFP_BADPARTITIONNAME] = "JERR_EFP_BADPARTITIONNAME: Invalid partition name (must be \'pNNN\' where NNN is a non-zero number)";
+ _err_map[JERR_EFP_BADEFPDIRNAME] = "JERR_EFP_BADEFPDIRNAME: Bad Empty File Pool directory name (must be \'NNNk\', where NNN is a number which is a multiple of 4)";
+ _err_map[JERR_EFP_BADPARTITIONDIR] = "JERR_EFP_BADPARTITIONDIR: Invalid partition directory";
+ _err_map[JERR_EFP_NOEFP] = "JERR_EFP_NOEFP: No Empty File Pool found for given partition and empty file size";
+ _err_map[JERR_EFP_EMPTY] = "JERR_EFP_EMPTY: Empty File Pool is empty";
+
//_err_map[] = "";
return true;
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
index 19b9954c93..2ebabd84b8 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
@@ -140,6 +140,13 @@ namespace qls_jrnl
// static const uint32_t JERR_JINF_OWIBAD; ///< OWI inconsistent (>1 transition in non-ae journal)
// static const uint32_t JERR_JINF_ZEROLENFILE; ///< Journal info file is zero length (empty).
+ // EFP errors
+ static const uint32_t JERR_EFP_BADPARTITIONNAME; ///< Partition name invalid or of value 0
+ static const uint32_t JERR_EFP_BADEFPDIRNAME; ///< Empty File Pool directory name invalid
+ static const uint32_t JERR_EFP_BADPARTITIONDIR; ///< Invalid partition directory
+ static const uint32_t JERR_EFP_NOEFP; ///< No EFP found for given partition and file size
+ static const uint32_t JERR_EFP_EMPTY; ///< EFP empty
+
// Negative returns for some functions
static const int32_t AIO_TIMEOUT; ///< Timeout waiting for AIO return
static const int32_t LOCK_TAKEN; ///< Attempted to take lock, but it was taken by another thread
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp
index b6d0f0f2ad..9722b78e81 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp
@@ -61,7 +61,7 @@ rmgr::initialize(aio_callback* const cbp)
throw jexception(jerrno::JERR__MALLOC, oss.str(), "rmgr", "initialize");
}
_fhdr_aio_cb_ptr = new aio_cb;
- std::memset(_fhdr_aio_cb_ptr, 0, sizeof(aio_cb*));
+ std::memset(_fhdr_aio_cb_ptr, 0, sizeof(aio_cb));
}
void
@@ -638,6 +638,7 @@ rmgr::init_aio_reads(const int16_t /*first_uninit*/, const uint16_t /*num_uninit
void
rmgr::rotate_page()
{
+/*
_page_cb_arr[_pg_index]._rdblks = 0;
_page_cb_arr[_pg_index]._state = UNUSED;
if (_pg_offset_dblks >= JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE_DBLKS)
@@ -654,6 +655,7 @@ rmgr::rotate_page()
// Need to move reset into if (_rrfc.file_rotate()) above.
if (_pg_cntr >= (_jc->jfsize_sblks() / JRNL_RMGR_PAGE_SIZE))
_pg_cntr = 0;
+*/
}
uint32_t
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
index e8263ea281..e8ebe9db94 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
@@ -20,28 +20,69 @@
*/
#include "file_hdr.h"
+#include <string.h>
-int file_hdr_init(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
- const uint64_t rid, const uint64_t fro, const uint32_t file_count, const uint64_t file_size,
- const uint64_t file_number) {
- rec_hdr_init(&dest->_rhdr, magic, version, uflag, rid);
+void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t fhdr_size_sblks,
+ const uint16_t efp_partition, const uint64_t file_size) {
+ rec_hdr_init(&dest->_rhdr, magic, version, 0, 0);
+ dest->_fhdr_size_sblks = fhdr_size_sblks;
+ dest->_efp_partition = efp_partition;
+ dest->_reserved = 0;
+ dest->_file_size_kib = file_size;
+ dest->_fro = 0;
+ dest->_ts_nsec = 0;
+ dest->_ts_sec = 0;
+ dest->_file_number = 0;
+ dest->_queue_name_len = 0;
+}
+
+int file_hdr_init(file_hdr_t* dest, const uint16_t uflag, const uint64_t rid, const uint64_t fro,
+ const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name) {
+ dest->_rhdr._uflag = uflag;
+ dest->_rhdr._rid = rid;
dest->_fro = fro;
- dest->_file_count = file_count;
- dest->_file_size = file_size;
dest->_file_number = file_number;
+ if (sizeof(file_hdr_t) + queue_name_len < MAX_FILE_HDR_LEN) {
+ dest->_queue_name_len = queue_name_len;
+ } else {
+ dest->_queue_name_len = MAX_FILE_HDR_LEN - sizeof(file_hdr_t);
+ }
+ dest->_queue_name_len = queue_name_len;
+ memcpy(dest + sizeof(file_hdr_t), queue_name, queue_name_len);
return set_time_now(dest);
}
void file_hdr_copy(file_hdr_t* dest, const file_hdr_t* src) {
rec_hdr_copy(&dest->_rhdr, &src->_rhdr);
+ dest->_fhdr_size_sblks = src->_fhdr_size_sblks; // Should this be copied?
+ dest->_efp_partition = src->_efp_partition; // Should this be copied?
+ dest->_file_size_kib = src->_file_size_kib;
dest->_fro = src->_fro;
dest->_ts_sec = src->_ts_sec;
dest->_ts_nsec = src->_ts_nsec;
- dest->_file_count = src->_file_count;
- dest->_file_size = src->_file_size;
dest->_file_number = src->_file_number;
}
+void file_hdr_reset(file_hdr_t* target) {
+ target->_rhdr._uflag = 0;
+ target->_rhdr._rid = 0;
+ target->_fro = 0;
+ target->_ts_sec = 0;
+ target->_ts_nsec = 0;
+ target->_file_number = 0;
+ target->_queue_name_len = 0;
+ memset(target + sizeof(file_hdr_t), 0, MAX_FILE_HDR_LEN - sizeof(file_hdr_t));
+}
+
+int is_file_hdr_reset(file_hdr_t* target) {
+ return target->_rhdr._uflag == 0 &&
+ target->_rhdr._rid == 0 &&
+ target->_ts_sec == 0 &&
+ target->_ts_nsec == 0 &&
+ target->_file_number == 0 &&
+ target->_queue_name_len == 0;
+}
+
int set_time_now(file_hdr_t *fh)
{
struct timespec ts;
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
index 304abf0700..6a2cae4ba4 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
@@ -28,6 +28,8 @@
extern "C"{
#endif
+#define MAX_FILE_HDR_LEN 4096 // Set to 1 sblk
+
#pragma pack(1)
/**
@@ -47,44 +49,50 @@ extern "C"{
* +---+---+---+---+---+---+---+---+ | struct rec_hdr_t
* | first rid in file | |
* +---+---+---+---+---+---+---+---+ -+
+ * | fs | partn | reserved |
+ * +---+---+---+---+---+---+---+---+
+ * | file-size |
+ * +---+---+---+---+---+---+---+---+
* | fro |
* +---+---+---+---+---+---+---+---+
* | timestamp (sec) |
* +---+---+---+---+---+---+---+---+
* | timestamp (ns) |
* +---+---+---+---+---+---+---+---+
- * | file-count | reserved |
- * +---+---+---+---+---+---+---+---+
- * | file-size |
- * +---+---+---+---+---+---+---+---+
* | file-number |
* +---+---+---+---+---+---+---+---+
- * | n-len | Queue Name... |
+ * | qnl | Queue Name... |
* +-------+ |
* | |
* +---+---+---+---+---+---+---+---+
*
- * ver = file version (If the format or encoding of this file changes, then this
- * number should be incremented)
+ * ver = Journal version
+ * rid = Record ID
+ * fs = File header size in sblks (defined by JRNL_SBLK_SIZE)
+ * partn = EFP partition from which this file came
* fro = First Record Offset
- * n-len = Length of the queue name in octets.
+ * qnl = Length of the queue name in octets.
* </pre>
*/
typedef struct file_hdr_t {
- rec_hdr_t _rhdr; /**< Common record header struct, but rid field is used for rid of first compete record in file */
- uint64_t _fro; /**< First Record Offset (FRO) */
- uint64_t _ts_sec; /**< Time stamp (seconds part) */
- uint64_t _ts_nsec; /**< Time stamp (nanoseconds part) */
- uint32_t _file_count; /**< Total number of files in linear sequence at time of writing this file */
+ rec_hdr_t _rhdr; /**< Common record header struct, but rid field is used for rid of first compete record in file */
+ uint16_t _fhdr_size_sblks; /**< File header size in sblks (defined by JRNL_SBLK_SIZE) */
+ uint16_t _efp_partition; /**< EFP Partition number from which this file was obtained */
uint32_t _reserved;
- uint64_t _file_size; /**< Size of this file in octets, including header sblk */
- uint64_t _file_number; /**< The logical number of this file in a monotonically increasing sequence */
- uint16_t _name_length; /**< Length of the queue name in octets, which follows this struct in the header */
+ uint64_t _file_size_kib; /**< Size of this file in KiB, excluding header sblk */
+ uint64_t _fro; /**< First Record Offset (FRO) */
+ uint64_t _ts_sec; /**< Time stamp (seconds part) */
+ uint64_t _ts_nsec; /**< Time stamp (nanoseconds part) */
+ uint64_t _file_number; /**< The logical number of this file in a monotonically increasing sequence */
+ uint16_t _queue_name_len; /**< Length of the queue name in octets, which follows this struct in the header */
} file_hdr_t;
-int file_hdr_init(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
- const uint64_t rid, const uint64_t fro, const uint32_t file_count, const uint64_t file_size,
- const uint64_t file_number);
+void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t fhdr_size_sblks,
+ const uint16_t efp_partition, const uint64_t file_size);
+int file_hdr_init(file_hdr_t* dest, const uint16_t uflag, const uint64_t rid, const uint64_t fro,
+ const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name);
+void file_hdr_reset(file_hdr_t* target);
+int is_file_hdr_reset(file_hdr_t* target);
void file_hdr_copy(file_hdr_t* dest, const file_hdr_t* src);
int set_time_now(file_hdr_t *fh);
void set_time(file_hdr_t *fh, struct timespec *ts);
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
index ccc9bf6bab..3062f059cd 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
@@ -45,9 +45,9 @@ wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/):
_fhdr_ptr_arr(0),
_fhdr_aio_cb_arr(0),
_cached_offset_dblks(0),
- _jfsize_dblks(0),
- _jfsize_pgs(0),
- _num_jfiles(0),
+// _jfsize_dblks(0),
+// _jfsize_pgs(0),
+// _num_jfiles(0),
_enq_busy(false),
_deq_busy(false),
_abort_busy(false),
@@ -65,9 +65,9 @@ wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/,
_fhdr_ptr_arr(0),
_fhdr_aio_cb_arr(0),
_cached_offset_dblks(0),
- _jfsize_dblks(0),
- _jfsize_pgs(0),
- _num_jfiles(0),
+// _jfsize_dblks(0),
+// _jfsize_pgs(0),
+// _num_jfiles(0),
_enq_busy(false),
_deq_busy(false),
_abort_busy(false),
@@ -94,9 +94,9 @@ wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks,
initialize(cbp, wcache_pgsize_sblks, wcache_num_pages);
- _jfsize_dblks = _jc->jfsize_sblks() * JRNL_SBLK_SIZE_DBLKS;
- _jfsize_pgs = _jc->jfsize_sblks() / _cache_pgsize_sblks;
- assert(_jc->jfsize_sblks() % JRNL_RMGR_PAGE_SIZE == 0);
+// _jfsize_dblks = _jc->jfsize_sblks() * JRNL_SBLK_SIZE_DBLKS;
+// _jfsize_pgs = _jc->jfsize_sblks() / _cache_pgsize_sblks;
+// assert(_jc->jfsize_sblks() % JRNL_RMGR_PAGE_SIZE == 0);
if (eo)
{
@@ -555,7 +555,7 @@ wmgr::file_header_check(const uint64_t /*rid*/, const bool /*cont*/, const uint3
}
void
-wmgr::flush_check(iores& res, bool& cont, bool& done)
+wmgr::flush_check(iores& res, bool& /*cont*/, bool& done)
{
// Is page is full, flush
if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS)
@@ -569,6 +569,7 @@ wmgr::flush_check(iores& res, bool& cont, bool& done)
done = true;
}
+/*
// If file is full, rotate to next file
if (_pg_cntr >= _jfsize_pgs)
{
@@ -583,6 +584,7 @@ wmgr::flush_check(iores& res, bool& cont, bool& done)
done = true;
}
}
+*/
}
}
@@ -590,12 +592,14 @@ iores
wmgr::flush()
{
iores res = write_flush();
+/*
if (_pg_cntr >= _jfsize_pgs)
{
iores rfres = rotate_file();
if (rfres != RHM_IORES_SUCCESS)
res = rfres;
}
+*/
return res;
}
@@ -836,12 +840,13 @@ wmgr::is_txn_synced(const std::string& xid)
}
void
-wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, const uint16_t wcache_num_pages)
+wmgr::initialize(aio_callback* const /*cbp*/, const uint32_t /*wcache_pgsize_sblks*/, const uint16_t /*wcache_num_pages*/)
{
+/*
pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages);
wmgr::clean();
- /*_num_jfiles = _jc->num_jfiles();*/ // TODO: replace for linearstore: _jc->num_jfiles()
- if (::posix_memalign(&_fhdr_base_ptr, _sblksize, _sblksize /** _num_jfiles*/))
+ _num_jfiles = _jc->num_jfiles(); // TODO: replace for linearstore: _jc->num_jfiles()
+ if (::posix_memalign(&_fhdr_base_ptr, _sblksize, _sblksize * _num_jfiles))
{
wmgr::clean();
std::ostringstream oss;
@@ -863,6 +868,7 @@ wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, co
_ddtokl.clear();
_cached_offset_dblks = 0;
_enq_busy = false;
+*/
}
iores
@@ -988,7 +994,7 @@ void
wmgr::write_fhdr(uint64_t rid, uint16_t fid, uint16_t /*lid*/, std::size_t fro)
{
file_hdr_t fhdr/*(QLS_FILE_MAGIC, QLS_JRNL_VERSION, rid, fid, lid, fro, _wrfc.owi(), true)*/;
- /*int err =*/ ::file_hdr_init(&fhdr, QLS_FILE_MAGIC, QLS_JRNL_VERSION, 0, rid, fro, 0, 0, 0);
+ /*int err =*/ ::file_hdr_init(&fhdr, 0, rid, fro, 0, _jc->id().length(), _jc->id().c_str());
std::memcpy(_fhdr_ptr_arr[fid], &fhdr, sizeof(fhdr));
#ifdef RHM_CLEAN
std::memset((char*)_fhdr_ptr_arr[fid] + sizeof(fhdr), RHM_CLEAN_CHAR, _sblksize - sizeof(fhdr));
@@ -1030,8 +1036,8 @@ wmgr::clean()
if (_fhdr_aio_cb_arr)
{
- for (uint32_t i=0; i<_num_jfiles; i++)
- delete _fhdr_aio_cb_arr[i];
+// for (uint32_t i=0; i<_num_jfiles; i++)
+// delete _fhdr_aio_cb_arr[i];
std::free(_fhdr_aio_cb_arr);
_fhdr_aio_cb_arr = 0;
}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h
index 68de2db58f..a4afd8974c 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h
@@ -67,9 +67,9 @@ namespace qls_jrnl
aio_cb** _fhdr_aio_cb_arr; ///< Array of iocb pointers for file header writes
uint32_t _cached_offset_dblks; ///< Amount of unwritten data in page (dblocks)
std::deque<data_tok*> _ddtokl; ///< Deferred dequeue data_tok list
- uint32_t _jfsize_dblks; ///< Journal file size in dblks (NOT sblks!)
- uint32_t _jfsize_pgs; ///< Journal file size in cache pages
- uint16_t _num_jfiles; ///< Number of files used in iocb mallocs
+// uint32_t _jfsize_dblks; ///< Journal file size in dblks (NOT sblks!)
+// uint32_t _jfsize_pgs; ///< Journal file size in cache pages
+// uint16_t _num_jfiles; ///< Number of files used in iocb mallocs
// TODO: Convert _enq_busy etc into a proper threadsafe lock
// TODO: Convert to enum? Are these encodes mutually exclusive?
diff --git a/qpid/cpp/src/qpid/linearstore/management-schema.xml b/qpid/cpp/src/qpid/linearstore/management-schema.xml
index a9f7b143ad..dce8f7988a 100644
--- a/qpid/cpp/src/qpid/linearstore/management-schema.xml
+++ b/qpid/cpp/src/qpid/linearstore/management-schema.xml
@@ -22,15 +22,15 @@
<class name="Store">
<property name="brokerRef" type="objId" access="RO" references="qpid.Broker" index="y" parentRef="y"/>
<property name="location" type="sstr" access="RO" desc="Logical directory on disk"/>
- <property name="defaultInitialFileCount" type="uint16" access="RO" unit="file" desc="Default number of files initially allocated to each journal"/>
- <property name="defaultDataFileSize" type="uint32" access="RO" unit="RdPg" desc="Default size of each journal data file"/>
+ <!--property name="defaultInitialFileCount" type="uint16" access="RO" unit="file" desc="Default number of files initially allocated to each journal"/-->
+ <!--property name="defaultDataFileSize" type="uint32" access="RO" unit="RdPg" desc="Default size of each journal data file"/-->
<property name="tplIsInitialized" type="bool" access="RO" desc="Transaction prepared list has been initialized by a transactional prepare"/>
<property name="tplDirectory" type="sstr" access="RO" desc="Transaction prepared list directory"/>
<property name="tplWritePageSize" type="uint32" access="RO" unit="byte" desc="Page size in transaction prepared list write-page-cache"/>
<property name="tplWritePages" type="uint32" access="RO" unit="wpage" desc="Number of pages in transaction prepared list write-page-cache"/>
- <property name="tplInitialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to transaction prepared list journal"/>
- <property name="tplDataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file in transaction prepared list journal"/>
- <property name="tplCurrentFileCount" type="uint32" access="RO" unit="file" desc="Number of files currently allocated to transaction prepared list journal"/>
+ <!--property name="tplInitialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to transaction prepared list journal"/-->
+ <!--property name="tplDataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file in transaction prepared list journal"/-->
+ <!--property name="tplCurrentFileCount" type="uint32" access="RO" unit="file" desc="Number of files currently allocated to transaction prepared list journal"/-->
<statistic name="tplTransactionDepth" type="hilo32" unit="txn" desc="Number of currently enqueued prepared transactions"/>
<statistic name="tplTxnPrepares" type="count64" unit="record" desc="Total transaction prepares on transaction prepared list"/>
@@ -48,11 +48,11 @@
<property name="writePages" type="uint32" access="RO" unit="wpage" desc="Number of pages in write-page-cache"/>
<property name="readPageSize" type="uint32" access="RO" unit="byte" desc="Page size in read-page-cache"/>
<property name="readPages" type="uint32" access="RO" unit="rpage" desc="Number of pages in read-page-cache"/>
- <property name="initialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to this journal"/>
- <property name="autoExpand" type="bool" access="RO" desc="Auto-expand enabled"/>
- <property name="currentFileCount" type="uint16" access="RO" unit="file" desc="Number of files currently allocated to this journal"/>
- <property name="maxFileCount" type="uint16" access="RO" unit="file" desc="Max number of files allowed for this journal"/>
- <property name="dataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file"/>
+ <!--property name="initialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to this journal"/-->
+ <!--property name="autoExpand" type="bool" access="RO" desc="Auto-expand enabled"/-->
+ <!--property name="currentFileCount" type="uint16" access="RO" unit="file" desc="Number of files currently allocated to this journal"/-->
+ <!--property name="maxFileCount" type="uint16" access="RO" unit="file" desc="Max number of files allowed for this journal"/-->
+ <!--property name="dataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file"/-->
<statistic name="recordDepth" type="hilo32" unit="record" desc="Number of currently enqueued records (durable messages)"/>
<statistic name="enqueues" type="count64" unit="record" desc="Total enqueued records on journal"/>
@@ -76,13 +76,13 @@
<statistic name="writePageCacheDepth" type="hilo32" unit="wpage" desc="Current depth of write-page-cache"/>
<statistic name="readPageCacheDepth" type="hilo32" unit="rpage" desc="Current depth of read-page-cache"/>
- <method name="expand" desc="Increase number of files allocated for this journal">
+ <!--method name="expand" desc="Increase number of files allocated for this journal">
<arg name="by" type="uint32" dir="I" desc="Number of files to increase journal size by"/>
- </method>
+ </method-->
</class>
<eventArguments>
- <arg name="autoExpand" type="bool" desc="Journal auto-expand enabled"/>
+ <!--arg name="autoExpand" type="bool" desc="Journal auto-expand enabled"/-->
<arg name="fileSize" type="uint32" desc="Journal file size in bytes"/>
<arg name="jrnlId" type="sstr" desc="Journal Id"/>
<arg name="numEnq" type="uint32" desc="Number of recovered enqueues"/>