summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/asyncStore
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/asyncStore')
-rw-r--r--cpp/src/qpid/asyncStore/AsyncOperation.cpp128
-rw-r--r--cpp/src/qpid/asyncStore/AsyncOperation.h85
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp323
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.h145
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreOptions.cpp67
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreOptions.h58
-rw-r--r--cpp/src/qpid/asyncStore/ConfigHandleImpl.cpp37
-rw-r--r--cpp/src/qpid/asyncStore/ConfigHandleImpl.h41
-rw-r--r--cpp/src/qpid/asyncStore/EnqueueHandleImpl.cpp40
-rw-r--r--cpp/src/qpid/asyncStore/EnqueueHandleImpl.h51
-rw-r--r--cpp/src/qpid/asyncStore/EventHandleImpl.cpp46
-rw-r--r--cpp/src/qpid/asyncStore/EventHandleImpl.h51
-rw-r--r--cpp/src/qpid/asyncStore/MessageHandleImpl.cpp38
-rw-r--r--cpp/src/qpid/asyncStore/MessageHandleImpl.h48
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.cpp65
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.h51
-rw-r--r--cpp/src/qpid/asyncStore/Plugin.cpp79
-rw-r--r--cpp/src/qpid/asyncStore/Plugin.h52
-rw-r--r--cpp/src/qpid/asyncStore/QueueHandleImpl.cpp46
-rw-r--r--cpp/src/qpid/asyncStore/QueueHandleImpl.h52
-rw-r--r--cpp/src/qpid/asyncStore/README34
-rw-r--r--cpp/src/qpid/asyncStore/RunState.cpp148
-rw-r--r--cpp/src/qpid/asyncStore/RunState.h83
-rw-r--r--cpp/src/qpid/asyncStore/TxnHandleImpl.cpp61
-rw-r--r--cpp/src/qpid/asyncStore/TxnHandleImpl.h48
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/AioCallback.h63
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/AsyncJournal.cpp231
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/AsyncJournal.h258
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h77
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/Configuration.h106
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/DataOpState.cpp108
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/DataOpState.h129
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/DataToken.cpp127
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/DataToken.h176
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/DataWrComplState.cpp109
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/DataWrComplState.h130
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/DequeueHeader.cpp112
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/DequeueHeader.h230
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/EnqueueHeader.cpp90
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/EnqueueHeader.h219
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/EventHeader.cpp95
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/EventHeader.h201
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/FileHeader.cpp142
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/FileHeader.h214
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/JournalDirectory.cpp178
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/JournalDirectory.h269
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/JournalError.cpp268
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/JournalError.h256
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/JournalParameters.cpp72
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/JournalParameters.h92
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/JournalRunState.cpp151
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/JournalRunState.h174
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/RecordHeader.cpp118
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/RecordHeader.h216
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/RecordIdCounter.h40
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/RecordTail.cpp82
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/RecordTail.h148
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/ScopedLock.cpp143
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/ScopedLock.h93
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/State.h161
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/Streamable.cpp57
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/Streamable.h86
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/TransactionHeader.cpp88
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/TransactionHeader.h174
64 files changed, 7560 insertions, 0 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.cpp b/cpp/src/qpid/asyncStore/AsyncOperation.cpp
new file mode 100644
index 0000000000..87d3713b96
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/AsyncOperation.cpp
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file AsyncOperation.cpp
+ */
+
+#include "AsyncOperation.h"
+
+#include "qpid/Exception.h"
+#include "qpid/broker/BrokerContext.h" // for delete in d'tor
+
+#include <sstream>
+
+namespace qpid {
+namespace asyncStore {
+
+AsyncOperation::AsyncOperation() :
+ m_op(NONE),
+ m_targetHandle(),
+ m_dataSrc(0),
+ m_txnHandle(0),
+ m_resCb(0),
+ m_brokerCtxt(0)
+{}
+
+AsyncOperation::AsyncOperation(const opCode op,
+ const qpid::broker::IdHandle* th,
+ const qpid::broker::ResultCallback resCb,
+ qpid::broker::BrokerContext* brokerCtxt) :
+ m_op(op),
+ m_targetHandle(th),
+ m_dataSrc(0),
+ m_txnHandle(0),
+ m_resCb(resCb),
+ m_brokerCtxt(brokerCtxt)
+{}
+
+AsyncOperation::AsyncOperation(const opCode op,
+ const qpid::broker::IdHandle* th,
+ const qpid::broker::DataSource* dataSrc,
+ const qpid::broker::ResultCallback resCb,
+ qpid::broker::BrokerContext* brokerCtxt) :
+ m_op(op),
+ m_targetHandle(th),
+ m_dataSrc(dataSrc),
+ m_txnHandle(0),
+ m_resCb(resCb),
+ m_brokerCtxt(brokerCtxt)
+{}
+
+AsyncOperation::AsyncOperation(const opCode op,
+ const qpid::broker::IdHandle* th,
+ const qpid::broker::TxnHandle* txnHandle,
+ const qpid::broker::ResultCallback resCb,
+ qpid::broker::BrokerContext* brokerCtxt) :
+ m_op(op),
+ m_targetHandle(th),
+ m_dataSrc(0),
+ m_txnHandle(txnHandle),
+ m_resCb(resCb),
+ m_brokerCtxt(brokerCtxt)
+{}
+
+AsyncOperation::AsyncOperation(const opCode op,
+ const qpid::broker::IdHandle* th,
+ const qpid::broker::DataSource* dataSrc,
+ const qpid::broker::TxnHandle* txnHandle,
+ const qpid::broker::ResultCallback resCb,
+ qpid::broker::BrokerContext* brokerCtxt) :
+ m_op(op),
+ m_targetHandle(th),
+ m_dataSrc(dataSrc),
+ m_txnHandle(txnHandle),
+ m_resCb(resCb),
+ m_brokerCtxt(brokerCtxt)
+{}
+
+AsyncOperation::~AsyncOperation()
+{}
+
+const char*
+AsyncOperation::getOpStr() const
+{
+ return getOpStr(m_op);
+}
+
+//static
+const char*
+AsyncOperation::getOpStr(const opCode op)
+{
+ switch (op) {
+ case NONE: return "<none>";
+ case TXN_PREPARE: return "TXN_PREPARE";
+ case TXN_COMMIT: return "TXN_COMMIT";
+ case TXN_ABORT: return "TXN_ABORT";
+ case CONFIG_CREATE: return "CONFIG_CREATE";
+ case CONFIG_DESTROY: return "CONFIG_DESTROY";
+ case QUEUE_CREATE: return "QUEUE_CREATE";
+ case QUEUE_FLUSH: return "QUEUE_FLUSH";
+ case QUEUE_DESTROY: return "QUEUE_DESTROY";
+ case EVENT_CREATE: return "EVENT_CREATE";
+ case EVENT_DESTROY: return "EVENT_DESTROY";
+ case MSG_ENQUEUE: return "MSG_ENQUEUE";
+ case MSG_DEQUEUE: return "MSG_DEQUEUE";
+ }
+ std::ostringstream oss;
+ oss << "AsyncStore: AsyncOperation::getOpStr(): Unknown op-code \"" << op << "\"";
+ throw qpid::Exception(oss.str());
+}
+
+}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.h b/cpp/src/qpid/asyncStore/AsyncOperation.h
new file mode 100644
index 0000000000..756b393b90
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/AsyncOperation.h
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file AsyncOperation.h
+ */
+
+#ifndef qpid_asyncStore_AsyncOperation_h_
+#define qpid_asyncStore_AsyncOperation_h_
+
+#include "qpid/broker/AsyncStore.h"
+#include "qpid/broker/IdHandle.h"
+
+namespace qpid {
+namespace asyncStore {
+
+class AsyncOperation {
+public:
+ typedef enum {NONE=0,
+ TXN_PREPARE,
+ TXN_COMMIT,
+ TXN_ABORT,
+ CONFIG_CREATE,
+ CONFIG_DESTROY,
+ QUEUE_CREATE,
+ QUEUE_FLUSH,
+ QUEUE_DESTROY,
+ EVENT_CREATE,
+ EVENT_DESTROY,
+ MSG_ENQUEUE,
+ MSG_DEQUEUE
+ } opCode;
+
+ AsyncOperation();
+ AsyncOperation(const opCode op,
+ const qpid::broker::IdHandle* th,
+ const qpid::broker::ResultCallback resCb,
+ qpid::broker::BrokerContext* brokerCtxt);
+ AsyncOperation(const opCode op,
+ const qpid::broker::IdHandle* th,
+ const qpid::broker::DataSource* dataSrc,
+ const qpid::broker::ResultCallback resCb,
+ qpid::broker::BrokerContext* brokerCtxt);
+ AsyncOperation(const opCode op,
+ const qpid::broker::IdHandle* th,
+ const qpid::broker::TxnHandle* txnHandle,
+ const qpid::broker::ResultCallback resCb,
+ qpid::broker::BrokerContext* brokerCtxt);
+ AsyncOperation(const opCode op,
+ const qpid::broker::IdHandle* th,
+ const qpid::broker::DataSource* dataSrc,
+ const qpid::broker::TxnHandle* txnHandle,
+ const qpid::broker::ResultCallback resCb,
+ qpid::broker::BrokerContext* brokerCtxt);
+ virtual ~AsyncOperation();
+ const char* getOpStr() const;
+ static const char* getOpStr(const opCode op);
+
+ opCode m_op;
+ const qpid::broker::IdHandle* m_targetHandle;
+ const qpid::broker::DataSource* m_dataSrc;
+ const qpid::broker::TxnHandle* m_txnHandle;
+ const qpid::broker::ResultCallback m_resCb;
+ qpid::broker::BrokerContext* m_brokerCtxt;
+};
+
+}} // namespace qpid::asyncStore
+
+#endif // qpid_asyncStore_AsyncOperation_h_
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
new file mode 100644
index 0000000000..3bdb2c3b98
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file AsyncStoreImpl.cpp
+ */
+
+#include "AsyncStoreImpl.h"
+
+#include "AsyncOperation.h"
+
+#include "qpid/broker/ConfigHandle.h"
+#include "qpid/broker/EnqueueHandle.h"
+#include "qpid/broker/EventHandle.h"
+#include "qpid/broker/MessageHandle.h"
+#include "qpid/broker/QueueHandle.h"
+#include "qpid/broker/TxnHandle.h"
+
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+namespace asyncStore {
+
+AsyncStoreImpl::AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller,
+ const AsyncStoreOptions& opts) :
+ m_poller(poller),
+ m_opts(opts),
+ m_runState(),
+ m_operations(m_poller)
+{}
+
+AsyncStoreImpl::~AsyncStoreImpl()
+{}
+
+void
+AsyncStoreImpl::initialize()
+{}
+
+uint64_t
+AsyncStoreImpl::getNextRid()
+{
+ return m_ridCntr.next();
+}
+
+void
+AsyncStoreImpl::initManagement(qpid::broker::Broker* /*broker*/)
+{}
+
+qpid::broker::TxnHandle
+AsyncStoreImpl::createTxnHandle(const std::string& xid)
+{
+ return qpid::broker::TxnHandle(new TxnHandleImpl(xid));
+}
+
+qpid::broker::ConfigHandle
+AsyncStoreImpl::createConfigHandle()
+{
+ return qpid::broker::ConfigHandle(new ConfigHandleImpl());
+}
+
+qpid::broker::QueueHandle
+AsyncStoreImpl::createQueueHandle(const std::string& name,
+ const qpid::types::Variant::Map& opts)
+{
+ return qpid::broker::QueueHandle(new QueueHandleImpl(name, opts));
+}
+
+qpid::broker::EventHandle
+AsyncStoreImpl::createEventHandle(qpid::broker::QueueHandle& queueHandle,
+ const std::string& key)
+{
+ return qpid::broker::EventHandle(new EventHandleImpl(queueHandle, key));
+}
+
+qpid::broker::MessageHandle
+AsyncStoreImpl::createMessageHandle(const qpid::broker::DataSource* dataSrc)
+
+{
+ return qpid::broker::MessageHandle(new MessageHandleImpl(dataSrc));
+}
+
+qpid::broker::EnqueueHandle
+AsyncStoreImpl::createEnqueueHandle(qpid::broker::MessageHandle& msgHandle,
+ qpid::broker::QueueHandle& queueHandle)
+{
+ return qpid::broker::EnqueueHandle(new EnqueueHandleImpl(msgHandle, queueHandle));
+}
+
+void
+AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt)
+{
+ AsyncOperation* op = new AsyncOperation(AsyncOperation::TXN_PREPARE,
+ dynamic_cast<qpid::broker::IdHandle*>(&txnHandle),
+ resultCb,
+ brokerCtxt);
+ m_operations.submit(op);
+}
+
+void
+AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt)
+{
+ AsyncOperation* op = new AsyncOperation(AsyncOperation::TXN_COMMIT,
+ dynamic_cast<qpid::broker::IdHandle*>(&txnHandle),
+ resultCb,
+ brokerCtxt);
+ m_operations.submit(op);
+}
+
+void
+AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt)
+{
+ AsyncOperation* op = new AsyncOperation(AsyncOperation::TXN_ABORT,
+ dynamic_cast<qpid::broker::IdHandle*>(&txnHandle),
+ resultCb,
+ brokerCtxt);
+ m_operations.submit(op);
+}
+
+void
+AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle,
+ const qpid::broker::DataSource* dataSrc,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt)
+{
+ AsyncOperation* op = new AsyncOperation(AsyncOperation::CONFIG_CREATE,
+ dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle),
+ dataSrc,
+ resultCb,
+ brokerCtxt);
+ m_operations.submit(op);
+}
+
+void
+AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt)
+{
+ AsyncOperation* op = new AsyncOperation(AsyncOperation::CONFIG_DESTROY,
+ dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle),
+ resultCb,
+ brokerCtxt);
+ m_operations.submit(op);
+}
+
+void
+AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle,
+ const qpid::broker::DataSource* dataSrc,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt)
+{
+ AsyncOperation* op = new AsyncOperation(AsyncOperation::QUEUE_CREATE,
+ dynamic_cast<qpid::broker::IdHandle*>(&queueHandle),
+ dataSrc,
+ resultCb,
+ brokerCtxt);
+ m_operations.submit(op);
+}
+
+void
+AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt)
+{
+ AsyncOperation* op = new AsyncOperation(AsyncOperation::QUEUE_DESTROY,
+ dynamic_cast<qpid::broker::IdHandle*>(&queueHandle),
+ resultCb,
+ brokerCtxt);
+ m_operations.submit(op);
+}
+
+void
+AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt)
+{
+ AsyncOperation* op = new AsyncOperation(AsyncOperation::QUEUE_FLUSH,
+ dynamic_cast<qpid::broker::IdHandle*>(&queueHandle),
+ resultCb,
+ brokerCtxt);
+ m_operations.submit(op);
+}
+
+void
+AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle,
+ const qpid::broker::DataSource* dataSrc,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt)
+{
+ AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_CREATE,
+ dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
+ dataSrc,
+ resultCb,
+ brokerCtxt);
+ m_operations.submit(op);
+}
+
+void
+AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle,
+ const qpid::broker::DataSource* dataSrc,
+ qpid::broker::TxnHandle& txnHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt)
+{
+ AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_CREATE,
+ dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
+ dataSrc,
+ &txnHandle,
+ resultCb,
+ brokerCtxt);
+ m_operations.submit(op);
+}
+
+void
+AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt)
+{
+ AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_DESTROY,
+ dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
+ resultCb,
+ brokerCtxt);
+ m_operations.submit(op);
+}
+
+void
+AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt)
+{
+ AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_DESTROY,
+ dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
+ &txnHandle,
+ resultCb,
+ brokerCtxt);
+ m_operations.submit(op);
+}
+
+void
+AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt)
+{
+ AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_ENQUEUE,
+ dynamic_cast<qpid::broker::IdHandle*>(&enqHandle),
+ resultCb,
+ brokerCtxt);
+ m_operations.submit(op);
+}
+
+void
+AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt)
+{
+ AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_ENQUEUE,
+ dynamic_cast<qpid::broker::IdHandle*>(&enqHandle),
+ &txnHandle,
+ resultCb,
+ brokerCtxt);
+ m_operations.submit(op);
+}
+
+void
+AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt)
+{
+ AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_DEQUEUE,
+ dynamic_cast<qpid::broker::IdHandle*>(&enqHandle),
+ resultCb,
+ brokerCtxt);
+ m_operations.submit(op);
+}
+
+void
+AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt)
+{
+ AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_DEQUEUE,
+ dynamic_cast<qpid::broker::IdHandle*>(&enqHandle),
+ &txnHandle,
+ resultCb,
+ brokerCtxt);
+ m_operations.submit(op);
+}
+
+int
+AsyncStoreImpl::loadContent(qpid::broker::MessageHandle& /*msgHandle*/,
+ qpid::broker::QueueHandle& /*queueHandle*/,
+ char* /*data*/,
+ uint64_t /*offset*/,
+ const uint64_t /*length*/)
+{
+ return 0;
+}
+
+}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
new file mode 100644
index 0000000000..c33e9030fe
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file AsyncStoreImpl.h
+ */
+
+#ifndef qpid_asyncStore_AsyncStoreImpl_h_
+#define qpid_asyncStore_AsyncStoreImpl_h_
+
+#include "AsyncStoreOptions.h"
+#include "RunState.h"
+#include "OperationQueue.h"
+
+#include "qpid/asyncStore/jrnl2/RecordIdCounter.h"
+#include "qpid/broker/AsyncStore.h"
+#include "qpid/sys/Poller.h"
+
+namespace qpid {
+
+namespace broker {
+class Broker;
+} // namespace qpid::broker
+
+namespace asyncStore {
+
+class AsyncStoreImpl: public qpid::broker::AsyncStore {
+public:
+ AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller,
+ const AsyncStoreOptions& opts);
+ virtual ~AsyncStoreImpl();
+ void initialize();
+ uint64_t getNextRid();
+
+ // Management
+
+ void initManagement(qpid::broker::Broker* broker);
+
+ // AsyncStore interface
+
+ qpid::broker::TxnHandle createTxnHandle(const std::string& xid=std::string());
+ qpid::broker::ConfigHandle createConfigHandle();
+ qpid::broker::QueueHandle createQueueHandle(const std::string& name,
+ const qpid::types::Variant::Map& opts);
+ qpid::broker::EventHandle createEventHandle(qpid::broker::QueueHandle& queueHandle,
+ const std::string& key=std::string());
+ qpid::broker::MessageHandle createMessageHandle(const qpid::broker::DataSource* dataSrc);
+ qpid::broker::EnqueueHandle createEnqueueHandle(qpid::broker::MessageHandle& msgHandle,
+ qpid::broker::QueueHandle& queueHandle);
+
+ void submitPrepare(qpid::broker::TxnHandle& txnHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt);
+ void submitCommit(qpid::broker::TxnHandle& txnHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt);
+ void submitAbort(qpid::broker::TxnHandle& txnHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt);
+
+ void submitCreate(qpid::broker::ConfigHandle& cfgHandle,
+ const qpid::broker::DataSource* dataSrc,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt);
+ void submitDestroy(qpid::broker::ConfigHandle& cfgHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt);
+
+ void submitCreate(qpid::broker::QueueHandle& queueHandle,
+ const qpid::broker::DataSource* dataSrc,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt);
+ void submitDestroy(qpid::broker::QueueHandle& queueHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt);
+ void submitFlush(qpid::broker::QueueHandle& queueHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt);
+
+ void submitCreate(qpid::broker::EventHandle& eventHandle,
+ const qpid::broker::DataSource* dataSrc,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt);
+ void submitCreate(qpid::broker::EventHandle& eventHandle,
+ const qpid::broker::DataSource* dataSrc,
+ qpid::broker::TxnHandle& txnHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt);
+ void submitDestroy(qpid::broker::EventHandle& eventHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt);
+ void submitDestroy(qpid::broker::EventHandle& eventHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt);
+
+ void submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt);
+ void submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt);
+ void submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt);
+ void submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerContext* brokerCtxt);
+
+ // Legacy - Restore FTD message, is NOT async!
+ virtual int loadContent(qpid::broker::MessageHandle& msgHandle,
+ qpid::broker::QueueHandle& queueHandle,
+ char* data,
+ uint64_t offset,
+ const uint64_t length);
+
+protected:
+ boost::shared_ptr<qpid::sys::Poller> m_poller;
+ AsyncStoreOptions m_opts;
+ RunState m_runState;
+ OperationQueue m_operations;
+ qpid::asyncStore::jrnl2::RecordIdCounter_t m_ridCntr;
+};
+
+}} // namespace qpid::asyncStore
+
+#endif // qpid_asyncStore_AsyncStoreImpl_h_
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreOptions.cpp b/cpp/src/qpid/asyncStore/AsyncStoreOptions.cpp
new file mode 100644
index 0000000000..939d65ddf9
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/AsyncStoreOptions.cpp
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file AsyncStoreOptions.cpp
+ */
+
+#include "AsyncStoreOptions.h"
+
+namespace qpid {
+namespace asyncStore {
+
+AsyncStoreOptions::AsyncStoreOptions(const std::string& name):
+ qpid::Options(name),
+ m_storeDir(getDefaultStoreDir())
+{
+ addOptions()
+ ("store-dir", qpid::optValue(m_storeDir, "DIR"),
+ "Store directory location for persistence (instead of using --data-dir value). "
+ "Required if --no-data-dir is also used.")
+ ;
+}
+
+AsyncStoreOptions::AsyncStoreOptions(const std::string& storeDir,
+ const std::string& name) :
+ qpid::Options(name),
+ m_storeDir(storeDir)
+{}
+
+AsyncStoreOptions::~AsyncStoreOptions()
+{}
+
+void
+AsyncStoreOptions::printVals(std::ostream& os) const
+{
+ os << "ASYNC STORE OPTIONS:" << std::endl;
+ os << " Store directory location for persistence [store-dir]: \"" << m_storeDir << "\"" << std::endl;
+}
+
+void
+AsyncStoreOptions::validate()
+{}
+
+//static
+std::string& AsyncStoreOptions::getDefaultStoreDir()
+{
+ static std::string s_defaultStoreDir = "/tmp";
+ return s_defaultStoreDir;
+}
+
+}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreOptions.h b/cpp/src/qpid/asyncStore/AsyncStoreOptions.h
new file mode 100644
index 0000000000..bc1b6a2f9f
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/AsyncStoreOptions.h
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file AsyncStoreOptions.h
+ */
+
+#ifndef qpid_asyncStore_AsyncStoreOptions_h_
+#define qpid_asyncStore_AsyncStoreOptions_h_
+
+#include "qpid/asyncStore/jrnl2/Streamable.h"
+
+#include "qpid/Options.h"
+
+#include <string>
+
+namespace qpid {
+namespace broker {
+class Options;
+}
+namespace asyncStore {
+
+class AsyncStoreOptions : public qpid::Options
+{
+public:
+ AsyncStoreOptions(const std::string& name="Async Store Options");
+ AsyncStoreOptions(const std::string& storeDir,
+ const std::string& name="Async Store Options");
+ virtual ~AsyncStoreOptions();
+ void printVals(std::ostream& os) const;
+ void validate();
+
+ std::string m_storeDir;
+
+protected:
+ // Static initialization race condition avoidance with static instance of Plugin class (using construct-on-first-use idiom).
+ static std::string& getDefaultStoreDir();
+};
+
+}} // namespace qpid::asyncStore
+
+#endif // qpid_asyncStore_AsyncStoreOptions_h_
diff --git a/cpp/src/qpid/asyncStore/ConfigHandleImpl.cpp b/cpp/src/qpid/asyncStore/ConfigHandleImpl.cpp
new file mode 100644
index 0000000000..64e2e848fa
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/ConfigHandleImpl.cpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file ConfigHandleImpl.cpp
+ */
+
+#include "ConfigHandleImpl.h"
+
+#include "qpid/messaging/PrivateImplRef.h"
+
+namespace qpid {
+namespace asyncStore {
+
+ConfigHandleImpl::ConfigHandleImpl()
+{}
+
+ConfigHandleImpl::~ConfigHandleImpl()
+{}
+
+}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/asyncStore/ConfigHandleImpl.h b/cpp/src/qpid/asyncStore/ConfigHandleImpl.h
new file mode 100644
index 0000000000..17069ec21c
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/ConfigHandleImpl.h
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file ConfigHandleImpl.h
+ */
+
+#ifndef qpid_asyncStore_ConfigHandleImpl_h_
+#define qpid_asyncStore_ConfigHandleImpl_h_
+
+#include "qpid/RefCounted.h"
+
+namespace qpid {
+namespace asyncStore {
+
+class ConfigHandleImpl : public virtual qpid::RefCounted
+{
+public:
+ ConfigHandleImpl();
+ virtual ~ConfigHandleImpl();
+};
+
+}} // namespace qpid::asyncStore
+
+#endif // qpid_asyncStore_ConfigHandleImpl_h_
diff --git a/cpp/src/qpid/asyncStore/EnqueueHandleImpl.cpp b/cpp/src/qpid/asyncStore/EnqueueHandleImpl.cpp
new file mode 100644
index 0000000000..0e291def78
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/EnqueueHandleImpl.cpp
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file EnqueueHandleImpl.cpp
+ */
+
+#include "EnqueueHandleImpl.h"
+
+#include "qpid/messaging/PrivateImplRef.h"
+
+namespace qpid {
+namespace asyncStore {
+
+EnqueueHandleImpl::EnqueueHandleImpl(qpid::broker::MessageHandle& msgHandle,
+ qpid::broker::QueueHandle& queueHandle) :
+ m_msgHandle(msgHandle),
+ m_queueHandle(queueHandle)
+{}
+
+EnqueueHandleImpl::~EnqueueHandleImpl()
+{}
+
+}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/asyncStore/EnqueueHandleImpl.h b/cpp/src/qpid/asyncStore/EnqueueHandleImpl.h
new file mode 100644
index 0000000000..82f3e5c47e
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/EnqueueHandleImpl.h
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file EnqueueHandleImpl.h
+ */
+
+#ifndef qpid_asyncStore_EnqueueHandleImpl_h_
+#define qpid_asyncStore_EnqueueHandleImpl_h_
+
+#include "qpid/RefCounted.h"
+
+namespace qpid {
+
+namespace broker {
+class MessageHandle;
+class QueueHandle;
+}
+
+namespace asyncStore {
+
+class EnqueueHandleImpl : public virtual qpid::RefCounted
+{
+public:
+ EnqueueHandleImpl(qpid::broker::MessageHandle& msgHandle,
+ qpid::broker::QueueHandle& queueHandle);
+ virtual ~EnqueueHandleImpl();
+protected:
+ qpid::broker::MessageHandle& m_msgHandle;
+ qpid::broker::QueueHandle& m_queueHandle;
+};
+
+}} // namespace qpid::asyncStore
+
+#endif // qpid_asyncStore_EnqueueHandleImpl_h_
diff --git a/cpp/src/qpid/asyncStore/EventHandleImpl.cpp b/cpp/src/qpid/asyncStore/EventHandleImpl.cpp
new file mode 100644
index 0000000000..05b98d1e5d
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/EventHandleImpl.cpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file EventHandleImpl.cpp
+ */
+
+#include "EventHandleImpl.h"
+
+#include "qpid/messaging/PrivateImplRef.h"
+
+namespace qpid {
+namespace asyncStore {
+
+EventHandleImpl::EventHandleImpl(qpid::broker::QueueHandle& queueHandle,
+ const std::string& key) :
+ m_queueHandle(queueHandle),
+ m_key(key)
+{}
+
+EventHandleImpl::~EventHandleImpl()
+{}
+
+const std::string&
+EventHandleImpl::getKey() const
+{
+ return m_key;
+}
+
+}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/asyncStore/EventHandleImpl.h b/cpp/src/qpid/asyncStore/EventHandleImpl.h
new file mode 100644
index 0000000000..9ed841b8c0
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/EventHandleImpl.h
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file EventHandleImpl.h
+ */
+
+#ifndef qpid_asyncStore_EventHandleImpl_h_
+#define qpid_asyncStore_EventHandleImpl_h_
+
+#include "qpid/RefCounted.h"
+
+namespace qpid {
+
+namespace broker {
+class QueueHandle;
+}
+
+namespace asyncStore {
+
+class EventHandleImpl : public virtual qpid::RefCounted
+{
+public:
+ EventHandleImpl(qpid::broker::QueueHandle& queueHandle,
+ const std::string& key = std::string());
+ virtual ~EventHandleImpl();
+ const std::string& getKey() const;
+protected:
+ qpid::broker::QueueHandle& m_queueHandle;
+ std::string m_key;
+};
+
+}} // namespace qpid::asyncStore
+
+#endif // qpid_asyncStore_EventHandleImpl_h_
diff --git a/cpp/src/qpid/asyncStore/MessageHandleImpl.cpp b/cpp/src/qpid/asyncStore/MessageHandleImpl.cpp
new file mode 100644
index 0000000000..cea039221a
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/MessageHandleImpl.cpp
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file MessageHandleImpl.cpp
+ */
+
+#include "MessageHandleImpl.h"
+
+#include "qpid/messaging/PrivateImplRef.h"
+
+namespace qpid {
+namespace asyncStore {
+
+MessageHandleImpl::MessageHandleImpl(const qpid::broker::DataSource* dataSrc) :
+ m_dataSrc(dataSrc)
+{}
+
+MessageHandleImpl::~MessageHandleImpl()
+{}
+
+}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/asyncStore/MessageHandleImpl.h b/cpp/src/qpid/asyncStore/MessageHandleImpl.h
new file mode 100644
index 0000000000..0907ac5e65
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/MessageHandleImpl.h
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file MessageHandleImpl.h
+ */
+
+#ifndef qpid_asyncStore_MessageHandleImpl_h_
+#define qpid_asyncStore_MessageHandleImpl_h_
+
+#include "qpid/RefCounted.h"
+
+namespace qpid {
+
+namespace broker {
+class DataSource;
+}
+
+namespace asyncStore {
+
+class MessageHandleImpl : public virtual qpid::RefCounted
+{
+public:
+ MessageHandleImpl(const qpid::broker::DataSource* dataSrc);
+ virtual ~MessageHandleImpl();
+protected:
+ const qpid::broker::DataSource* m_dataSrc;
+};
+
+}} // namespace qpid::asyncStore
+
+#endif // qpid_asyncStore_MessageHandleImpl_h_
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp
new file mode 100644
index 0000000000..298d6d3061
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file OperationQueue.cpp
+ */
+
+#include "OperationQueue.h"
+
+#include "qpid/broker/BrokerContext.h"
+
+namespace qpid {
+namespace asyncStore {
+
+OperationQueue::OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller) :
+ m_opQueue(boost::bind(&OperationQueue::handle, this, _1), poller)
+{
+ m_opQueue.start();
+}
+
+OperationQueue::~OperationQueue()
+{
+ m_opQueue.stop();
+}
+
+void
+OperationQueue::submit(const AsyncOperation* op)
+{
+//std::cout << "***** OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush;
+ m_opQueue.push(op);
+}
+
+// protected
+OperationQueue::OpQueue::Batch::const_iterator
+OperationQueue::handle(const OperationQueue::OpQueue::Batch& e)
+{
+ for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
+//std::cout << "##### OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush;
+ if ((*i)->m_resCb) {
+ ((*i)->m_resCb)(new qpid::broker::AsyncResult, (*i)->m_brokerCtxt);
+ } else {
+ delete (*i)->m_brokerCtxt;
+ }
+ delete (*i);
+ }
+ return e.end();
+}
+
+}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.h b/cpp/src/qpid/asyncStore/OperationQueue.h
new file mode 100644
index 0000000000..8a79684262
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/OperationQueue.h
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file OperationQueue.h
+ */
+
+#ifndef qpid_asyncStore_OperationQueue_h_
+#define qpid_asyncStore_OperationQueue_h_
+
+#include "AsyncOperation.h"
+
+#include "qpid/broker/AsyncStore.h"
+#include "qpid/sys/PollableQueue.h"
+
+namespace qpid {
+namespace asyncStore {
+
+class OperationQueue
+{
+public:
+ OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller);
+ virtual ~OperationQueue();
+ void submit(const AsyncOperation* op);
+
+protected:
+ typedef qpid::sys::PollableQueue<const AsyncOperation*> OpQueue;
+ OpQueue m_opQueue;
+
+ OpQueue::Batch::const_iterator handle(const OpQueue::Batch& e);
+};
+
+}} // namespace qpid::asyncStore
+
+#endif // qpid_asyncStore_OperationQueue_h_
diff --git a/cpp/src/qpid/asyncStore/Plugin.cpp b/cpp/src/qpid/asyncStore/Plugin.cpp
new file mode 100644
index 0000000000..4f35e8cd2a
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/Plugin.cpp
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file Plugin.cpp
+ */
+
+#include "Plugin.h"
+
+#include "qpid/broker/Broker.h"
+
+namespace qpid {
+namespace broker {
+
+void
+Plugin::earlyInitialize(Target& target)
+{
+ Broker* broker = dynamic_cast<Broker*>(&target);
+ if (!broker) return;
+ DataDir& dataDir = broker->getDataDir ();
+ if (m_options.m_storeDir.empty ())
+ {
+ if (!dataDir.isEnabled ())
+ throw Exception ("asyncStore: If --data-dir is blank or --no-data-dir is specified, --store-dir must be present.");
+
+ m_options.m_storeDir = dataDir.getPath ();
+ }
+ m_store.reset(new qpid::asyncStore::AsyncStoreImpl(broker->getPoller(), m_options));
+ boost::shared_ptr<qpid::broker::AsyncStore> brokerAsyncStore(m_store);
+ broker->setAsyncStore(brokerAsyncStore);
+ boost::function<void()> fn = boost::bind(&Plugin::finalize, this);
+ target.addFinalizer(fn);
+ QPID_LOG(info, "asyncStore: Initialized using path " << m_options.m_storeDir);
+}
+
+void
+Plugin::initialize(Target& target)
+{
+ Broker* broker = dynamic_cast<Broker*>(&target);
+ if (!broker || !m_store) return;
+
+ // Not done in earlyInitialize as the Broker::isInCluster test won't work there.
+ if (broker->isInCluster()) {
+ QPID_LOG(info, "asyncStore: Part of cluster: Disabling management instrumentation");
+ } else {
+ QPID_LOG(info, "asyncStore: Enabling management instrumentation");
+ m_store->initManagement(broker);
+ }
+}
+
+void
+Plugin::finalize()
+{
+ m_store.reset();
+}
+
+qpid::Options*
+Plugin::getOptions()
+{
+ return &m_options;
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/asyncStore/Plugin.h b/cpp/src/qpid/asyncStore/Plugin.h
new file mode 100644
index 0000000000..cbb0bfdadb
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/Plugin.h
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file Plugin.h
+ */
+
+#ifndef qpid_broker_Plugin_h_
+#define qpid_broker_Plugin_h_
+
+#include "AsyncStoreImpl.h"
+#include "AsyncStoreOptions.h"
+
+#include "qpid/Plugin.h"
+
+namespace qpid {
+class Options;
+namespace broker {
+
+class Plugin : public qpid::Plugin
+{
+public:
+ virtual void earlyInitialize(Target& target);
+ virtual void initialize(Target& target);
+ void finalize();
+ virtual qpid::Options* getOptions();
+protected:
+ boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> m_store;
+ qpid::asyncStore::AsyncStoreOptions m_options;
+};
+
+static Plugin instance; // Static initialization.
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_Plugin_h_
diff --git a/cpp/src/qpid/asyncStore/QueueHandleImpl.cpp b/cpp/src/qpid/asyncStore/QueueHandleImpl.cpp
new file mode 100644
index 0000000000..523a31ba7b
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/QueueHandleImpl.cpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file QueueHandleImpl.cpp
+ */
+
+#include "QueueHandleImpl.h"
+
+#include "qpid/messaging/PrivateImplRef.h"
+
+namespace qpid {
+namespace asyncStore {
+
+QueueHandleImpl::QueueHandleImpl(const std::string& name,
+ const qpid::types::Variant::Map& opts) :
+ m_name(name),
+ m_opts(opts)
+{}
+
+QueueHandleImpl::~QueueHandleImpl()
+{}
+
+const std::string&
+QueueHandleImpl::getName() const
+{
+ return m_name;
+}
+
+}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/asyncStore/QueueHandleImpl.h b/cpp/src/qpid/asyncStore/QueueHandleImpl.h
new file mode 100644
index 0000000000..9046a33877
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/QueueHandleImpl.h
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file QueueHandleImpl.h
+ */
+
+#ifndef qpid_asyncStore_QueueHandleImpl_h_
+#define qpid_asyncStore_QueueHandleImpl_h_
+
+#include "qpid/RefCounted.h"
+#include "qpid/types/Variant.h"
+
+#include <string>
+
+namespace qpid {
+namespace asyncStore {
+
+class QueueHandleImpl : public virtual qpid::RefCounted
+{
+public:
+ QueueHandleImpl(const std::string& name,
+ const qpid::types::Variant::Map& opts);
+ virtual ~QueueHandleImpl();
+
+ const std::string& getName() const;
+
+protected:
+ const std::string m_name;
+ const qpid::types::Variant::Map& m_opts;
+
+};
+
+}} // namespace qpid::asyncStore
+
+#endif // qpid_asyncStore_QueueHandleImpl_h_
diff --git a/cpp/src/qpid/asyncStore/README b/cpp/src/qpid/asyncStore/README
new file mode 100644
index 0000000000..4590bfea62
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/README
@@ -0,0 +1,34 @@
+This folder contains source code for the new async store module. It implements the interface
+described in qpid/broker/AsyncStore.h.
+
+NOTE: This is NOT the current (old) async store module implemented through the sync interface
+in qpid/broker/MessageStoere.h. That store will continue to be kept separately at
+https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp (or if no checkin credentials, then
+read-only at http://anonsvn.jboss.org/repos/rhmessaging/store/trunk/cpp).
+
+Layout
+------
+
+qpid
+ |
+ +---asyncStore (Interface layer with broker)
+ |
+ +---jrnl2 (Async journal)
+
+
+The jrnl2 directory contains the asynchronous journal code, and contains all the journal
+file management and low-level writing and reading. This code knows only about writing
+opaque data blobs to disk in records which may or may not contain a transaction identifier.
+Records may be enqueued (written) or dequeued (which writes to disk the logical deletion of an
+earlier enqueued record).
+
+The upper asyncStore directory contains the implementation of the broker's AsyncStore
+interface and the necessary logic to interface this to the jrnl2 code. It also provides
+transaction handling and recovery logic.
+
+Building
+--------
+Currently, this module is a Linux-only module, owing to the dependency of the jrnl2 layer
+on the Linux-only libaio library.
+
+Only cmake is supported.
diff --git a/cpp/src/qpid/asyncStore/RunState.cpp b/cpp/src/qpid/asyncStore/RunState.cpp
new file mode 100644
index 0000000000..3605a0c2e1
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/RunState.cpp
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file RunState.cpp
+ */
+
+#include "RunState.h"
+
+#include "qpid/Exception.h"
+
+#include <sstream>
+
+namespace qpid {
+namespace asyncStore {
+
+RunState::RunState() :
+ qpid::asyncStore::jrnl2::State<RunState_t>()
+{}
+
+RunState::RunState(const RunState_t s) :
+ qpid::asyncStore::jrnl2::State<RunState_t>(s)
+{}
+
+RunState::RunState(const RunState& s) :
+ qpid::asyncStore::jrnl2::State<RunState_t>(s)
+{}
+
+RunState::~RunState()
+{}
+
+void
+RunState::setInitializing()
+{
+ set(RS_INITIALIZING);
+}
+
+void
+RunState::setRestoring()
+{
+ set(RS_RESTORING);
+}
+
+void
+RunState::setRunning()
+{
+ set(RS_RUNNING);
+}
+
+void
+RunState::setStopping()
+{
+ set(RS_STOPPING);
+}
+
+void
+RunState::setStopped()
+{
+ set(RS_STOPPED);
+}
+
+const char*
+RunState::getAsStr() const
+{
+ return s_toStr(m_state);
+}
+
+//static
+const char*
+RunState::s_toStr(const RunState_t s)
+{
+ switch (s) {
+ case RS_NONE:
+ return "WR_NONE";
+ case RS_INITIALIZING:
+ return "RS_INITIALIZING";
+ case RS_RESTORING:
+ return "RS_RESTORING";
+ case RS_RUNNING:
+ return "RS_RUNNING";
+ case RS_STOPPING:
+ return "RS_STOPPING";
+ case RS_STOPPED:
+ return "RS_STOPPED";
+ default:
+ std::ostringstream oss;
+ oss << "<unknown state (" << "s" << ")>";
+ return oss.str().c_str();
+ }
+}
+
+void
+RunState::set(const RunState_t s)
+{
+ // State transition logic: set stateError to true if an invalid transition is attempted
+ bool stateTransitionError = false;
+ switch (m_state) {
+ case RS_NONE:
+ if (s != RS_INITIALIZING && s != RS_RESTORING) {
+ stateTransitionError = true;
+ }
+ break;
+ case RS_INITIALIZING:
+ case RS_RESTORING:
+ if (s != RS_RUNNING) {
+ stateTransitionError = true;
+ }
+ break;
+ case RS_RUNNING:
+ if (s != RS_STOPPING) {
+ stateTransitionError = true;
+ }
+ break;
+ case RS_STOPPING:
+ if (s != RS_STOPPED) {
+ stateTransitionError = true;
+ }
+ break;
+ case RS_STOPPED: // Cannot move out of this state except with reset()
+ stateTransitionError = true;
+ break;
+ }
+
+ if (stateTransitionError) {
+ std::ostringstream oss;
+ oss << "RunState::set() is in state " << m_state << " and cannot be moved to state " << s << ".";
+ throw qpid::Exception(oss.str());
+ }
+ m_state = s;
+}
+
+}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/asyncStore/RunState.h b/cpp/src/qpid/asyncStore/RunState.h
new file mode 100644
index 0000000000..959e47449d
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/RunState.h
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file RunState.h
+ */
+
+#ifndef qpid_asyncStore_RunState_h_
+#define qpid_asyncStore_RunState_h_
+
+#include "qpid/asyncStore/jrnl2/State.h"
+
+namespace qpid {
+namespace asyncStore {
+
+/**
+ * RS_NONE
+ * / \
+ * setInitializing() / \ setRestoring()
+ * / \
+ * V V
+ * RS_INITIALIZING RS_RESTORING
+ * \ /
+ * setRunning() \ / setRunning()
+ * \ /
+ * V V
+ * RS_RUNNING
+ * |
+ * | setStopping()
+ * V
+ * RS_STOPPING
+ * |
+ * | setStopped()
+ * V
+ * RS_STOPPED
+ */
+typedef enum {
+ RS_NONE = 0,
+ RS_INITIALIZING,
+ RS_RESTORING,
+ RS_RUNNING,
+ RS_STOPPING,
+ RS_STOPPED
+} RunState_t;
+
+class RunState: public qpid::asyncStore::jrnl2::State<RunState_t>
+{
+public:
+ RunState();
+ RunState(const RunState_t s);
+ RunState(const RunState& s);
+ virtual ~RunState();
+ void setInitializing();
+ void setRestoring();
+ void setRunning();
+ void setStopping();
+ void setStopped();
+ virtual const char* getAsStr() const;
+ static const char* s_toStr(const RunState_t s);
+protected:
+ virtual void set(const RunState_t s);
+
+};
+
+}} // namespace qpid::asyncStore
+
+#endif // qpid_asyncStore_RunState_h_
diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
new file mode 100644
index 0000000000..7ce01f881c
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TxnHandleImpl.cpp
+ */
+
+#include "TxnHandleImpl.h"
+
+#include "qpid/messaging/PrivateImplRef.h"
+
+#include <uuid/uuid.h>
+
+namespace qpid {
+namespace asyncStore {
+
+TxnHandleImpl::TxnHandleImpl(const std::string& xid) :
+ m_xid(xid),
+ m_tpcFlag(!xid.empty())
+{
+ if (m_xid.empty()) { // create a local xid from a random uuid
+ uuid_t uuid;
+ ::uuid_generate_random(uuid);
+ char uuidStr[37]; // 36-char uuid + trailing '\0'
+ ::uuid_unparse(uuid, uuidStr);
+// m_xid.assign(uuidStr);
+ }
+}
+
+TxnHandleImpl::~TxnHandleImpl()
+{}
+
+const std::string&
+TxnHandleImpl::getXid() const
+{
+ return m_xid;
+}
+
+bool
+TxnHandleImpl::is2pc() const
+{
+ return m_tpcFlag;
+}
+
+}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.h b/cpp/src/qpid/asyncStore/TxnHandleImpl.h
new file mode 100644
index 0000000000..d001fd1916
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.h
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TxnHandleImpl.h
+ */
+
+#ifndef qpid_asyncStore_TxnHandleImpl_h_
+#define qpid_asyncStore_TxnHandleImpl_h_
+
+#include "qpid/RefCounted.h"
+
+#include <string>
+
+namespace qpid {
+namespace asyncStore {
+
+class TxnHandleImpl : public virtual qpid::RefCounted
+{
+public:
+ TxnHandleImpl(const std::string& xid = std::string());
+ virtual ~TxnHandleImpl();
+ const std::string& getXid() const;
+ bool is2pc() const;
+protected:
+ std::string m_xid;
+ bool m_tpcFlag;
+};
+
+}} // namespace qpid::asyncStore
+
+#endif // qpid_asyncStore_TxnHandleImpl_h_
diff --git a/cpp/src/qpid/asyncStore/jrnl2/AioCallback.h b/cpp/src/qpid/asyncStore/jrnl2/AioCallback.h
new file mode 100644
index 0000000000..952448fd72
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/AioCallback.h
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ /**
+ * \file AioCallback.h
+ */
+
+#ifndef qpid_asyncStore_jrnl2_AioCallback_h_
+#define qpid_asyncStore_jrnl2_AioCallback_h_
+
+#include <stdint.h> // uint16_t
+#include <vector>
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+class DataToken;
+
+/**
+ * \brief This pure virtual class provides an interface through which asyncronous operation completion calls may
+ * be made.
+ */
+class AioCallback
+{
+public:
+ /**
+ * \brief Virtual destructor
+ */
+ virtual ~AioCallback() {}
+
+ /**
+ * \brief Callback function through which asynchronous IO write operation completion notifications are sent.
+ */
+ virtual void writeAioCompleteCallback(std::vector<DataToken*>& dataTokenList) = 0;
+
+ /**
+ * \brief Callback function through which asynchronous IO read operation completion notifications are sent.
+ */
+ virtual void readAioCompleteCallback(std::vector<uint16_t>& buffPageCtrlBlkIndexList) = 0;
+
+};
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+#endif // qpid_asyncStore_jrnl2_AioCallback_h_
+
diff --git a/cpp/src/qpid/asyncStore/jrnl2/AsyncJournal.cpp b/cpp/src/qpid/asyncStore/jrnl2/AsyncJournal.cpp
new file mode 100644
index 0000000000..979b91579f
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/AsyncJournal.cpp
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file AsyncJournal.cpp
+ */
+
+#include "AsyncJournal.h"
+
+#include "AioCallback.h"
+#include "DataToken.h"
+
+// --- temp code ---
+#include <fstream>
+#include <iostream>
+// --- end temp code ---
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+std::string
+g_ioResAsString(const jrnlOpRes /*res*/)
+{
+ /// \todo TODO - provide implementation
+ return ".[g_ioResAsString].";
+}
+
+AsyncJournal::AsyncJournal(const std::string& jrnlId,
+ const std::string& jrnlDir,
+ const std::string& baseFileName) :
+ m_jrnlId(jrnlId),
+ m_jrnlDir(jrnlDir),
+ m_baseFileName(baseFileName),
+ m_jrnlParamsPtr(0),
+ m_aioCallbackPtr(0)
+{
+ m_writeDataTokens.reserve(s_listSizeThreshold);
+ m_callbackDataTokens.reserve(s_listSizeThreshold);
+}
+
+std::string
+AsyncJournal::getId() const
+{
+ return m_jrnlId;
+}
+
+JournalDirectory
+AsyncJournal::getJournalDir() const
+{
+ return m_jrnlDir;
+}
+
+std::string
+AsyncJournal::getJournalDirName() const
+{
+ return m_jrnlDir.getFqName();
+}
+
+std::string
+AsyncJournal::getBaseFileName() const
+{
+ return m_baseFileName;
+}
+
+const JournalRunState&
+AsyncJournal::getState() const
+{
+ return m_jrnlState;
+}
+
+const JournalParameters*
+AsyncJournal::getParameters() const
+{
+ return m_jrnlParamsPtr;
+}
+
+void
+AsyncJournal::initialize(const JournalParameters* jpPtr,
+ AioCallback* const aiocbPtr)
+{
+ m_jrnlParamsPtr = jpPtr;
+ m_aioCallbackPtr = aiocbPtr;
+ // --- temp code ---
+ m_jrnlDir.create();
+ /// --- end temp code ---
+}
+
+jrnlOpRes
+AsyncJournal::enqueue(DataToken* dtokPtr,
+ const void* /*dataPtr*/,
+ const std::size_t /*dataLen*/,
+ const void* /*tidPtr*/,
+ const std::size_t /*tidLen*/,
+ const bool /*transientFlag*/)
+{
+ dtokPtr->getDataOpState().enqueue();
+ // --- temp code ---
+ { // --- START OF CRITICAL SECTION ---
+ ScopedLock l(m_writeDataTokensLock);
+ m_writeDataTokens.push_back(dtokPtr);
+ if (m_writeDataTokens.size() >= s_listSizeThreshold) {
+ flushNoLock();
+ }
+ } // --- END OF CRITICAL SECTION ---
+ //processCompletedAioWriteEvents(0);
+ // --- end temp code ---
+ return 0;
+}
+
+jrnlOpRes
+AsyncJournal::dequeue(DataToken* const dtokPtr,
+ const void* /*tidPtr*/,
+ const std::size_t /*tidLen*/)
+{
+ dtokPtr->getDataOpState().dequeue();
+ dtokPtr->setDequeueRecordId(dtokPtr->getRecordId());
+ // --- temp code ---
+ { // --- START OF CRITICAL SECTION ---
+ ScopedLock l(m_writeDataTokensLock);
+ m_writeDataTokens.push_back(dtokPtr);
+ if (m_writeDataTokens.size() >= s_listSizeThreshold) {
+ flushNoLock();
+ }
+ } // --- END OF CRITICAL SECTION ---
+ //processCompletedAioWriteEvents(0);
+ // --- end temp code ---
+ return 0;
+}
+
+jrnlOpRes
+AsyncJournal::commit()
+{
+ /// \todo TODO - provide implementation
+ return 0;
+}
+
+jrnlOpRes
+AsyncJournal::abort()
+{
+ /// \todo TODO - provide implementation
+ return 0;
+}
+
+jrnlOpRes
+AsyncJournal::flush()
+{
+ // --- temp code ---
+ // --- START OF CRITICAL SECTION ---
+ ScopedTryLock l(m_writeDataTokensLock);
+ if (l.isLocked()) {
+ return flushNoLock();
+ }
+ return 0;
+ // --- END OF CRITICAL SECTION ---
+ // --- end temp code ---
+}
+
+// protected
+jrnlOpRes
+AsyncJournal::flushNoLock()
+{
+ // --- temp code ---
+ // Normally the page would be written to disk using libaio here (still to do).
+ uint32_t cnt = 0UL;
+ ScopedLock l(m_callbackDataTokensLock);
+ while (!m_writeDataTokens.empty()) {
+ m_callbackDataTokens.push_back(m_writeDataTokens.back());
+ m_writeDataTokens.pop_back();
+ ++cnt;
+ }
+ return 0;
+ // --- end temp code ---
+}
+
+jrnlOpRes
+AsyncJournal::sync(const double timeout)
+{
+ // --- temp code ---
+ // --- START OF CRITICAL SECTION ---
+ ScopedTryLock l(m_writeDataTokensLock);
+ if (l.isLocked()) {
+ return syncNoLock(timeout);
+ }
+ return 0;
+ // --- END OF CRITICAL SECTION ---
+ // --- end temp code ---
+}
+
+// protected
+jrnlOpRes
+AsyncJournal::syncNoLock(const double /*timeout*/)
+{
+ // --- temp code ---
+ if (m_callbackDataTokens.size()) {
+ processCompletedAioWriteEvents();
+ }
+ return 0;
+ // --- end temp code ---
+}
+
+void
+AsyncJournal::processCompletedAioWriteEvents(const double /*timeout*/)
+{
+ // --- temp code ---
+ // --- START OF CRITICAL SECTION 1 ---
+ ScopedLock l1(m_callbackDataTokensLock);
+ m_aioCallbackPtr->writeAioCompleteCallback(m_callbackDataTokens);
+ // --- END OF CRITICAL SECTION 1 ---
+ // --- end temp code ---
+}
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+
diff --git a/cpp/src/qpid/asyncStore/jrnl2/AsyncJournal.h b/cpp/src/qpid/asyncStore/jrnl2/AsyncJournal.h
new file mode 100644
index 0000000000..845d131a49
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/AsyncJournal.h
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file AsyncJournal.h
+ */
+
+#ifndef qpid_asyncStore_jrnl2_AsyncJournal_h_
+#define qpid_asyncStore_jrnl2_AsyncJournal_h_
+
+#include "JournalDirectory.h"
+#include "JournalRunState.h"
+#include "ScopedLock.h"
+
+#include <string>
+#include <stdint.h> // uint64_t, uint32_t, etc.
+
+// --- temp code ---
+#include <vector>
+// --- end temp code ---
+
+namespace qpid { ///< Namespace for top-level qpid domain
+namespace asyncStore { ///< Namespace for AsyncStore code
+namespace jrnl2 { ///< Namespace for AsyncStore journal v.2 code.
+
+class AioCallback;
+class DataToken;
+class JournalParameters;
+
+/**
+ * \brief Type to return results from journal operations.
+ * \todo TODO - decide if this is the right place to expose these codes and flags. Also express ioRes as flags.
+ */
+typedef uint64_t jrnlOpRes;
+
+const jrnlOpRes RHM_IORES_ENQCAPTHRESH = 0x1; ///< Error flag indicating an enqueue capacity threshold was reached.
+const jrnlOpRes RHM_IORES_BUSY = 0x2; ///< Error flag indicating that a call could not be completed because the Store is busy.
+
+/**
+ * \brief Global function to convert a return code into a textual representation.
+ */
+std::string g_ioResAsString(const jrnlOpRes /*res*/);
+
+/**
+ * \brief Top level of a single journal instance, which is usually deployed on a per-queue basis.
+ *
+ * Each journal has its own set of journal files and when recovered will result in restored data (messages) being
+ * sent to the queue to which this instance is connected.
+ */
+class AsyncJournal
+{
+public:
+ /**
+ * \brief Constructor, which creates a single instance of a journal.
+ *
+ * \param jrnlId Journal identifier (JID), typically the name of the queue with which this journal
+ * is associated.
+ * \param jrnlDir Absolute path to the directory in which the journal will be placed. If the path does not exist,
+ * it will be created.
+ * \param baseFileName The base name of all files in the journal directory.
+ */
+ AsyncJournal(const std::string& jrnlId,
+ const std::string& jrnlDir,
+ const std::string& baseFileName);
+
+ // Get functions
+
+ /**
+ * \brief Get the journal identifier (JID).
+ *
+ * \returns Journal identifier (JID) of this journal instance.
+ */
+ std::string getId() const;
+
+ /**
+ * \brief Get the URI or directory where this journal is deployed (writes its files).
+ *
+ * \returns JournalDirectory instance controlling where this journal instance is deployed.
+ */
+ JournalDirectory getJournalDir() const;
+
+ /**
+ * \brief Get the URI or directory where this journal is deployed (writes its files).
+ *
+ * \returns URI or directory (as a std::string) where this journal instance is deployed.
+ */
+ std::string getJournalDirName() const;
+
+ /**
+ * \brief Get the base file name used for all journal and metadata files associated with this journal instance.
+ *
+ * \returns String containing the base file name used for all files associated with this journal instance.
+ */
+ std::string getBaseFileName() const;
+
+ /**
+ * \brief Get the journal state object, which can be queried for the journal state.
+ *
+ * \returns \c const reference to the JournalSate instance associated with this journal instance.
+ */
+ const JournalRunState& getState() const;
+
+ /**
+ * \brief Return the Journal parameter object which contains the journal options and settings. It may be
+ * queried directly to obtain the options and settings.
+ *
+ * \returns Pointer to the JournalParameters instance associated with this journal instance.
+ */
+ const JournalParameters* getParameters() const;
+
+ // Data ops
+
+ /**
+ * \brief Initialize the journal and its files, making it ready for use.
+ *
+ * \param jpPtr Pointer to an instance of JournalParameters containing the journal options and settings.
+ * \param aiocbPtr Pointer to an instance of AioCallback, whcih sets the broker AIO callback functions.
+ *
+ * \todo TODO: Make this call async for large/slow ops
+ */
+ void initialize(const JournalParameters* jpPtr,
+ AioCallback* const aiocbPtr);
+
+ /**
+ * \brief Enqueue data of size dataLen and pointed to by dataPtr. If transactional, then tidPtr points to
+ * the transaction id and tidLen indicates its size. The DataToken instance must be kept in the service of this
+ * data record until it has been fully dequeued.
+ *
+ * \param dtokPtr Pointer to the DataToken object of a previouisly enqueued data record.
+ * \param dataPtr Pointer to data to be stored (enqueued). If \b NULL, when parameter \a dataLen > 0, then this
+ * is assumed to be an externally stored data record.
+ * \param dataLen Length of the data pointed to
+ * \param tidPtr Pointer to the transaction ID for this operation. If \b NULL together with \a tidLen, then
+ * the operation is non-transactional.
+ * \param tidLen Size of the transaction ID pointed to in parameter \a tidPtr. Must be 0 when \a tidPtr is
+ * \b NULL to make non-transactional.
+ * \param transientFlag If \b true, sets a flag indicating that on recover, this record is to be ignored and
+ * treated as if transient (rather than durable).
+ *
+ * \returns Return code for this operation. A zero value (0x0) indicates success, a non-zero value indicates
+ * an error has occurred or an issue is present.
+ */
+ jrnlOpRes enqueue(DataToken* dtokPtr,
+ const void* dataPtr, // if null and dataLen > 0, extern assumed
+ const std::size_t dataLen,
+ const void* tidPtr, // if null and tidLen == 0, non transactional
+ const std::size_t tidLen,
+ const bool transientFlag);
+
+ /**
+ * \brief Dequeue the data record previously enqueued using the DataToken object dtokPtr.
+ *
+ * \param dtokPtr Pointer to the DataToken object of a previouisly enqueued data record.
+ * \param tidPtr Pointer to the transaction ID for this operation. If \b NULL together with \a tidLen, then
+ * the operation is non-transactional.
+ * \param tidLen Size of the transaction ID pointed to in parameter \a tidPtr. Must be 0 when \a tidPtr is
+ * \b NULL to make non-transactional.
+ * \returns Return code for this operation. A zero value (0x0) indicates success, a non-zero value indicates
+ * an error has occurred or an issue is present.
+ */
+ jrnlOpRes dequeue(DataToken* const dtokPtr,
+ const void* tidPtr, // if null and tidLen == 0, non transactional
+ const std::size_t tidLen);
+
+ /**
+ * \brief Commit the transaction Id (XID) used for previoius enqueue(s) and/or dequeue(s).
+ *
+ * \return Return code for this operation. A zero value (0x0) indicates success, a non-zero value indicates
+ * an error has occurred or an issue is present.
+ *
+ * \todo TODO: Create and add an XID type as a parameter to this call.
+ */
+ jrnlOpRes commit();
+
+ /**
+ * \brief Abort (roll back) the transaction Id (XID) used for previoius enqueue(s) and/or dequeue(s).
+ *
+ * \return Return code for this operation. A zero value (0x0) indicates success, a non-zero value indicates
+ * an error has occurred or an issue is present.
+ *
+ * \todo TODO: Create and add an XID type as a parameter to this call.
+ */
+ jrnlOpRes abort();
+
+ // AIO ops and status
+
+ /**
+ * \brief Flush all unwritten buffered records for this journal.
+ */
+ jrnlOpRes flush();
+
+ /**
+ * \brief Wait until all AIOs outstanding at the last flush() have returned for this journal.
+ *
+ * It is assumed that flush() will have been previously called. This call by definition blocks until \a timeout
+ * has elapsed or all AIO operations outstanding at the last flush() call have returned. A zero timeout implies
+ * an indefinite block.
+ */
+ jrnlOpRes sync(const double timeout = 0.0);
+
+ /**
+ * \brief Search for and process completed AIO write events.
+ *
+ * \param timeout Maximum time to wait for completion, otherwise return without performing any work.
+ *
+ * \todo TODO: This may become obsolete if epoll is used instead.
+ * \todo TODO: Should this return the number of completed AIO events?
+ */
+ void processCompletedAioWriteEvents(const double timeout = 0.0);
+
+protected:
+ std::string m_jrnlId; ///< Identifier for this journal instance (JID), typically queue name.
+ JournalDirectory m_jrnlDir; ///< Directory in which this journal is deployed.
+ std::string m_baseFileName; ///< Base file name used for all journal files belonging to this instance.
+ JournalRunState m_jrnlState; ///< Journal state manager, controls the state of this journal.
+ const JournalParameters* m_jrnlParamsPtr; ///< Journal options and parameters associated with this journal.
+ AioCallback* m_aioCallbackPtr; ///< Pointers to the broker's callback functions for AIO completion callbacks.
+
+ // --- temp code ---
+ static const uint32_t s_listSizeThreshold = 250; ///< [TEMP CODE] Number of data records at which a flush will occur.
+ std::vector<DataToken*> m_writeDataTokens; ///< [TEMP CODE] List of data tokens held before a flush.
+ std::vector<DataToken*> m_callbackDataTokens; ///< [TEMP CODE] List of data tokens ready for callbacks.
+ ScopedMutex m_writeDataTokensLock; ///< [TEMP CODE] Lock to protect the write token list.
+ ScopedMutex m_callbackDataTokensLock; ///< [TEMP CODE] Lock to protect the callback token list.
+ // --- end temp code ---
+
+ /**
+ * \brief Internal-use flush call which operates without taking a lock.
+ */
+ jrnlOpRes flushNoLock();
+
+ /**
+ * \brief Internal-use sync call which operates without taking a lock.
+ */
+ jrnlOpRes syncNoLock(const double timeout);
+
+};
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+#endif // qpid_asyncStore_jrnl2_AsyncJournal_h_
+
diff --git a/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h b/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h
new file mode 100644
index 0000000000..b13caa5462
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file AtomicCounter.h
+ */
+
+#ifndef qpid_asyncStore_jrnl2_AtomicCounter_h_
+#define qpid_asyncStore_jrnl2_AtomicCounter_h_
+
+#include "ScopedLock.h"
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+/**
+ * \brief Template integral atomic counter class which provides a next() operation to increment then return the
+ * updated count of integral type T. This operation is performed under a thread lock.
+ */
+template <class T> class AtomicCounter
+{
+public:
+ /**
+ * \brief Constructor with an option to set an inital value for the counter.
+ */
+ AtomicCounter(T initialValue = T(0)) :
+ m_cnt(initialValue)
+ {}
+
+ /**
+ * \brief Destructor
+ */
+ virtual ~AtomicCounter()
+ {}
+
+ /**
+ * \brief Increment, then return new value of the internal counter. This is thread-safe and takes a lock
+ * in order to perform the increment.
+ *
+ * \note This is a non-zero counter. By default, the counter starts with the value 0, and consequently the
+ * first call to next() will return 1. Upon overflow, the counter will be incremented twice so as to avoid
+ * returning the value 0.
+ */
+ virtual T next()
+ {
+ // --- START OF CRITICAL SECTION ---
+ ScopedLock l(m_mutex);
+ while (!++m_cnt) ; // Cannot return 0x0 if m_cnt should overflow
+ return m_cnt;
+ } // --- END OF CRITICAL SECTION ---
+
+protected:
+ T m_cnt; ///< Internal count value
+ ScopedMutex m_mutex; ///< Internal lock used to increment the counter
+
+};
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+#endif // qpid_asyncStore_jrnl2_AtomicCounter_h_
diff --git a/cpp/src/qpid/asyncStore/jrnl2/Configuration.h b/cpp/src/qpid/asyncStore/jrnl2/Configuration.h
new file mode 100644
index 0000000000..5e5aa176dd
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/Configuration.h
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file Configuration.h
+ */
+
+#ifndef qpid_asyncStore_jrnl2_Configuration_h_
+#define qpid_asyncStore_jrnl2_Configuration_h_
+
+#include <stdint.h> // uint8_t
+
+#if defined(__i386__) /* little endian, 32 bits */
+ #define JRNL_LITTLE_ENDIAN
+// #define JRNL_32_BIT
+#elif defined(__PPC__) || defined(__s390__) /* big endian, 32 bits */
+ #define JRNL_BIG_ENDIAN
+// #define JRNL_32_BIT
+#elif defined(__ia64__) || defined(__x86_64__) || defined(__alpha__) /* little endian, 64 bits */
+ #define JRNL_LITTLE_ENDIAN
+// #define JRNL_64_BIT
+#elif defined(__powerpc64__) || defined(__s390x__) /* big endian, 64 bits */
+ #define JRNL_BIG_ENDIAN
+// #define JRNL_64_BIT
+#else
+ #error Unable to determine endianness
+#endif
+
+
+/**
+* <b>Rule:</b> Data block size (JRNL_DBLK_SIZE) MUST be a power of 2 such that
+* <pre>
+* JRNL_DBLK_SIZE * JRNL_SBLK_SIZE == n * 512 (n = 1,2,3...)
+* </pre>
+* (The disk softblock size is 512 for Linux kernels >= 2.6)
+*/
+//#define JRNL_DBLK_SIZE 128 ///< Data block size in bytes (CANNOT BE LESS THAN 32!)
+//#define JRNL_SBLK_SIZE 4 ///< Disk softblock size in multiples of JRNL_DBLK_SIZE
+//#define JRNL_MIN_FILE_SIZE 128 ///< Min. jrnl file size in sblks (excl. FileHeader)
+//#define JRNL_MAX_FILE_SIZE 4194304 ///< Max. jrnl file size in sblks (excl. FileHeader)
+//#define JRNL_MIN_NUM_FILES 4 ///< Min. number of journal files
+//#define JRNL_MAX_NUM_FILES 64 ///< Max. number of journal files
+//#define JRNL_ENQ_THRESHOLD 80 ///< Percent full when enqueue connection will be closed
+//
+//#define JRNL_RMGR_PAGE_SIZE 128 ///< Journal page size in softblocks
+//#define JRNL_RMGR_PAGES 16 ///< Number of pages to use in wmgr
+//
+//#define JRNL_WMGR_DEF_PAGE_SIZE 64 ///< Journal write page size in softblocks (default)
+//#define JRNL_WMGR_DEF_PAGES 32 ///< Number of pages to use in wmgr (default)
+//
+//#define JRNL_WMGR_MAXDTOKPP 1024 ///< Max. dtoks (data blocks) per page in wmgr
+//#define JRNL_WMGR_MAXWAITUS 100 ///< Max. wait time (us) before submitting AIO
+//
+//#define JRNL_INFO_EXTENSION "jinf" ///< Extension for journal info files
+//#define JRNL_DATA_EXTENSION "jdat" ///< Extension for journal data files
+//#define RHM_JDAT_TXA_MAGIC 0x614d4852 ///< ("RHMa" in little endian) Magic for dtx abort hdrs
+//#define RHM_JDAT_TXC_MAGIC 0x634d4852 ///< ("RHMc" in little endian) Magic for dtx commit hdrs
+//#define RHM_JDAT_DEQ_MAGIC 0x644d4852 ///< ("RHMd" in little endian) Magic for deq rec hdrs
+//#define RHM_JDAT_ENQ_MAGIC 0x654d4852 ///< ("RHMe" in little endian) Magic for enq rec hdrs
+//#define RHM_JDAT_FILE_MAGIC 0x664d4852 ///< ("RHMf" in little endian) Magic for file hdrs
+//#define RHM_JDAT_EMPTY_MAGIC 0x784d4852 ///< ("RHMx" in little endian) Magic for empty dblk
+//#define RHM_JDAT_VERSION 0x01 ///< Version (of file layout)
+//#define RHM_CLEAN_CHAR 0xff ///< Char used to clear empty space on disk
+
+#define RHM_LENDIAN_FLAG 0x0 ///< Value of little endian flag on disk
+#define RHM_BENDIAN_FLAG 0x1 ///< Value of big endian flag on disk
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+/**
+ * \brief Class aggregating the journal and store hard-coded configuration values.
+ */
+class Configuration
+{
+public:
+ static const uint8_t s_endianValue =
+#if defined(JRNL_LITTLE_ENDIAN)
+ RHM_LENDIAN_FLAG;
+#elif defined(JRNL_BIG_ENDIAN)
+ RHM_BENDIAN_FLAG;
+#else
+# error Unknown endianness
+#endif
+};
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+#endif // qpid_asyncStore_jrnl2_Configuration_h_
diff --git a/cpp/src/qpid/asyncStore/jrnl2/DataOpState.cpp b/cpp/src/qpid/asyncStore/jrnl2/DataOpState.cpp
new file mode 100644
index 0000000000..79d1133cf6
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/DataOpState.cpp
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file DataOpState.cpp
+ */
+
+#include "DataOpState.h"
+
+#include "JournalError.h"
+
+#include <sstream> // std::ostringstream
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+DataOpState::DataOpState() :
+ State<opState_t>()
+{}
+
+DataOpState::DataOpState(const opState_t s) :
+ State<opState_t>(s)
+{}
+
+DataOpState::DataOpState(const DataOpState& s) :
+ State<opState_t>(s)
+{}
+
+DataOpState::~DataOpState()
+{}
+
+void
+DataOpState::enqueue()
+{
+ set(OP_ENQUEUE);
+}
+
+void
+DataOpState::dequeue()
+{
+ set(OP_DEQUEUE);
+}
+
+const char*
+DataOpState::getAsStr() const
+{
+ return s_toStr(m_state);
+}
+
+// static
+const char*
+DataOpState::s_toStr(const opState_t s)
+{
+ switch (s) {
+ case OP_NONE:
+ return "OP_NONE";
+ case OP_ENQUEUE:
+ return "OP_ENQUEUE";
+ case OP_DEQUEUE:
+ return "OP_DEQUEUE";
+ default:
+ std::ostringstream oss;
+ oss << "<unknown state (" << "s" << ")>";
+ return oss.str().c_str();
+ }
+}
+
+// protected
+void
+DataOpState::set(const opState_t s)
+{
+ // State transition logic: set stateError to true if an invalid transition is attempted
+ bool stateTransitionError = false;
+ switch(m_state) {
+ case OP_NONE:
+ if (s != OP_ENQUEUE) stateTransitionError = true;
+ break;
+ case OP_ENQUEUE:
+ if (s != OP_DEQUEUE) stateTransitionError = true;
+ break;
+ case OP_DEQUEUE: // Cannot move out of this state except with reset()
+ stateTransitionError = true;
+ break;
+ }
+ if (stateTransitionError) {
+ THROW_STATE_EXCEPTION(JournalError::JERR_MSGOPSTATE, s_toStr(s), s_toStr(m_state), "DataOpState", "set");
+ }
+ m_state = s;
+}
+
+}}} // namespace qpid::asyncStore::jrnl2
diff --git a/cpp/src/qpid/asyncStore/jrnl2/DataOpState.h b/cpp/src/qpid/asyncStore/jrnl2/DataOpState.h
new file mode 100644
index 0000000000..492f0b3f6b
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/DataOpState.h
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file DataOpState.h
+ */
+
+#ifndef qpid_asyncStore_jrnl2_DataOpState_h_
+#define qpid_asyncStore_jrnl2_DataOpState_h_
+
+#include "State.h"
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+/**
+ * \brief Enumeration of valid data record enqueue/dequeue states.
+ */
+typedef enum {
+ OP_NONE = 0, ///< Data record has not been processed by async store
+ OP_ENQUEUE, ///< Data record has been enqueued
+ OP_DEQUEUE ///< Data record has been dequeued
+} opState_t;
+
+/**
+ * \brief State machine to control the data record operational state (ie none -> enqueued -> dequeued).
+ *
+ * The following state diagram shows valid data record operational state transitions:
+ * \dot
+ * digraph DataOpState {
+ * node [fontname=Helvetica, fontsize=8];
+ * OP_NONE [URL="\ref OP_NONE"]
+ * OP_ENQUEUE [URL="\ref OP_ENQUEUE"]
+ * OP_DEQUEUE [URL="\ref OP_DEQUEUE"]
+ *
+ * edge [fontname=Helvetica, fontsize=8]
+ * OP_NONE->OP_ENQUEUE [label=" enqueue()" URL="\ref qpid::jrnl2::DataOpState::enqueue()"]
+ * OP_ENQUEUE->OP_DEQUEUE [label=" dequeue()" URL="\ref qpid::jrnl2::DataOpState::enqueue()"]
+ * }
+ * \enddot
+ */
+class DataOpState: public State<opState_t>
+{
+public:
+ /**
+ * \brief Default constructor, setting internal state to OP_NONE.
+ */
+ DataOpState();
+
+ /**
+ * \brief Constructor allowing the internal state to be preset to any valid value of opState_t.
+ *
+ * \param s State value to which the internal state of this new instance should be initialized.
+ */
+ DataOpState(const opState_t s);
+
+ /**
+ * \brief Copy constructor.
+ *
+ * \param s Instance from which this new instance state should be copied.
+ */
+ DataOpState(const DataOpState& s);
+
+ /**
+ * \brief Virtual destructor
+ */
+ virtual ~DataOpState();
+
+ /**
+ * \brief Changes the data record operational state from OP_NONE to OP_ENQUEUE.
+ *
+ * \throws JournalException with JERR_ILLEGALDTOKOPSTATECHANGE if current state makes it illegal to change
+ * to OP_ENQUEUE according to the state machine semantics.
+ */
+ void enqueue();
+
+ /**
+ * \brief Changes the data record operational state from OP_ENQUEUE to OP_DEQUEUE.
+ *
+ * \throws JournalException with JERR_ILLEGALDTOKOPSTATECHANGE if current state makes it illegal to change
+ * to OP_DEQUEUE according to the state machine semantics.
+ */
+ void dequeue();
+
+ /**
+ * \brief Get the current state in a string form.
+ *
+ * \returns String representation of internal state m_state as returned by static function s_toStr().
+ */
+ const char* getAsStr() const;
+
+ /**
+ * \brief Static helper function to convert a numeric opState_t type to a string representation.
+ */
+ static const char* s_toStr(const opState_t s);
+
+protected:
+ /**
+ * \brief Set (or change) the value of m_state. This function implements the state machine checks for
+ * legal state change transitions, and throw an exception if an illegal state transition is requested.
+ *
+ * \param s State to which this machine should be changed.
+ * \throws JournalException with JERR_ILLEGALDTOKOPSTATECHANGE if current state makes it illegal to change
+ * to the requested state \a s.
+ */
+ void set(const opState_t s);
+
+};
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+#endif // qpid_asyncStore_jrnl2_DataOpState_h_
diff --git a/cpp/src/qpid/asyncStore/jrnl2/DataToken.cpp b/cpp/src/qpid/asyncStore/jrnl2/DataToken.cpp
new file mode 100644
index 0000000000..cd4969a7fc
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/DataToken.cpp
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file DataToken.cpp
+ */
+
+#include "DataToken.h"
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+DataToken::DataToken() :
+ m_dataOpState(),
+ m_recordId(s_recordIdCounter.next()),
+ m_externalRecordIdFlag(false)
+{}
+
+DataToken::DataToken(const recordId_t rid) :
+ m_dataOpState(),
+ m_recordId(rid),
+ m_externalRecordIdFlag(true)
+{}
+
+DataToken::~DataToken() {}
+
+const DataOpState&
+DataToken::getDataOpState() const
+{
+ return m_dataOpState;
+}
+
+DataOpState&
+DataToken::getDataOpState()
+{
+ return m_dataOpState;
+}
+
+const DataWrComplState&
+DataToken::getDataWrComplState() const
+{
+ return m_dataWrComplState;
+}
+
+DataWrComplState&
+DataToken::getDataWrComplState()
+{
+ return m_dataWrComplState;
+}
+
+bool
+DataToken::isTransient() const
+{
+ return m_transientFlag;
+}
+
+bool
+DataToken::isExternal() const
+{
+ return m_externalFlag;
+}
+
+const std::string&
+DataToken::getExternalLocation() const
+{
+ return m_externalLocation;
+}
+
+recordId_t
+DataToken::getRecordId() const
+{
+ return m_recordId;
+}
+
+bool
+DataToken::isRecordIdExternal() const
+{
+ return m_externalRecordIdFlag;
+}
+
+recordId_t
+DataToken::getDequeueRecordId() const
+{
+ return m_dequeueRecordId;
+}
+
+void
+DataToken::setRecordId(const recordId_t rid)
+{
+ m_recordId = rid;
+ m_externalRecordIdFlag = true;
+}
+
+void
+DataToken::setDequeueRecordId(const recordId_t drid)
+{
+ m_dequeueRecordId = drid;
+}
+
+void
+DataToken::toStream(std::ostream& os) const
+{
+ /// \todo TODO: Implementation required
+ os << "status string";
+}
+
+// private static
+RecordIdCounter_t DataToken::s_recordIdCounter;
+
+}}} // namespace qpid::asyncStore::jrnl2
diff --git a/cpp/src/qpid/asyncStore/jrnl2/DataToken.h b/cpp/src/qpid/asyncStore/jrnl2/DataToken.h
new file mode 100644
index 0000000000..2058cc75ad
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/DataToken.h
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file DataToken.h
+ */
+
+#ifndef qpid_jrnl2_asyncStore_DataToken_h_
+#define qpid_jrnl2_asyncStore_DataToken_h_
+
+#include "DataOpState.h"
+#include "DataWrComplState.h"
+#include "RecordIdCounter.h"
+
+#include <string>
+#include <stdint.h> // uint64_t
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+/**
+ * \brief Contains the state and metadata for each record handled by the async store.
+ *
+ * The DataToken keeps the state of the data record through the AIO write and recovery process. It may also
+ * be used to set certain properties of the data record.
+ */
+class DataToken : public Streamable
+{
+public:
+ /**
+ * \brief Default constructor, which internally assigns a unique record Id (RID).
+ */
+ DataToken();
+
+ /**
+ * \brief Constructor which accepts an external record id (RID). No check is made of the uniqueness of this RID.
+ *
+ * \param rid The externally assigned record Id, which must be unique across this journal instance within the
+ * window presented by the journal files.
+ */
+ DataToken(const recordId_t rid);
+
+ /**
+ * \brief Virtual destructor
+ */
+ virtual ~DataToken();
+
+ // Get functions
+
+ /**
+ * \brief Get the const DataOpState state manager object for this data token.
+ *
+ * \return \c const state of the data record associated with this DataToken instance.
+ */
+ const DataOpState& getDataOpState() const;
+
+ /**
+ * \brief Get the DataOpState state manager object for this data token.
+ *
+ * \return Non -\c const (modifiable) state of the data record associated with this DataToken instance.
+ */
+ DataOpState& getDataOpState();
+
+ /**
+ * \brief Get the const DataWrComplState state manager object for this data token.
+ *
+ * \return \c const state of the data record associated with this DataToken instance.
+ */
+ const DataWrComplState& getDataWrComplState() const;
+
+ /**
+ * \brief Get the DataWrComplState state manager object for this data token.
+ *
+ * \return Non -\c const (modifiable) state of the data record associated with this DataToken instance.
+ */
+ DataWrComplState& getDataWrComplState();
+
+ /**
+ * \brief Check if this data is marked as transient (will not be recovered).
+ *
+ * \return \c true if the data is transient, \c false otherwise.
+ */
+ bool isTransient() const;
+
+ /**
+ * \brief Check if this data is marked as external (i.e. has its content stored outside the store journal).
+ *
+ * \return \c true if the data is external (has its content stored outside the journal), \c false otherwise.
+ */
+ bool isExternal() const;
+
+ /**
+ * \brief Return the external storage location of the data if it is marked as external. This may be a path
+ * or URL.
+ *
+ * \return Location or URI pointing to the location of the storage for this data record if it is marked external.
+ */
+ const std::string& getExternalLocation() const;
+
+ /**
+ * \brief Return the Record Id (RID) for this data record.
+ *
+ * \return Record Id (RID) for this data record.
+ */
+ recordId_t getRecordId() const;
+
+ /**
+ * \brief Checks if the Record Id assigned to this data record was supplied externally.
+ *
+ * \return \c true if the data record had its Record Id (RID) assigned externally from the store, \c false
+ * otherwise.
+ */
+ bool isRecordIdExternal() const;
+
+ /**
+ * \brief Returns the recordId of the dequeuing record (if dequeued)
+ *
+ * \return The Record Id of the dequeue record which dequeues this record, or 0x0 if it has not been dequeued
+ * (or cannot be dequeued).
+ */
+ recordId_t getDequeueRecordId() const;
+
+ // Set functions
+
+ /**
+ * \brief Set the record Id (RID) for this record. No check is made of the uniqueness of this RID.
+ */
+ void setRecordId(const recordId_t rid);
+
+ /**
+ * \brief Set the record Id (RID) of the record which dequeues this record.
+ */
+ void setDequeueRecordId(const recordId_t drid);
+
+ // Debug aid(s)
+
+ /**
+ * \brief Stream a formatted summary of the state of this DataToken object.
+ */
+ virtual void toStream(std::ostream& os) const;
+
+protected:
+ DataOpState m_dataOpState; ///< Data operational state (none, enqueued, dequeued)
+ DataWrComplState m_dataWrComplState; ///< Data write completion state (none, part, complete)
+ bool m_transientFlag; ///< True if the data record is transient (eg for Flow-To-Disk)
+ bool m_externalFlag; ///< True if the data record is stored externally from the store
+ std::string m_externalLocation; ///< Location of data record when stored externally
+ recordId_t m_recordId; ///< Record Id which is unique to this store instance
+ bool m_externalRecordIdFlag; ///< True if the record id was set through this token
+ recordId_t m_dequeueRecordId; ///< Record Id of the dequeue record for this data record
+
+private:
+ static RecordIdCounter_t s_recordIdCounter; ///< Static instance keeps record Ids unique across system
+
+};
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+#endif // qpid_jrnl2_asyncStore_DataToken_h_
diff --git a/cpp/src/qpid/asyncStore/jrnl2/DataWrComplState.cpp b/cpp/src/qpid/asyncStore/jrnl2/DataWrComplState.cpp
new file mode 100644
index 0000000000..c6cedc4782
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/DataWrComplState.cpp
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file DataWrComplState.cpp
+ */
+
+#include "DataWrComplState.h"
+
+#include "JournalError.h"
+
+#include <sstream> // std::ostringstream
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+DataWrComplState::DataWrComplState() :
+ State<wrComplState_t>()
+{}
+
+DataWrComplState::DataWrComplState(const wrComplState_t s) :
+ State<wrComplState_t>(s)
+{}
+
+DataWrComplState::DataWrComplState(const DataWrComplState& s) :
+ State<wrComplState_t>(s)
+{}
+
+DataWrComplState::~DataWrComplState()
+{}
+
+void
+DataWrComplState::complete()
+{
+ set(WR_COMPLETE);
+}
+
+void
+DataWrComplState::partComplete()
+{
+ set(WR_PART);
+}
+
+const char*
+DataWrComplState::getAsStr() const
+{
+ return s_toStr(m_state);
+}
+
+// static
+const char*
+DataWrComplState::s_toStr(const wrComplState_t s)
+{
+ switch (s) {
+ case WR_NONE:
+ return "WR_NONE";
+ case WR_PART:
+ return "WR_PART";
+ case WR_COMPLETE:
+ return "WR_COMPLETE";
+ default:
+ std::ostringstream oss;
+ oss << "<unknown state (" << "s" << ")>";
+ return oss.str().c_str();
+ }
+}
+
+// protected
+void
+DataWrComplState::set(const wrComplState_t s)
+{
+ // State transition logic: set stateError to true if an invalid transition is attempted
+ bool stateTransitionError = false;
+ switch(m_state) {
+ case WR_NONE:
+ if (s != WR_PART && s != WR_COMPLETE) stateTransitionError = true;
+ break;
+ case WR_PART:
+ if (s != WR_COMPLETE) stateTransitionError = true;
+ break;
+ case WR_COMPLETE: // Cannot move out of this state except with reset()
+ stateTransitionError = true;
+ break;
+ }
+ if (stateTransitionError) {
+ THROW_STATE_EXCEPTION(JournalError::JERR_MSGWRCMPLSTATE, s_toStr(s), s_toStr(m_state), "DataWrComplState",
+ "set");
+ }
+ m_state = s;
+}
+
+}}} // namespace qpid::asyncStore::jrnl2
diff --git a/cpp/src/qpid/asyncStore/jrnl2/DataWrComplState.h b/cpp/src/qpid/asyncStore/jrnl2/DataWrComplState.h
new file mode 100644
index 0000000000..691b9c7b82
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/DataWrComplState.h
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file DataWrComplState.h
+ */
+
+#ifndef qpid_asyncStore_jrnl2_DataWrComplState_h_
+#define qpid_asyncStore_jrnl2_DataWrComplState_h_
+
+#include "State.h"
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+/**
+ * \brief Enumeration of valid data record write completion states.
+ */
+typedef enum {
+ WR_NONE = 0, ///< Data record has not been written into async store buffer
+ WR_PART, ///< Data record has been partly written to store buffer
+ WR_COMPLETE ///< Data record write to store buffer is complete
+} wrComplState_t;
+
+/**
+ * \brief State machine for the data record write completion state - ie whether the data record has been written
+ * to the store buffer.
+ *
+ * The following state diagram shows valid write buffer states and transitions:
+ * \dot
+ * digraph wrComplState_t {
+ * node [fontname=Helvetica, fontsize=8];
+ * WR_NONE [URL="\ref WR_NONE"]
+ * WR_PART [URL="\ref WR_PART"]
+ * WR_COMPLETE [URL="\ref WR_COMPLETE"]
+ *
+ * edge [fontname=Helvetica, fontsize=8]
+ * WR_NONE->WR_PART [label=" partComplete()" URL="\ref qpid::jrnl2::DataWrComplState::partComplete()"]
+ * WR_NONE->WR_COMPLETE [label=" complete()" URL="\ref qpid::jrnl2::DataWrComplState::complete()"]
+ * WR_PART->WR_COMPLETE [label=" complete()" URL="\ref qpid::jrnl2::DataWrComplState::complete()"]
+ * }
+ * \enddot
+ */
+class DataWrComplState: public State<wrComplState_t> {
+public:
+ /**
+ * \brief Default constructor, setting internal state to WR_NONE.
+ */
+ DataWrComplState();
+
+ /**
+ * \brief Constructor allowing the internal state to be preset to any valid value of wrComplState_t.
+ *
+ * \param s State value to which the internal state of this new instance should be initialized.
+ */
+ DataWrComplState(const wrComplState_t s);
+
+ /**
+ * \brief Copy constructor.
+ *
+ * \param s Instance from which this new instance state should be copied.
+ */
+ DataWrComplState(const DataWrComplState& s);
+
+ /**
+ * \brief Virtual destructor
+ */
+ virtual ~DataWrComplState();
+
+ /**
+ * \brief Changes the data record operational state from WR_NONE or WR_PART to WR_COMPLETE.
+ *
+ * \throws JournalException with JERR_ILLEGALDTOKOPSTATECHANGE if current state makes it illegal to change
+ * to WR_COMPLETE according to the state machine semantics.
+ */
+ void complete();
+
+ /**
+ * \brief Changes the data record operational state from WR_NONE to WR_PART.
+ *
+ * \throws JournalException with JERR_ILLEGALDTOKOPSTATECHANGE if current state makes it illegal to change
+ * to WR_PART according to the state machine semantics.
+ */
+ void partComplete();
+
+ /**
+ * \brief Get the current state in a string form.
+ *
+ * \returns String representation of internal state m_state as returned by static function s_toStr().
+ */
+ const char* getAsStr() const;
+
+ /**
+ * \brief Static helper function to convert a numeric wrComplState_t type to a string representation.
+ */
+ static const char* s_toStr(const wrComplState_t s);
+
+protected:
+ /**
+ * \brief Set (or change) the value of m_state. This function implements the state machine checks for
+ * legal state change transitions, and throw an exception if an illegal state transition is requested.
+ *
+ * \param s State to which this machine should be changed.
+ * \throws JournalException with JERR_ILLEGALDTOKOPSTATECHANGE if current state makes it illegal to change
+ * to the requested state \a s.
+ */
+ void set(const wrComplState_t s);
+
+};
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+#endif // qpid_asyncStore_jrnl2_DataWrComplState_h_
diff --git a/cpp/src/qpid/asyncStore/jrnl2/DequeueHeader.cpp b/cpp/src/qpid/asyncStore/jrnl2/DequeueHeader.cpp
new file mode 100644
index 0000000000..bf8ef5fc7f
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/DequeueHeader.cpp
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file DequeueHeader.cpp
+ */
+
+#include "DequeueHeader.h"
+
+#include "RecordTail.h"
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+DequeueHeader::DequeueHeader() :
+ RecordHeader(),
+ m_dequeuedRecordId(0),
+ m_xidSize(0)
+{}
+
+DequeueHeader::DequeueHeader(const uint32_t magic,
+ const uint8_t version,
+ const uint64_t recordId,
+ const uint64_t dequeuedRecordId,
+ const uint64_t xidSize,
+ const bool overwriteIndicator,
+ const bool tplCommitOnTxnComplFlag) :
+ RecordHeader(magic, version, recordId, overwriteIndicator),
+ m_dequeuedRecordId(dequeuedRecordId),
+ m_xidSize(xidSize)
+{
+ setTplCommitOnTxnComplFlag(tplCommitOnTxnComplFlag);
+}
+
+DequeueHeader::DequeueHeader(const DequeueHeader& dh) :
+ RecordHeader(dh),
+ m_dequeuedRecordId(dh.m_dequeuedRecordId),
+ m_xidSize(dh.m_xidSize)
+{}
+
+DequeueHeader::~DequeueHeader()
+{}
+
+void
+DequeueHeader::copy(const DequeueHeader& dh)
+{
+ RecordHeader::copy(dh);
+ m_dequeuedRecordId = dh.m_dequeuedRecordId;
+ m_xidSize = dh.m_xidSize;
+}
+
+void
+DequeueHeader::reset()
+{
+ RecordHeader::reset();
+ m_dequeuedRecordId = 0;
+ m_xidSize = 0;
+}
+
+bool
+DequeueHeader::getTplCommitOnTxnComplFlag() const
+{
+ return m_flags & DEQ_HDR_TPL_COMMIT_ON_TXN_COMPL_MASK;
+}
+
+void
+DequeueHeader::setTplCommitOnTxnComplFlag(const bool commitOnTxnCompl)
+{
+ m_flags = commitOnTxnCompl ?
+ m_flags | DEQ_HDR_TPL_COMMIT_ON_TXN_COMPL_MASK :
+ m_flags & (~DEQ_HDR_TPL_COMMIT_ON_TXN_COMPL_MASK);
+}
+
+//static
+uint64_t
+DequeueHeader::getHeaderSize()
+{
+ return static_cast<uint64_t>(sizeof(DequeueHeader));
+}
+
+uint64_t
+DequeueHeader::getBodySize() const
+{
+ return m_xidSize;
+}
+
+uint64_t
+DequeueHeader::getRecordSize() const
+{
+ return getHeaderSize() + (getBodySize() > 0LL ?
+ getBodySize() + RecordTail::getSize() :
+ 0);
+}
+
+}}} // namespace qpid::asyncStore::jrnl2
diff --git a/cpp/src/qpid/asyncStore/jrnl2/DequeueHeader.h b/cpp/src/qpid/asyncStore/jrnl2/DequeueHeader.h
new file mode 100644
index 0000000000..deb23915f7
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/DequeueHeader.h
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file DequeueHeader.h
+ *
+ * List of journal record structs:
+ * struct DequeueHeader <-- This file
+ * struct EnqueueHeader
+ * struct EventHeader
+ * struct FileHeader
+ * struct RecordHeader
+ * struct RecordTail
+ * struct TransactionHeader
+ *
+ * Overview of journal record structs:
+ *
+ * <pre>
+ * +------------+ +--------------+
+ * | RecordTail | | RecordHeader |
+ * +------------+ | (abstract) |
+ * +--------------+
+ * ^
+ * |
+ * +----------------+---------+-------+-------------------+
+ * | | | |
+ * +------------+ +-------------+ +---------------+ +-------------------+
+ * | FileHeader | | EventHeader | | DequeueHeader | | TransactionHeader |
+ * +------------+ +-------------+ +---------------+ +-------------------+
+ * ^
+ * |
+ * +---------------+
+ * | EnqueueHeader |
+ * +---------------+
+ * </pre>
+ */
+
+#ifndef qpid_asyncStore_jrnl2_DequeueHeader_h_
+#define qpid_asyncStore_jrnl2_DequeueHeader_h_
+
+#include "RecordHeader.h"
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+#pragma pack(1)
+
+/**
+ * \brief Struct for dequeue record header. This record marks a previous enqueue as
+ * logically deleted.
+ *
+ * The dequeue record conceptually marks a previous enqueue record as dequeued,
+ * or logically deleted. This is achieved by recording the record ID (rid) of
+ * the enqueue record being deleted in the m_dequeueRecordId field. Note that this
+ * rid is distinct from the rid assigned to the dequeue record itself in m_recordId.
+ *
+ * Dequeue records do not carry data. However, if it is transactional (and thus has
+ * a transaction ID (xid), then this record will be terminated by a RecordTail struct.
+ * If, on the other hand, this record is non-transactional, then the rec_tail
+ * is absent.
+ *
+ * Note that this record had its own rid distinct from the rid of the record it is dequeuing.
+ * The rid field below is the rid of the dequeue record itself; the deq-rid field is the rid of a
+ * previous enqueue record being dequeued by this record.
+ *
+ * Record layout in binary format (32 bytes):
+ * <pre>
+ * 0x0 0x7
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ -+
+ * 0x00 | m_magic | v | e | m_flags | |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ | struct RecordHeader
+ * 0x08 | m_recordId | |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ -+
+ * 0x10 | m_dequeuedRecordId |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * 0x18 | m_xidSize |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * </pre>
+ * <table>
+ * <tr>
+ * <td>v</td>
+ * <td>file version [ <code>_version</code> ] (If the format or encoding of
+ * this file changes, then this number should be incremented)</td>
+ * </tr>
+ * <tr>
+ * <td>e</td>
+ * <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for
+ * little endian, <b>true</b> (0x01) for big endian</td>
+ * </tr>
+ * </table>
+ * <pre>
+ * Non-transactional:
+ * +---------+
+ * | dequeue |
+ * | header |
+ * +---------+
+ * <-- 32 --->
+ *
+ * Transactional:
+ * +---------+------------------------+--------+
+ * | dequeue | XID | record |
+ * | header | | tail |
+ * +---------+------------------------+--------+
+ * <-- 32 ---><----- m_xidSize ------><-- 16 -->
+ * </pre>
+ */
+class DequeueHeader : public RecordHeader
+{
+public:
+ uint64_t m_dequeuedRecordId; ///< Record ID of dequeued record
+ uint64_t m_xidSize; ///< XID size
+
+ /**
+ * \brief Mask for the record header flags field m_flags which is used in dequeue records in the Transaction
+ * Prepared List (TPL) to indicate that a closed transaction resulted in a commit (if the flag is set)
+ * or an abort (if the flag is not set).
+ */
+ static const uint16_t DEQ_HDR_TPL_COMMIT_ON_TXN_COMPL_MASK = 0x10;
+
+ /**
+ * \brief Default constructor, which sets all values to 0.
+ */
+ DequeueHeader();
+
+ /**
+ * \brief Convenience constructor which initializes values during construction.
+ *
+ * \param magic The magic for this record
+ * \param version Version of this record
+ * \param recordId Record identifier for this record
+ * \param dequeuedRecordId Record identifier of the record being dequeued by this record
+ * \param xidSize Size of the transaction (or distributed transaction) ID for this record
+ * \param overwriteIndicator Flag indicating the present value of the overwrite indicator when writing this
+ * record
+ * \param tplCommitOnTxnComplFlag
+ */
+ DequeueHeader(const uint32_t magic,
+ const uint8_t version,
+ const uint64_t recordId,
+ const uint64_t dequeuedRecordId,
+ const uint64_t xidSize,
+ const bool overwriteIndicator,
+ const bool tplCommitOnTxnComplFlag = false);
+
+ /**
+ * \brief Copy constructor
+ *
+ * \param dh Instance to be copied
+ */
+ DequeueHeader(const DequeueHeader& dh);
+
+ /**
+ * \brief Virtual destructor
+ */
+ virtual ~DequeueHeader();
+
+ /**
+ * \brief Convenience copy method.
+ */
+ void copy(const DequeueHeader& dh);
+
+ /**
+ * \brief Reset this record to default values (mostly 0)
+ */
+ void reset();
+
+ /**
+ * \brief Return the value of the tplCommitOnTxnComplFlag for this record. This flag is used only within the
+ * TPL, and if set, indicates that the transaction was closed using a commit. If not set, the transaction was
+ * closed using an abort. This is used during recovery of the transactions in the store.
+ *
+ * \returns \b true if the tplCommitOnTxnComplFlag flag for this record is set, \b false otherwise.
+ */
+ bool getTplCommitOnTxnComplFlag() const;
+
+ /**
+ * \brief Set the value of the tplCommitOnTxnComplFlag for this record. This is only used in the TPL, and is
+ * ignored elsewhere.
+ *
+ * \param commitOnTxnCompl The value to be set in the tplCommitOnTxnComplFlag. If \b true, the transaction was
+ * closed with a commit; if \b false, with an abort.
+ */
+ void setTplCommitOnTxnComplFlag(const bool commitOnTxnCompl);
+
+ /**
+ * \brief Return the header size of this record in bytes.
+ *
+ * \returns Size of record header in bytes.
+ */
+ static uint64_t getHeaderSize();
+
+ /**
+ * \brief Return the body (xid and data) size of this record in bytes.
+ *
+ * \returns Size of record body in bytes.
+ */
+ uint64_t getBodySize() const;
+
+ /**
+ * \brief Return total size of this record in bytes, being in the case of the dequeue record the size of the
+ * header, the size of the body (xid only) and the size of the tail.
+ *
+ * \returns Total size of record in bytes.
+ */
+ uint64_t getRecordSize() const;
+
+};
+
+#pragma pack()
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+#endif // qpid_asyncStore_jrnl2_DequeueHeader_h_
diff --git a/cpp/src/qpid/asyncStore/jrnl2/EnqueueHeader.cpp b/cpp/src/qpid/asyncStore/jrnl2/EnqueueHeader.cpp
new file mode 100644
index 0000000000..213821ebe2
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/EnqueueHeader.cpp
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file EnqueueHeader.cpp
+ */
+
+#include "EnqueueHeader.h"
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+EnqueueHeader::EnqueueHeader() :
+ EventHeader()
+{}
+
+EnqueueHeader::EnqueueHeader(const uint32_t magic,
+ const uint8_t version,
+ const uint64_t recordId,
+ const uint64_t xidSize,
+ const uint64_t dataSize,
+ const bool overwriteIndicator,
+ const bool transient,
+ const bool external) :
+ EventHeader(magic, version, recordId, xidSize, dataSize, overwriteIndicator)
+{
+ setTransientFlag(transient);
+ setExternalFlag(external);
+}
+
+EnqueueHeader::EnqueueHeader(const EnqueueHeader& eh) :
+ EventHeader(eh)
+{}
+
+EnqueueHeader::~EnqueueHeader()
+{}
+
+bool
+EnqueueHeader::getTransientFlag() const
+{
+ return m_flags & ENQ_HDR_TRANSIENT_MASK;
+}
+
+void
+EnqueueHeader::setTransientFlag(const bool transient)
+{
+ m_flags = transient ?
+ m_flags | ENQ_HDR_TRANSIENT_MASK :
+ m_flags & (~ENQ_HDR_TRANSIENT_MASK);
+}
+
+bool
+EnqueueHeader::getExternalFlag() const
+{
+ return m_flags & ENQ_HDR_EXTERNAL_MASK;
+}
+
+void
+EnqueueHeader::setExternalFlag(const bool external)
+{
+ m_flags = external ?
+ m_flags | ENQ_HDR_EXTERNAL_MASK :
+ m_flags & (~ENQ_HDR_EXTERNAL_MASK);
+}
+
+//static
+uint64_t
+EnqueueHeader::getHeaderSize()
+{
+ return sizeof(EnqueueHeader);
+}
+
+}}} // namespace qpid::asyncStore::jrnl2
diff --git a/cpp/src/qpid/asyncStore/jrnl2/EnqueueHeader.h b/cpp/src/qpid/asyncStore/jrnl2/EnqueueHeader.h
new file mode 100644
index 0000000000..4d6dc9cfed
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/EnqueueHeader.h
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file EnqueueHeader.h
+ *
+ * List of journal record structs:
+ * struct DequeueHeader
+ * struct EnqueueHeader <-- This file
+ * struct EventHeader
+ * struct FileHeader
+ * struct RecordHeader
+ * struct RecordTail
+ * struct TransactionHeader
+ *
+ * Overview of journal record structs:
+ *
+ * <pre>
+ * +------------+ +--------------+
+ * | RecordTail | | RecordHeader |
+ * +------------+ | (abstract) |
+ * +--------------+
+ * ^
+ * |
+ * +----------------+---------+-------+-------------------+
+ * | | | |
+ * +------------+ +-------------+ +---------------+ +-------------------+
+ * | FileHeader | | EventHeader | | DequeueHeader | | TransactionHeader |
+ * +------------+ +-------------+ +---------------+ +-------------------+
+ * ^
+ * |
+ * +---------------+
+ * | EnqueueHeader |
+ * +---------------+
+ * </pre>
+ */
+
+#ifndef qpid_asyncStore_jrnl2_EnqueueHeader_h_
+#define qpid_asyncStore_jrnl2_EnqueueHeader_h_
+
+#include "EventHeader.h"
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+#pragma pack(1)
+
+/**
+ * \brief Struct for enqueue record header. This record stores message data to the
+ * journal.
+ *
+ * Enqueue records record the content of messages for possible later recovery, and
+ * are so-called because they correspond with the event of enqueuing the record on
+ * a queue.
+ *
+ * In addition to the fields inherited from RecordHeader, this struct includes both the
+ * transaction id (xid) and data blob sizes.
+ *
+ * This header precedes the enqueued message data in journal files, and unless there is
+ * no xid and no data (both with 0 length), is followed by a RecordTail.
+ *
+ * Record layout in binary format (32 bytes):
+ * <pre>
+ * 0x0 0x7
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ -+
+ * 0x00 | m_magic | v | e | m_flags | |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ | struct RecordHeader
+ * 0x08 | m_recordId | |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ -+
+ * 0x10 | m_xidSize |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * 0x18 | m_dataSize |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * </pre>
+ * <table>
+ * <tr>
+ * <td>v</td>
+ * <td>file version [ <code>_version</code> ] (If the format or encoding of
+ * this file changes, then this number should be incremented)</td>
+ * </tr>
+ * <tr>
+ * <td>e</td>
+ * <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for
+ * little endian, <b>true</b> (0x01) for big endian</td>
+ * </tr>
+ * </table>
+ * <pre>
+ * No content (ie m_xidSize==0 AND m_dataSize==0):
+ * +---------+
+ * | enqueue |
+ * | header |
+ * +---------+
+ * <-- 32 --->
+ *
+ * With content:
+ * +---------+----------------+---------------------------+--------+
+ * | enqueue | XID | Data | record |
+ * | header | | | tail |
+ * +---------+----------------+---------------------------+--------+
+ * <-- 32 ---><-- m_xidSize --><------ m_dataSize -------><-- 16 -->
+ * </pre>
+ */
+class EnqueueHeader : public EventHeader
+{
+public:
+ /**
+ * \brief Mask for the record header flags field m_flags which is used to indicate that a record is transient
+ * (if the flag is set) or durable (if the flag is not set).
+ */
+ static const uint16_t ENQ_HDR_TRANSIENT_MASK = 0x10;
+
+ /**
+ * \brief Mask for the record header flags field m_flags which is used to indicate that a record is using
+ * external storage for its data content (if the flag is set). If the flag is not set, then the data content
+ * is stored in the journal itself.
+ */
+ static const uint16_t ENQ_HDR_EXTERNAL_MASK = 0x20;
+
+ /**
+ * \brief Default constructor, which sets all values to 0.
+ */
+ EnqueueHeader();
+
+ /**
+ * \brief Convenience constructor which initializes values during construction.
+ *
+ * \param magic The magic for this record
+ * \param version Version of this record
+ * \param recordId Record identifier for this record
+ * \param xidSize Size of the transaction (or distributed transaction) ID for this record
+ * \param dataSize Size of the opaque data block for this record
+ * \param overwriteIndicator Flag indicating the present value of the overwrite indicator when writing this
+ * record
+ * \param transient Flag indicating that this record is transient (ie to be discarded on recovery)
+ * \param external Flag indicating that this record's data is stored externally to the journal, the data portion
+ * of the record identifies the storage location.
+ */
+ EnqueueHeader(const uint32_t magic,
+ const uint8_t version,
+ const uint64_t recordId,
+ const uint64_t xidSize,
+ const uint64_t dataSize,
+ const bool overwriteIndicator,
+ const bool transient = false,
+ const bool external = false);
+
+ /**
+ * \brief Copy constructor
+ *
+ * \param eh Instance to be copied
+ */
+ EnqueueHeader(const EnqueueHeader& eh);
+
+ /**
+ * \brief Virtual destructor
+ */
+ virtual ~EnqueueHeader();
+
+ /**
+ * \brief Return the value of the Transient flag for this record. If set, this record is ignored during
+ * recovery.
+ *
+ * \returns true if the Transient flag for this record is set, false otherwise.
+ */
+ bool getTransientFlag() const;
+
+ /**
+ * \brief Set the value of the Transient flag for this record.
+ *
+ * \param transient The value to be set in the transient flag.
+ */
+ void setTransientFlag(const bool transient = true);
+
+ /**
+ * \brief Return the value of the External flag for this record. If set, this record data is not within the
+ * journal but external to it. The data part of this record contains the location of the stored data.
+ *
+ * \returns true if the Transient flag for this record is set, false otherwise.
+ */
+ bool getExternalFlag() const;
+
+ /**
+ * \brief Set the value of the External flag for this record.
+ *
+ * \param external The value to be set in the External flag.
+ */
+ void setExternalFlag(const bool external = true);
+
+ /**
+ * \brief Return the header size of this record in bytes.
+ *
+ * \returns Size of record header in bytes.
+ */
+ static uint64_t getHeaderSize();
+
+};
+
+#pragma pack()
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+#endif // qpid_asyncStore_jrnl2_EnqueueHeader_h_
diff --git a/cpp/src/qpid/asyncStore/jrnl2/EventHeader.cpp b/cpp/src/qpid/asyncStore/jrnl2/EventHeader.cpp
new file mode 100644
index 0000000000..374d161af9
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/EventHeader.cpp
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file EventHeader.cpp
+ */
+
+#include "EventHeader.h"
+
+#include "RecordTail.h"
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+EventHeader::EventHeader() :
+ RecordHeader(),
+ m_xidSize(0),
+ m_dataSize(0)
+{}
+
+EventHeader::EventHeader(const uint32_t magic,
+ const uint8_t version,
+ const uint64_t recordId,
+ const uint64_t xidSize,
+ const uint64_t dataSize,
+ const bool overwriteIndicator) :
+ RecordHeader(magic, version, recordId, overwriteIndicator),
+ m_xidSize(xidSize),
+ m_dataSize(dataSize)
+{}
+
+EventHeader::EventHeader(const EventHeader& eh) :
+ RecordHeader(eh),
+ m_xidSize(eh.m_xidSize),
+ m_dataSize(eh.m_dataSize)
+{}
+
+EventHeader::~EventHeader()
+{}
+
+void
+EventHeader::copy(const EventHeader& e)
+{
+ RecordHeader::copy(e);
+ m_xidSize = e.m_xidSize;
+ m_dataSize = e.m_dataSize;
+}
+
+void
+EventHeader::reset()
+{
+ RecordHeader::reset();
+ m_xidSize = 0;
+ m_dataSize = 0;
+}
+
+//static
+uint64_t
+EventHeader::getHeaderSize()
+{
+ return sizeof(EventHeader);
+}
+
+uint64_t
+EventHeader::getBodySize() const
+{
+ return m_xidSize + m_dataSize;
+}
+
+uint64_t
+EventHeader::getRecordSize() const
+{
+ return getHeaderSize() + (getBodySize() ?
+ getBodySize() + RecordTail::getSize() :
+ 0);
+}
+
+}}} // namespace qpid::asyncStore::jrnl2
diff --git a/cpp/src/qpid/asyncStore/jrnl2/EventHeader.h b/cpp/src/qpid/asyncStore/jrnl2/EventHeader.h
new file mode 100644
index 0000000000..c4f48c1022
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/EventHeader.h
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file EventHeader.h
+ *
+ * List of journal record structs:
+ * struct DequeueHeader
+ * struct EnqueueHeader
+ * struct EventHeader <-- This file
+ * struct FileHeader
+ * struct RecordHeader
+ * struct RecordTail
+ * struct TransactionHeader
+ *
+ * Overview of journal record structs:
+ *
+ * <pre>
+ * +------------+ +--------------+
+ * | RecordTail | | RecordHeader |
+ * +------------+ | (abstract) |
+ * +--------------+
+ * ^
+ * |
+ * +----------------+---------+-------+-------------------+
+ * | | | |
+ * +------------+ +-------------+ +---------------+ +-------------------+
+ * | FileHeader | | EventHeader | | DequeueHeader | | TransactionHeader |
+ * +------------+ +-------------+ +---------------+ +-------------------+
+ * ^
+ * |
+ * +---------------+
+ * | EnqueueHeader |
+ * +---------------+
+ * </pre>
+ */
+
+#ifndef qpid_asyncStore_jrnl2_EventHeader_h_
+#define qpid_asyncStore_jrnl2_EventHeader_h_
+
+#include "RecordHeader.h"
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+#pragma pack(1)
+
+/**
+ * \brief Struct for an event record header. Event records can be used to record
+ * system events in the store.
+ *
+ * The EventHeader record type may be used to store events into the journal which do
+ * not constitute data content but changes of state in the broker. These can be
+ * recovered and used to set appropriate state in the broker.
+ *
+ * This record is almost identical to EnqueueRecord, but without the flags. It
+ * precedes the xid and stored event data in journal files, and unless there is
+ * no xid and no data (both with 0 length), is followed by a RecordTail.
+ *
+ * \todo TODO: I am uncertain at this time whether it is necessary to set an XID on an event
+ * record, but in case, I have left this feature in. In any event, there is only a
+ * 1 byte size penalty in the header size for doing so.
+ *
+ * Record layout in binary format (32 bytes):
+ * <pre>
+ * 0x0 0x7
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ -+
+ * 0x00 | m_magic | v | e | m_flags | |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ | struct RecordHeader
+ * 0x08 | m_recordId | |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ -+
+ * 0x10 | m_xidSize |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * 0x18 | m_dataSize |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * </pre>
+ * <table>
+ * <tr>
+ * <td>v</td>
+ * <td>file version [ <code>_version</code> ] (If the format or encoding of
+ * this file changes, then this number should be incremented)</td>
+ * </tr>
+ * <tr>
+ * <td>e</td>
+ * <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for
+ * little endian, <b>true</b> (0x01) for big endian</td>
+ * </tr>
+ * </table>
+ * <pre>
+ * No content (ie m_xidSize==0 AND m_dataSize==0):
+ * +---------+
+ * | event |
+ * | header |
+ * +---------+
+ * <-- 32 --->
+ *
+ * With content:
+ * +---------+----------------+---------------------------+--------+
+ * | event | XID | Event data | record |
+ * | header | | | tail |
+ * +---------+----------------+---------------------------+--------+
+ * <-- 32 ---><-- m_xidSize --><------ m_dataSize -------><-- 16 -->
+ * </pre>
+ */
+class EventHeader : public RecordHeader
+{
+public:
+ uint64_t m_xidSize; ///< XID size
+ uint64_t m_dataSize; ///< Record data size
+
+ /**
+ * \brief Default constructor, which sets all values to 0.
+ */
+ EventHeader();
+
+ /**
+ * \brief Convenience constructor which initializes values during construction.
+ *
+ * \param magic The magic for this record
+ * \param version Version of this record
+ * \param recordId Record identifier for this record
+ * \param xidSize Size of the transaction (or distributed transaction) ID for this record
+ * \param dataSize Size of the opaque data block for this record
+ * \param overwriteIndicator Flag indicating the present value of the overwrite indicator when writing this
+ * record
+ */
+ EventHeader(const uint32_t magic,
+ const uint8_t version,
+ const uint64_t recordId,
+ const uint64_t xidSize,
+ const uint64_t dataSize,
+ const bool overwriteIndicator);
+
+ /**
+ * \brief Copy constructor
+ *
+ * \param eh Instance to be copied
+ */
+ EventHeader(const EventHeader& eh);
+
+ /**
+ * \brief Virtual destructor
+ */
+ virtual ~EventHeader();
+
+ /**
+ * \brief Convenience copy method.
+ */
+ virtual void copy(const EventHeader& eh);
+
+ /**
+ * \brief Reset this record to default values (mostly 0)
+ */
+ virtual void reset();
+
+ /**
+ * \brief Return the header size of this record in bytes.
+ *
+ * \returns Size of record header in bytes.
+ */
+ static uint64_t getHeaderSize();
+
+ /**
+ * \brief Return the body (data) size of this record in bytes.
+ *
+ * \returns Size of record body in bytes.
+ */
+ virtual uint64_t getBodySize() const;
+
+ /**
+ * \brief Return total size of this record in bytes, being in the case of the enqueue record the size of the
+ * header, the size of the body (xid and data) and the size of the tail.
+ *
+ * \returns Total size of record in bytes.
+ */
+ virtual uint64_t getRecordSize() const;
+
+};
+
+#pragma pack()
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+#endif // qpid_asyncStore_jrnl2_EventHeader_h_
diff --git a/cpp/src/qpid/asyncStore/jrnl2/FileHeader.cpp b/cpp/src/qpid/asyncStore/jrnl2/FileHeader.cpp
new file mode 100644
index 0000000000..1b601c4cd7
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/FileHeader.cpp
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file FileHeader.cpp
+ */
+
+#include "FileHeader.h"
+
+#include "JournalError.h"
+
+#include <cerrno>
+#include <cstring>
+#include <ctime>
+#include <sstream>
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+FileHeader::FileHeader() :
+ RecordHeader(),
+ m_physicalFileId(0),
+ m_logicalFileId(0),
+ m_firstRecordOffset(0),
+ m_timestampSeconds(0),
+ m_timestampNanoSeconds(0),
+ m_reserved(0)
+{}
+
+FileHeader::FileHeader(const uint32_t magic,
+ const uint8_t version,
+ const uint64_t recordId,
+ const bool overwriteIndicator,
+ const uint16_t physicalFileId,
+ const uint16_t logicalFileId,
+ const uint64_t firstRecordOffset,
+ const bool setTimestampFlag) :
+ RecordHeader(magic, version, recordId, overwriteIndicator),
+ m_physicalFileId(physicalFileId),
+ m_logicalFileId(logicalFileId),
+ m_firstRecordOffset(firstRecordOffset),
+ m_timestampSeconds(0),
+ m_timestampNanoSeconds(0),
+ m_reserved(0)
+{
+ if (setTimestampFlag) setTimestamp();
+}
+
+FileHeader::FileHeader(const FileHeader& fh) :
+ RecordHeader(fh),
+ m_physicalFileId(fh.m_physicalFileId),
+ m_logicalFileId(fh.m_logicalFileId),
+ m_firstRecordOffset(fh.m_firstRecordOffset),
+ m_timestampSeconds(fh.m_timestampSeconds),
+ m_timestampNanoSeconds(fh.m_timestampNanoSeconds),
+ m_reserved(fh.m_reserved)
+{}
+
+FileHeader::~FileHeader()
+{}
+
+void
+FileHeader::copy(const FileHeader& fh)
+{
+ RecordHeader::copy(fh);
+ m_physicalFileId = fh.m_physicalFileId;
+ m_logicalFileId = fh.m_logicalFileId;
+ m_firstRecordOffset = fh.m_firstRecordOffset;
+ m_timestampSeconds = fh.m_timestampSeconds;
+ m_timestampNanoSeconds = fh.m_timestampNanoSeconds;
+ m_reserved = fh.m_reserved;
+}
+
+void
+FileHeader::reset()
+{
+ RecordHeader::reset();
+ m_physicalFileId = 0;
+ m_logicalFileId = 0;
+ m_firstRecordOffset = 0;
+ m_timestampSeconds = 0;
+ m_timestampNanoSeconds = 0;
+ m_reserved = 0;
+}
+
+//static
+uint64_t
+FileHeader::getHeaderSize()
+{
+ return sizeof(FileHeader);
+}
+
+uint64_t
+FileHeader::getBodySize() const
+{
+ return 0;
+}
+
+uint64_t
+FileHeader::getRecordSize() const
+{
+ return getHeaderSize();
+}
+
+void
+FileHeader::setTimestamp()
+{
+ /// \todo TODO: Standardize on method for getting time that does not require a context switch.
+ timespec ts;
+ if (::clock_gettime(CLOCK_REALTIME, &ts)) {
+ std::ostringstream oss;
+ oss << FORMAT_SYSERR(errno);
+ throw JournalError(JournalError::JERR_RTCLOCK, oss.str(), "FileHeader", "setTimestamp");
+ }
+ setTimestamp(ts);
+}
+
+void
+FileHeader::setTimestamp(const timespec& ts)
+{
+ m_timestampSeconds = static_cast<uint64_t>(ts.tv_sec);
+ m_timestampNanoSeconds = static_cast<uint32_t>(ts.tv_nsec);
+}
+
+}}} // namespace qpid::asyncStore::jrnl2
diff --git a/cpp/src/qpid/asyncStore/jrnl2/FileHeader.h b/cpp/src/qpid/asyncStore/jrnl2/FileHeader.h
new file mode 100644
index 0000000000..2d7a17a90c
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/FileHeader.h
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file FileHeader.h
+ *
+ * List of journal record structs:
+ * struct DequeueHeader
+ * struct EnqueueHeader
+ * struct EventHeader
+ * struct FileHeader <-- This file
+ * struct RecordHeader
+ * struct RecordTail
+ * struct TransactionHeader
+ *
+ * Overview of journal record structs:
+ *
+ * <pre>
+ * +------------+ +--------------+
+ * | RecordTail | | RecordHeader |
+ * +------------+ | (abstract) |
+ * +--------------+
+ * ^
+ * |
+ * +----------------+---------+-------+-------------------+
+ * | | | |
+ * +------------+ +-------------+ +---------------+ +-------------------+
+ * | FileHeader | | EventHeader | | DequeueHeader | | TransactionHeader |
+ * +------------+ +-------------+ +---------------+ +-------------------+
+ * ^
+ * |
+ * +---------------+
+ * | EnqueueHeader |
+ * +---------------+
+ * </pre>
+ */
+
+#ifndef qpid_asyncStore_jrnl2_FileHeader_h_
+#define qpid_asyncStore_jrnl2_FileHeader_h_
+
+#include "RecordHeader.h"
+
+struct timespec;
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+#pragma pack(1)
+
+/**
+ * \brief Struct for the data written to the head of all journal files.
+ *
+ * In addition to the fields inherited from RecordHeader, this struct includes the
+ * unique record ID for this record and an offset of the first record in the file.
+ * There is also a field for a nanosecond-resolution time stamp which records the
+ * time that this file was first written in each cycle.
+ *
+ * This header precedes all data in journal files and occupies the first complete
+ * block in the file. The fields in this record are updated as necessary each time
+ * the file is written or overwritten.
+ *
+ * File layout in binary format (48 bytes):
+ * <pre>
+ * 0x0 0x7
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ -+
+ * 0x00 | m_magic | v | e | m_flags | |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ | struct RecordHeader
+ * 0x08 | m_recordId (used to show first rid in file) | |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ -+
+ * 0x10 | m_physicalFileId | m_logicalFileId |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * 0x18 | m_firstRecordOffset |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * 0x20 | m_timestampSeconds |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * 0x28 |m_timestampNanoSeconds | m_reserved |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * </pre>
+ * <table>
+ * <tr>
+ * <td>v</td>
+ * <td>file version [ <code>_version</code> ] (If the format or encoding of
+ * this file changes, then this number should be incremented)</td>
+ * </tr>
+ * <tr>
+ * <td>e</td>
+ * <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for
+ * little endian, <b>true</b> (0x01) for big endian</td>
+ * </tr>
+ * </table>
+ * General journal file structure:
+ * <pre>
+ * <----------------------------- Journal File ----------------------------------->
+ * +--------+ +-----------------+ +---------+ +---------+ +-------------------+
+ * | file | | continuation of | | first | | second | ... | last record |
+ * | header | | record from | | record | | record | | (or part thereof) |
+ * | | | prev. file (opt)| | in file | | in file | | in file |
+ * +--------+ +-----------------+ +---------+ +---------+ +-------------------+
+ * ^
+ * |
+ * m_firstRecordOffset -----------+
+ * </pre>
+ */
+class FileHeader : public RecordHeader
+{
+public:
+ uint32_t m_physicalFileId; ///< Physical file ID (pfid)
+ uint32_t m_logicalFileId; ///< Logical file ID (lfid)
+ uint64_t m_firstRecordOffset;///< First record offset (fro)
+ uint64_t m_timestampSeconds; ///< Timestamp of journal initialization, seconds component
+ uint32_t m_timestampNanoSeconds;///< Timestamp of journal initialization, nanoseconds component
+ uint32_t m_reserved; ///< Little-endian filler for uint32_t
+
+ /**
+ * \brief Default constructor, which sets all values to 0.
+ */
+ FileHeader();
+
+ /**
+ * \brief Convenience constructor which initializes values during construction.
+ * \param magic Magic for this record
+ * \param version Version of this record
+ * \param recordId RecordId for this record
+ * \param overwriteIndicator Overwrite indicator for this record
+ * \param physicalFileId Physical file ID (file number on disk)
+ * \param logicalFileId Logical file ID (file number as seen by circular file buffer)
+ * \param firstRecordOffset First record offset in bytes from beginning of file
+ * \param setTimestampFlag If true, causes the timestamp to be initialized with the current system time
+ */
+ FileHeader(const uint32_t magic,
+ const uint8_t version,
+ const uint64_t recordId,
+ const bool overwriteIndicator,
+ const uint16_t physicalFileId,
+ const uint16_t logicalFileId,
+ const uint64_t firstRecordOffset,
+ const bool setTimestampFlag = false);
+
+ /**
+ * \brief Copy constructor.
+ * \param fh FileHeader instance to be copied
+ */
+ FileHeader(const FileHeader& fh);
+
+ /**
+ * \brief Destructor.
+ */
+ virtual ~FileHeader();
+
+ /**
+ * \brief Convenience copy method.
+ * \param fh FileHeader instance to be copied
+ */
+ void copy(const FileHeader& fh);
+
+ /**
+ * \brief Resets all fields to default values (mostly 0).
+ */
+ void reset();
+
+ /**
+ * \brief Return the header size of this record in bytes.
+ * \returns Size of record header in bytes.
+ */
+ static uint64_t getHeaderSize();
+
+ /**
+ * \brief Return the body (data) size of this record in bytes, which in the case of a FileHeader is always 0.
+ * \returns Size of record body in bytes. By definition, a FileHeader has no body.
+ */
+ uint64_t getBodySize() const;
+
+ /**
+ * \brief Return total size of this record in bytes, being in the case of the
+ * FileHeader the size of the header itself only.
+ * \returns Total size of record in bytes.
+ */
+ uint64_t getRecordSize() const;
+
+ /**
+ * \brief Gets the current time from the system clock and sets the timestamp in the struct.
+ */
+ void setTimestamp();
+
+ /**
+ * \brief Sets the timestamp in the struct to the provided value (in seconds and nanoseconds).
+ * \param ts Timestamp from which the file header time stamp is to be copied
+ */
+ void setTimestamp(const timespec& ts);
+
+};
+
+#pragma pack()
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+#endif // qpid_asyncStore_jrnl2_FileHeader_h_
diff --git a/cpp/src/qpid/asyncStore/jrnl2/JournalDirectory.cpp b/cpp/src/qpid/asyncStore/jrnl2/JournalDirectory.cpp
new file mode 100644
index 0000000000..0ec576805d
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/JournalDirectory.cpp
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file JournalDirectory.cpp
+ */
+#include "JournalDirectory.h"
+
+#include "JournalError.h"
+
+#include <cstring>
+#include <dirent.h>
+#include <errno.h>
+#include <sstream>
+#include <unistd.h>
+
+#include <sys/stat.h>
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+JournalDirectory::JournalDirectory(const std::string& fqName) :
+ m_fqName(fqName),
+ m_verified(false)
+{}
+
+const
+std::string JournalDirectory::getFqName() const
+{
+ return m_fqName;
+}
+
+void
+JournalDirectory::setFqName(const std::string newFqName,
+ const bool createNew,
+ const bool destroyExisting)
+{
+ if (m_fqName.compare(newFqName) != 0) {
+ if (destroyExisting) {
+ destroy();
+ }
+ m_fqName = newFqName;
+ if (createNew) {
+ create();
+ }
+ }
+}
+
+// static
+bool
+JournalDirectory::s_exists(const std::string& fqName,
+ const bool checkIsWritable)
+{
+ struct stat buff;
+ if (::lstat(fqName.c_str(), &buff)) {
+ if (errno == ENOENT) // No such dir or file
+ return false;
+ HANDLE_SYS_ERROR(fqName, JournalError::JERR_STAT, "s_exists");
+ }
+ CHK_ERROR(!S_ISDIR(buff.st_mode), fqName, JournalError::JERR_NOTADIR, "s_exists");
+ if (checkIsWritable) {
+ return (buff.st_mode & (S_IWUSR | S_IWGRP | S_IWOTH));
+ }
+ return true;
+}
+
+// static
+void
+JournalDirectory::s_create(const std::string& fqName)
+{
+ std::size_t fdp = fqName.find_last_of('/');
+ if (fdp != std::string::npos) {
+ std::string parent_dir = fqName.substr(0, fdp);
+ if (!s_exists(parent_dir)) {
+ s_create(parent_dir);
+ }
+ }
+ if (::mkdir(fqName.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH)) {
+ if (errno != EEXIST) { // Dir exists, ignore
+ HANDLE_SYS_ERROR(fqName, JournalError::JERR_MKDIR, "s_create");
+ }
+ }
+}
+
+void
+JournalDirectory::create()
+{
+ s_create(m_fqName);
+ m_verified = true;
+}
+
+//static
+void
+JournalDirectory::s_clear(const std::string& fqName,
+ const bool recursiveDelete)
+{
+ s_destroy(fqName, recursiveDelete, true);
+}
+
+void
+JournalDirectory::clear(const bool recursiveDelete)
+{
+ s_clear(m_fqName, recursiveDelete);
+}
+
+// static
+void
+JournalDirectory::s_destroy(const std::string& fqName,
+ const bool recursiveDelete,
+ const bool childrenOnly)
+{
+ if (s_exists(fqName)) {
+ DIR* dir = ::opendir(fqName.c_str());
+ if (dir) {
+ struct stat buff;
+ struct dirent* entry;
+ while ((entry = ::readdir(dir)) != 0) {
+ // Ignore . and ..
+ if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0) {
+ std::string fullName(fqName + "/" + entry->d_name);
+ CHK_SYS_ERROR(::lstat(fullName.c_str(), &buff), fullName, JournalError::JERR_STAT, "s_destroy");
+ if (S_ISREG(buff.st_mode) || S_ISLNK(buff.st_mode)) { // This is a file or symlink
+ CHK_SYS_ERROR(::unlink(fullName.c_str()), fullName, JournalError::JERR_UNLINK, "s_destroy");
+ } else if (S_ISDIR(buff.st_mode)) { // This is a directory
+ if (recursiveDelete) {
+ s_destroy(fullName);
+ } else {
+ HANDLE_ERROR(fullName, JournalError::JERR_DIRNOTEMPTY, "s_destroy");
+ }
+ } else {
+ HANDLE_ERROR(fullName, JournalError::JERR_BADFTYPE, "s_destroy");
+ }
+ }
+ }
+ CHK_SYS_ERROR(::closedir(dir), fqName, JournalError::JERR_CLOSEDIR, "s_destroy");
+ if (!childrenOnly) {
+ CHK_SYS_ERROR(::rmdir(fqName.c_str()), fqName, JournalError::JERR_RMDIR, "s_destroy");
+ }
+ } else {
+ HANDLE_SYS_ERROR(fqName, JournalError::JERR_OPENDIR, "s_destroy");
+ }
+ }
+}
+
+void
+JournalDirectory::destroy(const bool recursiveDelete,
+ const bool childrenOnly)
+{
+ if (m_verified) {
+ s_destroy(m_fqName, recursiveDelete, childrenOnly);
+ m_verified = false;
+ }
+}
+
+bool
+JournalDirectory::isVerified() const
+{
+ return m_verified;
+}
+
+}}} // namespace qpid::asyncStore::jrnl2
diff --git a/cpp/src/qpid/asyncStore/jrnl2/JournalDirectory.h b/cpp/src/qpid/asyncStore/jrnl2/JournalDirectory.h
new file mode 100644
index 0000000000..444094ccb4
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/JournalDirectory.h
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file JournalDirectory.h
+ */
+
+#ifndef qpid_asyncStore_jrnl2_JournalDirectory_h_
+#define qpid_asyncStore_jrnl2_JournalDirectory_h_
+
+#include <string>
+
+// --- Helper macros ---
+
+/**
+ * \brief Macro to handle the throwing of JournalExceptions.
+ *
+ * \param name Directory name
+ * \param err Static JournalError instance corresponding to the error to be thrown.
+ * \param fn C-string conaining throwing function name
+ */
+#define HANDLE_ERROR(name, err, fn) \
+ std::ostringstream oss; \
+ oss << "name=\"" << name << "\""; \
+ throw qpid::asyncStore::jrnl2::JournalError(err, oss.str(), "JournalDirectory", fn)
+
+/**
+ * \brief Macro to both check a result \a ret for a non-zero value and then throw a JournalException if so.
+ *
+ * \param ret Int result value to be checked for non-zero value.
+ * \param name Directory name.
+ * \param err Static JournalError instance corresponding to the error to be thrown.
+ * \param fn C-string conaining throwing function name
+ */
+#define CHK_ERROR(ret, name, err, fn) \
+ if (ret) { HANDLE_ERROR(name, err, fn); }
+
+/**
+ * \brief Macro to handle the throwing of JournalException in which errno contains the error code.
+ *
+ * \param name Directory name
+ * \param err Static JournalError instance corresponding to the error to be thrown.
+ * \param fn C-string conaining throwing function name
+ */
+#define HANDLE_SYS_ERROR(name, err, fn) \
+ std::ostringstream oss; \
+ oss << "name=\"" << name << "\": " << FORMAT_SYSERR(errno); \
+ throw qpid::asyncStore::jrnl2::JournalError(err, oss.str(), "JournalDirectory", fn)
+
+/**
+ * \brief Macro to both check a result \a ret for a non-zero value, then throw a JournalException if so. The value
+ * of errno is read and incorporated into the error text.
+ *
+ * \param ret Int result value to be checked for non-zero value.
+ * \param name Directory name
+ * \param err Static JournalError instance corresponding to the error to be thrown.
+ * \param fn C-string conaining throwing function name
+ */
+#define CHK_SYS_ERROR(ret, name, err, fn) \
+ if (ret) { HANDLE_SYS_ERROR(name, err, fn); }
+
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+/**
+ * \brief Class which manages and controls the directory where a journal instance will be created and operated.
+ *
+ * This class can check for the existence of the target directory, and if it is not present, create it. It can
+ * also delete the directory (with all its contents).
+ */
+class JournalDirectory
+{
+public:
+ /**
+ * \brief Constructor.
+ *
+ * \param fqName Name of directory to be checked for. May be an absolute or relative path.
+ * \throws JournalException with error JournalErrors::JERR_STAT if ::lstat() fails.
+ * \throws JournalException with error JournalErrors::JERR_NOTADIR if \a name exists, but is not a directory.
+ * \throws JournalException with error JournalErrors::JERR_UNLINK if ::unlink() fails
+ * \throws JournalException with error JournalErrors::JERR_DIRNOTEMPTY if \a recursiveDelete is \b false and
+ * directory \a name contains one or more directories.
+ * \throws JournalException with error JournalErrors::JERR_BADFTYPE if \a name is not a regular file, symlink
+ * or directory.
+ * \throws JournalException with error JournalErrors::JERR_CLOSEDIR if ::closedir() fails.
+ * \throws JournalException with error JournalErrors::JERR_RMDIR if ::rmdir() fails.
+ * \throws JournalException with error JournalErrors::JERR_OPENDIR if ::opendir() fails.
+ */
+ JournalDirectory(const std::string& fqName);
+
+ /**
+ * \brief Returns the name of the journal directory.
+ *
+ * \returns Name of the joruanl directory.
+ */
+ const std::string getFqName() const;
+
+ /**
+ * \brief Set a (new) journal directory location. The previous location is assumed to have been set in the
+ * constructor or a previous call to getName().
+ *
+ * \param newFqName New directory (fully qualified name)
+ * \param createNew If \b true, will create the new directory immediately and setting m_verified to \b true,
+ * otherwise the name will be set, but no action taken to create or verify the name.
+ * \param destroyExisting If \b true, will destroy the old journal directory, deleting all its contents. If
+ * \b false, the existing directory will be left in tact.
+ * \throws JournalException with error JournalErrors::JERR_STAT if ::lstat() fails.
+ * \throws JournalException with error JournalErrors::JERR_NOTADIR if \a name exists, but is not a directory.
+ * \throws JournalException with error JournalErrors::JERR_UNLINK if ::unlink() fails
+ * \throws JournalException with error JournalErrors::JERR_BADFTYPE if \a name is not a regular file, symlink
+ * or directory.
+ * \throws JournalException with error JournalErrors::JERR_CLOSEDIR if ::closedir() fails.
+ * \throws JournalException with error JournalErrors::JERR_RMDIR if ::rmdir() fails.
+ * \throws JournalException with error JournalErrors::JERR_OPENDIR if ::opendir() fails.
+ */
+ void setFqName(const std::string newFqName,
+ const bool createNew = true,
+ const bool destroyExisting = false);
+
+ /**
+ * \brief Static helper class to check for the existence of the directory \a name.
+ *
+ * \param fqName Name of directory to be checked for. May be an absolute or relative path.
+ * \param checkIsWritable Also check if the directory is writable. If the directory exists but is not writable
+ * and this parameter is set \b true, then this test will fail.
+ * \returns \b true if the directory exists and if \b checkIsWritable is set to \b true, then also writable,
+ * \b false otherwise.
+ * \throws JournalException with error JournalErrors::JERR_STAT if ::lstat() fails.
+ * \throws JournalException with error JournalErrors::JERR_NOTADIR if \a name exists, but is not a directory.
+ */
+ static bool s_exists(const std::string& fqName,
+ const bool checkIsWritable = true);
+
+ /**
+ * \brief Static helper function to create a directory \a name. If parameter \a name is a path which includes
+ * one or more non-existent directories, then these will be recursively created as needed.
+ *
+ * \param fqName Name of directory to be created. May be an absolute or relative path.
+ * \throws JournalException with error JournalErrors::JERR_STAT if ::lstat() fails.
+ * \throws JournalException with error JournalErrors::JERR_NOTADIR if part of \a name exists, but is not a
+ * directory.
+ * \throws JournalException with error JournalErrors::JERR_MKDIR if ::mkdir() fails.
+ */
+ static void s_create(const std::string& fqName);
+
+ /**
+ * \brief Create the directory \a name supplied to the constructor.
+ *
+ * \throws JournalException with error JournalErrors::JERR_STAT if ::lstat() fails.
+ * \throws JournalException with error JournalErrors::JERR_NOTADIR if part of \a name exists, but is not a
+ * directory.
+ * \throws JournalException with error JournalErrors::JERR_MKDIR if ::mkdir() fails.
+ */
+ void create();
+
+ /**
+ * \brief Static helper function to delete only the contents of directory \a name.
+ *
+ * \param fqName Name of directory whose contents are to be deleted. May be an absolute or relative path.
+ * \param recursiveDelete If \b true, then all subdirectories (if they exist) will also be deleted.
+ * \throws JournalException with error JournalErrors::JERR_STAT if ::lstat() fails.
+ * \throws JournalException with error JournalErrors::JERR_NOTADIR if \a name exists, but is not a directory.
+ * \throws JournalException with error JournalErrors::JERR_UNLINK if ::unlink() fails
+ * \throws JournalException with error JournalErrors::JERR_DIRNOTEMPTY if \a recursiveDelete is \b false and
+ * directory \a name contains one or more directories.
+ * \throws JournalException with error JournalErrors::JERR_BADFTYPE if \a name is not a regular file, symlink
+ * or directory.
+ * \throws JournalException with error JournalErrors::JERR_CLOSEDIR if ::closedir() fails.
+ * \throws JournalException with error JournalErrors::JERR_RMDIR if ::rmdir() fails.
+ * \throws JournalException with error JournalErrors::JERR_OPENDIR if ::opendir() fails.
+ */
+ static void s_clear(const std::string& fqName,
+ const bool recursiveDelete = true);
+
+ /**
+ * \brief Delete only the contents of directory \a name.
+ *
+ * \param recursiveDelete If \b true, then all subdirectories (if they exist) will also be deleted.
+ * \throws JournalException with error JournalErrors::JERR_STAT if ::lstat() fails.
+ * \throws JournalException with error JournalErrors::JERR_NOTADIR if \a name exists, but is not a directory.
+ * \throws JournalException with error JournalErrors::JERR_UNLINK if ::unlink() fails
+ * \throws JournalException with error JournalErrors::JERR_DIRNOTEMPTY if \a recursiveDelete is \b false and
+ * directory \a name contains one or more directories.
+ * \throws JournalException with error JournalErrors::JERR_BADFTYPE if \a name is not a regular file, symlink
+ * or directory.
+ * \throws JournalException with error JournalErrors::JERR_CLOSEDIR if ::closedir() fails.
+ * \throws JournalException with error JournalErrors::JERR_RMDIR if ::rmdir() fails.
+ * \throws JournalException with error JournalErrors::JERR_OPENDIR if ::opendir() fails.
+ */
+ void clear(const bool recursiveDelete = true);
+
+ /**
+ * \brief Static helper function to delete the contents of directory \a name, and by default, also the
+ * directory itself.
+ *
+ * \param fqName Name of directory to be deleted. May be an absolute or relative path.
+ * \param recursiveDelete If \b true, then all subdirectories (if they exist) will also be deleted.
+ * \param childrenOnly If \b true, then only the contents of directory \a name will be deleted and directory
+ * \a name will remain. Otherwise \a name itself will also be deleted.
+ * \throws JournalException with error JournalErrors::JERR_STAT if ::lstat() fails.
+ * \throws JournalException with error JournalErrors::JERR_NOTADIR if \a name exists, but is not a directory.
+ * \throws JournalException with error JournalErrors::JERR_UNLINK if ::unlink() fails
+ * \throws JournalException with error JournalErrors::JERR_DIRNOTEMPTY if \a recursiveDelete is \b false and
+ * directory \a name contains one or more directories.
+ * \throws JournalException with error JournalErrors::JERR_BADFTYPE if \a name is not a regular file, symlink
+ * or directory.
+ * \throws JournalException with error JournalErrors::JERR_CLOSEDIR if ::closedir() fails.
+ * \throws JournalException with error JournalErrors::JERR_RMDIR if ::rmdir() fails.
+ * \throws JournalException with error JournalErrors::JERR_OPENDIR if ::opendir() fails.
+ */
+ static void s_destroy(const std::string& fqName,
+ const bool recursiveDelete = true,
+ const bool childrenOnly = false);
+
+ /**
+ * \brief Delete the contents of directory \a name, and by default, also the directory itself.
+ *
+ * \param recursiveDelete If \b true, then all subdirectories (if they exist) will also be deleted.
+ * \param childrenOnly If \b true, then only the contents of directory \a name will be deleted and directory
+ * \a name will remain. Otherwise \a name itself will also be deleted.
+ * \throws JournalException with error JournalErrors::JERR_STAT if ::lstat() fails.
+ * \throws JournalException with error JournalErrors::JERR_NOTADIR if \a name exists, but is not a directory.
+ * \throws JournalException with error JournalErrors::JERR_UNLINK if ::unlink() fails
+ * \throws JournalException with error JournalErrors::JERR_DIRNOTEMPTY if \a recursiveDelete is \b false and
+ * directory \a name contains one or more directories.
+ * \throws JournalException with error JournalErrors::JERR_BADFTYPE if \a name is not a regular file, symlink
+ * or directory.
+ * \throws JournalException with error JournalErrors::JERR_CLOSEDIR if ::closedir() fails.
+ * \throws JournalException with error JournalErrors::JERR_RMDIR if ::rmdir() fails.
+ * \throws JournalException with error JournalErrors::JERR_OPENDIR if ::opendir() fails.
+ */
+ void destroy(const bool recursiveDelete = true,
+ const bool childrenOnly = false);
+
+ /**
+ * \brief Return the verified status of the store directory.
+ *
+ * \returns \b true if the directory exists and is available for use as a journal directory, \b false otherwise.
+ */
+ bool isVerified() const;
+
+protected:
+ std::string m_fqName; ///< Name of directory (fully qualified name)
+ bool m_verified; ///< True when verified that it exists or is created
+
+};
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+#endif // qpid_asyncStore_jrnl2_JournalDirectory_h_
+
diff --git a/cpp/src/qpid/asyncStore/jrnl2/JournalError.cpp b/cpp/src/qpid/asyncStore/jrnl2/JournalError.cpp
new file mode 100644
index 0000000000..3c64148b5b
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/JournalError.cpp
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file JournalError.cpp
+ */
+
+#include "JournalError.h"
+
+#include <iomanip> // std::setfill(), std::setw()
+#include <sstream> // std::ostringstream
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+JournalError::JournalError() throw () :
+ std::runtime_error(std::string()),
+ m_errorCode(0)
+{
+ formatWhatStr();
+}
+
+JournalError::JournalError(const uint32_t errorCode) throw () :
+ std::runtime_error(std::string()),
+ m_errorCode(errorCode)
+{
+ formatWhatStr();
+}
+
+JournalError::JournalError(const char* additionalInfo) throw () :
+ std::runtime_error(std::string()),
+ m_errorCode(0),
+ m_additionalInfo(additionalInfo)
+{
+ formatWhatStr();
+}
+
+JournalError::JournalError(const std::string& additionalInfo) throw () :
+ std::runtime_error(std::string()),
+ m_errorCode(0),
+ m_additionalInfo(additionalInfo)
+{
+ formatWhatStr();
+}
+
+JournalError::JournalError(const uint32_t errorCode,
+ const char* additionalInfo) throw () :
+ std::runtime_error(std::string()),
+ m_errorCode(errorCode),
+ m_additionalInfo(additionalInfo)
+{
+ formatWhatStr();
+}
+
+JournalError::JournalError(const uint32_t errorCode,
+ const std::string& additionalInfo) throw () :
+ std::runtime_error(std::string()),
+ m_errorCode(errorCode),
+ m_additionalInfo(additionalInfo)
+{
+ formatWhatStr();
+}
+
+JournalError::JournalError(const uint32_t errorCode,
+ const char* throwingClass,
+ const char* throwingFunction) throw () :
+ std::runtime_error(std::string()),
+ m_errorCode(errorCode),
+ m_throwingClass(throwingClass),
+ m_throwingFunction(throwingFunction)
+{
+ formatWhatStr();
+}
+
+JournalError::JournalError(const uint32_t errorCode,
+ const std::string& throwingClass,
+ const std::string& throwingFunction) throw () :
+ std::runtime_error(std::string()),
+ m_errorCode(errorCode),
+ m_throwingClass(throwingClass),
+ m_throwingFunction(throwingFunction)
+{
+ formatWhatStr();
+}
+
+JournalError::JournalError(const uint32_t errorCode,
+ const char* additionalInfo,
+ const char* throwingClass,
+ const char* throwingFunction) throw () :
+ std::runtime_error(std::string()),
+ m_errorCode(errorCode),
+ m_additionalInfo(additionalInfo),
+ m_throwingClass(throwingClass),
+ m_throwingFunction(throwingFunction)
+{
+ formatWhatStr();
+}
+
+JournalError::JournalError(const uint32_t errorCode,
+ const std::string& additionalInfo,
+ const std::string& throwingClass,
+ const std::string& throwingFunction) throw () :
+ std::runtime_error(std::string()),
+ m_errorCode(errorCode),
+ m_additionalInfo(additionalInfo),
+ m_throwingClass(throwingClass),
+ m_throwingFunction(throwingFunction)
+{
+ formatWhatStr();
+}
+
+JournalError::~JournalError() throw ()
+{}
+
+const char*
+JournalError::what() const throw ()
+{
+ return m_what.c_str();
+}
+
+uint32_t
+JournalError::getErrorCode() const throw ()
+{
+ return m_errorCode;
+}
+
+const std::string
+JournalError::getAdditionalInfo() const throw ()
+{
+ return m_additionalInfo;
+}
+
+const std::string
+JournalError::getThrowingClass() const throw ()
+{
+ return m_throwingClass;
+}
+
+const std::string
+JournalError::getThrowingFunction() const throw ()
+{
+ return m_throwingFunction;
+}
+
+void
+JournalError::toStream(std::ostream& os) const
+{
+ os << what();
+}
+
+// protected
+void
+JournalError::formatWhatStr() throw ()
+{
+ try {
+ const bool ai = !m_additionalInfo.empty();
+ const bool tc = !m_throwingClass.empty();
+ const bool tf = !m_throwingFunction.empty();
+ std::ostringstream oss;
+ oss << className() << " 0x" << std::hex << std::setfill('0') << std::setw(4) << m_errorCode << " ";
+ if (tc) {
+ oss << m_throwingClass;
+ if (tf) {
+ oss << "::";
+ } else {
+ oss << " ";
+ }
+ }
+ if (tf) {
+ oss << m_throwingFunction << "() ";
+ }
+ if (tc || tf) {
+ oss << "threw " << s_errorMessage(m_errorCode);
+ }
+ if (ai) {
+ oss << " (" << m_additionalInfo << ")";
+ }
+ m_what.assign(oss.str());
+ } catch (...) {}
+}
+
+// protected
+const char*
+JournalError::className()
+{
+ return s_className;
+}
+
+// static error code definitions
+JournalError::errorMap_t JournalError::s_errorMap;
+JournalError::errorMapCitr_t JournalError::s_errorMapIterator;
+
+// generic and system errors
+const uint32_t JournalError::JERR_PTHREAD = 0x0001;
+const uint32_t JournalError::JERR_RTCLOCK = 0x0002;
+
+// illegal states
+const uint32_t JournalError::JERR_JRNLRUNSTATE = 0x0101;
+const uint32_t JournalError::JERR_MSGOPSTATE = 0x0102;
+const uint32_t JournalError::JERR_MSGWRCMPLSTATE = 0x0103;
+
+// file ops and file i/o
+const uint32_t JournalError::JERR_STAT = 0x0200;
+const uint32_t JournalError::JERR_OPENDIR = 0x0201;
+const uint32_t JournalError::JERR_CLOSEDIR = 0x0202;
+const uint32_t JournalError::JERR_MKDIR = 0x0203;
+const uint32_t JournalError::JERR_RMDIR = 0x0204;
+const uint32_t JournalError::JERR_UNLINK = 0x0205;
+const uint32_t JournalError::JERR_NOTADIR = 0x0206;
+const uint32_t JournalError::JERR_BADFTYPE = 0x0207;
+const uint32_t JournalError::JERR_DIRNOTEMPTY = 0x0208;
+
+// static
+const char*
+JournalError::s_errorMessage(const uint32_t err_no) throw ()
+{
+ s_errorMapIterator = s_errorMap.find(err_no);
+ if (s_errorMapIterator == s_errorMap.end())
+ return "<Unknown error code>";
+ return s_errorMapIterator->second;
+}
+
+// private static
+const char* JournalError::s_className = "qpid::asyncStore::jrnl2::Error";
+bool JournalError::s_initializedFlag = JournalError::s_initialize();
+
+// private static
+bool
+JournalError::s_initialize()
+{
+ s_errorMap[JERR_PTHREAD] = "JERR_PTHREAD: pthread operation failure";
+ s_errorMap[JERR_RTCLOCK] = "JERR_RTCLOCK: realtime clock operation failure";
+
+ s_errorMap[JERR_JRNLRUNSTATE] = "JERR_JRNLRUNSTATE: Illegal journal run state transition";
+ s_errorMap[JERR_MSGOPSTATE] = "JERR_MSGOPSTATE: Illegal msg op state transition";
+ s_errorMap[JERR_MSGWRCMPLSTATE] = "JERR_MSGWRCMPLSTATE: Illegal msg write completion state transition";
+
+ s_errorMap[JERR_STAT] = "JERR_STAT: Call to stat() (file status) failed";
+ s_errorMap[JERR_OPENDIR] = "JERR_OPENDIR: Call to opendir() failed";
+ s_errorMap[JERR_CLOSEDIR] = "JERR_CLOSEDIR: Call to closedir() failed";
+ s_errorMap[JERR_MKDIR] = "JERR_MKDIR: Call to mkdir() failed";
+ s_errorMap[JERR_RMDIR] = "JERR_RMDIR: Call to rmdir() failed";
+ s_errorMap[JERR_UNLINK] = "JERR_UNLINK: Call to unlink() (file delete) failed";
+ s_errorMap[JERR_NOTADIR] = "JERR_NOTADIR: Not a directory";
+ s_errorMap[JERR_BADFTYPE] = "JERR_BADFTYPE: Bad file type";
+ s_errorMap[JERR_DIRNOTEMPTY] = "JERR_DIRNOTEMPTY: Directory not empty";
+
+ return true;
+}
+
+}}} // namespace qpid::asyncStore::jrnl2
diff --git a/cpp/src/qpid/asyncStore/jrnl2/JournalError.h b/cpp/src/qpid/asyncStore/jrnl2/JournalError.h
new file mode 100644
index 0000000000..42f64ef794
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/JournalError.h
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file JournalError.h
+ */
+
+#ifndef qpid_asyncStore_jrnl2_JournalError_hpp_
+#define qpid_asyncStore_jrnl2_JournalError_hpp_
+
+#include "Streamable.h"
+
+#include <map>
+#include <stdexcept> // std::runtime_error
+#include <stdint.h> // uint32_t
+
+// Macro definitions
+
+/**
+ * \brief Macro to retrieve and format the C errno value as a string.
+ *
+ * \param errno Value of errno to be formatted.
+ */
+#define FORMAT_SYSERR(errno) " errno=" << errno << " (" << std::strerror(errno) << ")"
+
+/**
+ * \brief Macro to check for a clean pthread creation and throwing a JournalException with code JERR_PTHREAD if
+ * thread creation failed.
+ *
+ * \param err Value or errno.
+ * \param pfn Name of system call that failed.
+ * \param cls Name of class in which function failed.
+ * \param fn Name of class function that failed.
+ */
+#define PTHREAD_CHK(err, pfn, cls, fn) if(err != 0) { \
+ std::ostringstream oss; \
+ oss << pfn << " failed: " << FORMAT_SYSERR(err); \
+ throw qpid::asyncStore::jrnl2::JournalError(qpid::asyncStore::jrnl2::JournalError::JERR_PTHREAD, oss.str(), cls, fn); \
+ }
+
+/**
+ * \brief Macro for throwing state-machine errors related to invalid state transitions.
+ *
+ * \param jerrno Error number to be thrown.
+ * \param target_st State into which the state machine was attemting to move.
+ * \param curr_st Current state of the state machine (making transition illegal).
+ * \param cls Name of class in which failure occurred.
+ * \param fn Name of class function that failed.
+ */
+#define THROW_STATE_EXCEPTION(jerrno, target_st, curr_st, cls, fn) { \
+ std::ostringstream oss; \
+ oss << cls << "::" << fn << "() in state " << curr_st << " cannot be moved to state " << target_st << "."; \
+ throw qpid::asyncStore::jrnl2::JournalError(jerrno, oss.str(), cls, fn); \
+ }
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+/**
+ * \brief Universal error and exception class for the async store. The class uses a set of error codes to indicate
+ * the error or failure condition.
+ */
+class JournalError : public std::runtime_error, public Streamable
+{
+public:
+ /**
+ * \brief Default constructor.
+ */
+ JournalError() throw ();
+
+ /**
+ * \brief Constructor which accepts only the error code.
+ */
+ JournalError(const uint32_t errorCode) throw ();
+
+ /**
+ * \brief Constructor which accepts only additional info as a c-string.
+ */
+ JournalError(const char* additionalInfo) throw ();
+
+ /**
+ * \brief Constructor which accepts only additional info as a std::string.
+ */
+ JournalError(const std::string& additionalInfo) throw ();
+
+ /**
+ * \brief Constructor which accepts both the error code and additional info as a c-string.
+ */
+ JournalError(const uint32_t errorCode,
+ const char* additionalInfo) throw ();
+
+ /**
+ * \brief Constructor which accepts both the error code and additional info as a std::string.
+ */
+ JournalError(const uint32_t errorCode,
+ const std::string& additionalInfo) throw ();
+
+
+ /**
+ * \brief Constructor which accepts both the error code and the throwing class name as a c-string.
+ */
+ JournalError(const uint32_t errorCode,
+ const char* throwingClass,
+ const char* throwingFunction) throw ();
+
+ /**
+ * \brief Constructor which accepts both the error code and the throwing class name as a std::string.
+ */
+ JournalError(const uint32_t errorCode,
+ const std::string& throwingClass,
+ const std::string& throwingFunction) throw ();
+
+
+ /**
+ * \brief Constructor which accepts the error code, the throwing class and function as c-strings.
+ */
+ JournalError(const uint32_t errorCode,
+ const char* additionalInfo,
+ const char* throwingClass,
+ const char* throwingFunction) throw ();
+
+ /**
+ * \brief Constructor which accepts the error code, the throwing class and function as std::strings.
+ */
+ JournalError(const uint32_t errorCode,
+ const std::string& additionalInfo,
+ const std::string& throwingClass,
+ const std::string& throwingFunction) throw ();
+
+
+ /**
+ * \brief Destructor guaranteed not to throw.
+ */
+ virtual ~JournalError() throw ();
+
+ /**
+ * \brief Overloaded std::exception::what() call which returns a string representation of the error or fault.
+ *
+ * \returns C-string representation of the error or fault.
+ */
+ const char* what() const throw (); // override std::exception::what()
+
+ /**
+ * \brief Get the error or fault code.
+ *
+ * \returns The error or fault code.
+ */
+ uint32_t getErrorCode() const throw ();
+
+ /**
+ * \brief Get additional error or fault info from the \c m_additionalInfo field.
+ *
+ * \returns Additional error or fault info.
+ */
+ const std::string getAdditionalInfo() const throw ();
+
+ /**
+ * \brief Get name of throwing class from the \c m_throwingClass field.
+ *
+ * \returns Name of throwing class.
+ */
+ const std::string getThrowingClass() const throw ();
+
+ /**
+ * \brief Get name of throwing function from \c m_throwingFunction field.
+ *
+ * \returns Name of throwing function.
+ */
+ const std::string getThrowingFunction() const throw ();
+
+ // -- Implementation of Streamable methods ---
+
+ virtual void toStream(std::ostream&) const;
+
+ // --- Static error codes ---
+
+ // generic and system errors
+ static const uint32_t JERR_PTHREAD; ///< pthread operation failure
+ static const uint32_t JERR_RTCLOCK; ///< realtime clock operation failure
+
+ // illegal states
+ static const uint32_t JERR_JRNLRUNSTATE; ///< Illegal journal run state transition
+ static const uint32_t JERR_MSGOPSTATE; ///< Illegal msg op state transition
+ static const uint32_t JERR_MSGWRCMPLSTATE; ///< Illegal msg write completion state transition
+
+ // file ops and file i/o
+ static const uint32_t JERR_STAT; ///< Call to stat() or lstat() failed
+ static const uint32_t JERR_OPENDIR; ///< Call to opendir() failed
+ static const uint32_t JERR_CLOSEDIR; ///< Call to closedir() failed
+ static const uint32_t JERR_MKDIR; ///< Call to rmdir() failed
+ static const uint32_t JERR_RMDIR; ///< Call to rmdir() failed
+ static const uint32_t JERR_UNLINK; ///< Call to unlink() failed
+ static const uint32_t JERR_NOTADIR; ///< Not a directory
+ static const uint32_t JERR_BADFTYPE; ///< Bad file type
+ static const uint32_t JERR_DIRNOTEMPTY; ///< Directory not empty
+
+ /**
+ * \brief Static method which converts an error or exception code into a c-string representation.
+ *
+ * \param err_no Error code for which a string representation is desired.
+ * \returns C-string representation of the error code \c err_no.
+ */
+ static const char* s_errorMessage(const uint32_t err_no) throw ();
+
+protected:
+ uint32_t m_errorCode; ///< Error or failure code, taken from JournalErrors.
+ std::string m_additionalInfo; ///< Additional information pertaining to the error or failure.
+ std::string m_throwingClass; ///< Name of the class throwing the error.
+ std::string m_throwingFunction; ///< Name of the function throwing the error.
+ std::string m_what; ///< Standard error of failure message, taken from JournalErrors.
+
+ /**
+ * \brief Protected function to format the error message.
+ */
+ void formatWhatStr() throw ();
+
+ /**
+ * \brief Return this class name to print in error messages. Derived classes will have different definitions
+ * for s_className, and thus this function will return the correct name in any child class.
+ */
+ virtual const char* className();
+
+ typedef std::map<uint32_t, const char*> errorMap_t; ///< Type for map of error messages
+ typedef errorMap_t::const_iterator errorMapCitr_t; ///< Const iterator for map of error messages
+
+ static errorMap_t s_errorMap; ///< Map of error messages
+ static errorMapCitr_t s_errorMapIterator; ///< Const iterator
+
+private:
+ static const char* s_className; ///< Name of this class, used in formatting error messages.
+ static bool s_initializedFlag; ///< Dummy flag, used to initialize map.
+
+ static bool s_initialize(); ///< Static fn for initializing static data
+
+};
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+#endif // qpid_asyncStore_jrnl2_JournalError_hpp_
diff --git a/cpp/src/qpid/asyncStore/jrnl2/JournalParameters.cpp b/cpp/src/qpid/asyncStore/jrnl2/JournalParameters.cpp
new file mode 100644
index 0000000000..63417582bc
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/JournalParameters.cpp
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file JournalParameters.cpp
+ */
+
+#include "JournalParameters.h"
+
+#include <sstream>
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+// static declarations
+std::string JournalParameters::s_defaultJrnlDir = "/tmp/store";
+std::string JournalParameters::s_defaultJrnlBaseFileName = "JournalData";
+uint16_t JournalParameters::s_defaultNumJrnlFiles = 8;
+uint32_t JournalParameters::s_defaultJrnlFileSize_sblks = 3072;
+uint16_t JournalParameters::s_defaultWriteBuffNumPgs = 32;
+uint32_t JournalParameters::s_defaultWriteBuffPgSize_sblks = 128;
+
+JournalParameters::JournalParameters() :
+ m_jrnlDir(s_defaultJrnlDir),
+ m_jrnlBaseFileName(s_defaultJrnlBaseFileName),
+ m_numJrnlFiles(s_defaultNumJrnlFiles),
+ m_jrnlFileSize_sblks(s_defaultJrnlFileSize_sblks),
+ m_writeBuffNumPgs(s_defaultWriteBuffNumPgs),
+ m_writeBuffPgSize_sblks(s_defaultWriteBuffPgSize_sblks)
+{}
+
+JournalParameters::JournalParameters(const std::string& jrnlDir,
+ const std::string& jrnlBaseFileName,
+ const uint16_t numJrnlFiles,
+ const uint32_t jrnlFileSize_sblks,
+ const uint16_t writeBuffNumPgs,
+ const uint32_t writeBuffPgSize_sblks) :
+ m_jrnlDir(jrnlDir),
+ m_jrnlBaseFileName(jrnlBaseFileName),
+ m_numJrnlFiles(numJrnlFiles),
+ m_jrnlFileSize_sblks(jrnlFileSize_sblks),
+ m_writeBuffNumPgs(writeBuffNumPgs),
+ m_writeBuffPgSize_sblks(writeBuffPgSize_sblks)
+{}
+
+JournalParameters::JournalParameters(const JournalParameters& sp) :
+ m_jrnlDir(sp.m_jrnlDir),
+ m_jrnlBaseFileName(sp.m_jrnlBaseFileName),
+ m_numJrnlFiles(sp.m_numJrnlFiles),
+ m_jrnlFileSize_sblks(sp.m_jrnlFileSize_sblks),
+ m_writeBuffNumPgs(sp.m_writeBuffNumPgs),
+ m_writeBuffPgSize_sblks(sp.m_writeBuffPgSize_sblks)
+{}
+
+}}} // namespace qpid::asyncStore::jrnl2
diff --git a/cpp/src/qpid/asyncStore/jrnl2/JournalParameters.h b/cpp/src/qpid/asyncStore/jrnl2/JournalParameters.h
new file mode 100644
index 0000000000..056c627d1c
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/JournalParameters.h
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file JournalParameters.h
+ */
+
+#ifndef qpid_asyncStore_jrnl2_JournalParameters_h_
+#define qpid_asyncStore_jrnl2_JournalParameters_h_
+
+#include <string>
+#include <stdint.h> // uint16_t, uint32_t
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+/**
+ * \brief Class which encapsulates journal settings and options for one AsyncJournal instance.
+ *
+ * This class is a convenience encapsulation of a multitude of per-journal settings. An instance of this class
+ * cannot be shared between journal instances as there must be either a different journal directory or a different
+ * base file name (or both).
+ */
+class JournalParameters
+{
+public:
+ // static default store params
+ static std::string s_defaultJrnlDir; ///< Default journal directory
+ static std::string s_defaultJrnlBaseFileName; ///< Default base file name for all journal files
+ static uint16_t s_defaultNumJrnlFiles; ///< Default number of journal files
+ static uint32_t s_defaultJrnlFileSize_sblks; ///< Default journal file size (in sblks)
+ static uint16_t s_defaultWriteBuffNumPgs; ///< Default number of write buffer pages
+ static uint32_t s_defaultWriteBuffPgSize_sblks; ///< Default write buffer page size (in sblks)
+
+ std::string m_jrnlDir; ///< Journal directory
+ std::string m_jrnlBaseFileName; ///< Base file name for all journal files
+ uint16_t m_numJrnlFiles; ///< Number of journal files
+ uint32_t m_jrnlFileSize_sblks; ///< Journal file size (in sblks)
+ uint16_t m_writeBuffNumPgs; ///< Number of write buffer pages
+ uint32_t m_writeBuffPgSize_sblks; ///< Write buffer page size (in dblks)
+
+ /**
+ * \brief Default constructor. This will set all members to their default settings.
+ */
+ JournalParameters();
+
+ /**
+ * \brief Explicit constructor, in which all settings must be supplied.
+ *
+ * \param jrnlDir Journal directory
+ * \param jrnlBaseFileName Base file name for all journal files
+ * \param numJrnlFiles Number of journal files
+ * \param jrnlFileSize_sblks Journal file size (in sblks)
+ * \param writeBuffNumPgs Number of write buffer pages
+ * \param writeBuffPgSize_sblks Write buffer page size (in dblks)
+ */
+ JournalParameters(const std::string& jrnlDir,
+ const std::string& jrnlBaseFileName,
+ const uint16_t numJrnlFiles,
+ const uint32_t jrnlFileSize_sblks,
+ const uint16_t writeBuffNumPgs,
+ const uint32_t writeBuffPgSize_sblks);
+
+ /**
+ * \brief Copy constructor.
+ *
+ * \param sp Ref to JournalParameters instance to be copied.
+ */
+ JournalParameters(const JournalParameters& sp);
+
+};
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+#endif // qpid_asyncStore_jrnl2_JournalParameters_h_
diff --git a/cpp/src/qpid/asyncStore/jrnl2/JournalRunState.cpp b/cpp/src/qpid/asyncStore/jrnl2/JournalRunState.cpp
new file mode 100644
index 0000000000..067ed12adf
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/JournalRunState.cpp
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file JournalRunState.cpp
+ */
+
+#include "JournalRunState.h"
+
+#include "JournalError.h"
+
+#include <sstream>
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+JournalRunState::JournalRunState() :
+ State<journalState_t>()
+{}
+
+JournalRunState::JournalRunState(const JournalRunState& s) :
+ State<journalState_t>(s)
+{}
+
+JournalRunState::JournalRunState(const journalState_t s) :
+ State<journalState_t>(s)
+{}
+
+JournalRunState::~JournalRunState()
+{}
+
+void
+JournalRunState::initialize()
+{
+ set(JS_INITIALIZING);
+}
+
+void
+JournalRunState::recoverPhase1()
+{
+ set(JS_RECOVERING_PHASE_1);
+}
+
+void
+JournalRunState::recoverPhase2()
+{
+ set(JS_RECOVERING_PHASE_2);
+}
+
+void
+JournalRunState::run()
+{
+ set(JS_RUNNING);
+}
+
+void
+JournalRunState::stop()
+{
+ set(JS_STOPPING);
+}
+
+void
+JournalRunState::stopped()
+{
+ set(JS_STOPPED);
+}
+
+const char*
+JournalRunState::getAsStr() const
+{
+ return s_toStr(m_state);
+}
+
+// static
+const char*
+JournalRunState::s_toStr(const journalState_t s)
+{
+ switch (s) {
+ case JS_NONE:
+ return "JS_NONE";
+ case JS_RECOVERING_PHASE_1:
+ return "JS_RECOVERING_PHASE_1";
+ case JS_RECOVERING_PHASE_2:
+ return "JS_RECOVERING_PHASE_2";
+ case JS_INITIALIZING:
+ return "JS_INITIALIZING";
+ case JS_RUNNING:
+ return "JS_RUNNING";
+ case JS_STOPPING:
+ return "JS_STOPPING";
+ case JS_STOPPED:
+ return "JS_STOPPED";
+ default:
+ std::ostringstream oss;
+ oss << "<unknown state (" << "s" << ")>";
+ return oss.str().c_str();
+ }
+}
+
+// protected
+void
+JournalRunState::set(const journalState_t s)
+{
+ // State transition logic: set stateError to true if an invalid transition is attempted
+ bool stateTransitionError = false;
+ switch(m_state) {
+ case JS_NONE:
+ if (s != JS_RECOVERING_PHASE_1 && s != JS_INITIALIZING) stateTransitionError = true;
+ break;
+ case JS_RECOVERING_PHASE_1:
+ if (s != JS_RECOVERING_PHASE_2) stateTransitionError = true;
+ break;
+ case JS_RECOVERING_PHASE_2:
+ case JS_INITIALIZING:
+ if (s != JS_RUNNING) stateTransitionError = true;
+ break;
+ case JS_RUNNING:
+ if (s != JS_STOPPING) stateTransitionError = true;
+ break;
+ case JS_STOPPING:
+ if (s != JS_STOPPED) stateTransitionError = true;
+ break;
+ case JS_STOPPED: // Cannot move out of this state except with reset()
+ stateTransitionError = true;
+ break;
+ }
+ if (stateTransitionError) {
+ THROW_STATE_EXCEPTION(JournalError::JERR_JRNLRUNSTATE, s_toStr(s), s_toStr(m_state),
+ "JournalState", "set");
+ }
+ m_state = s;
+}
+
+}}} // namespace qpid::asyncStore::jrnl2
diff --git a/cpp/src/qpid/asyncStore/jrnl2/JournalRunState.h b/cpp/src/qpid/asyncStore/jrnl2/JournalRunState.h
new file mode 100644
index 0000000000..85141d8e18
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/JournalRunState.h
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file JournalRunState.h
+ */
+
+#ifndef qpid_asyncStore_jrnl2_JournalRunState_h_
+#define qpid_asyncStore_jrnl2_JournalRunState_h_
+
+#include "State.h"
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+/**
+ * \brief Enumeration of valid AsyncJournal states.
+ */
+typedef enum {
+ JS_NONE = 0, ///< Not initialized, not capable of running (This is the default/reset state).
+ JS_RECOVERING_PHASE_1, ///< Recovery phase 1 (analyzing and restoring AsyncJournal state from journal files).
+ JS_RECOVERING_PHASE_2, ///< Recovery phase 2 (recovering message content and restoring messages in the broker).
+ JS_INITIALIZING, ///< Initialization of a new journal, including preparing journal files.
+ JS_RUNNING, ///< Initialization or recovery complete, journal running and accepting content.
+ JS_STOPPING, ///< Journal stopping. No new requests for service accepted, waiting for internal state to become consistent.
+ JS_STOPPED ///< Journal stopped. No new requests for service accepted, store in a consistent state.
+} journalState_t;
+
+/**
+ * \brief State manager for the state of an AsyncJournal instance.
+ *
+ * The AsyncJournal has the following states and transitions:
+ * \dot
+ * digraph journalState_t {
+ * node [fontname=Helvetica, fontsize=8];
+ * JS_NONE [URL="\ref JS_NONE"]
+ * JS_RECOVERING_PHASE_1 [URL="\ref JS_RECOVERING_PHASE_1"]
+ * JS_RECOVERING_PHASE_2 [URL="\ref JS_RECOVERING_PHASE_2"]
+ * JS_INITIALIZING [URL="\ref JS_INITIALIZING"]
+ * JS_RUNNING [URL="\ref JS_RUNNING"]
+ * JS_STOPPING [URL="\ref JS_STOPPING"]
+ * JS_STOPPED [URL="\ref JS_STOPPED"]
+ *
+ * edge [fontname=Helvetica, fontsize=8]
+ * JS_NONE->JS_INITIALIZING [label="initialize()" URL="\ref qpid::jrnl2::JournalRunState::initialize()"]
+ * JS_INITIALIZING->JS_RUNNING [label="run()" URL="\ref qpid::jrnl2::JournalRunState::run()"]
+ * JS_NONE->JS_RECOVERING_PHASE_1 [label="recoverPhase1()" URL="\ref qpid::jrnl2::JournalRunState::recoverPhase1()"]
+ * JS_RECOVERING_PHASE_1->JS_RECOVERING_PHASE_2 [label="recoverPhase2()" URL="\ref qpid::jrnl2::JournalRunState::recoverPhase2()"]
+ * JS_RECOVERING_PHASE_2->JS_RUNNING [label="run()" URL="\ref qpid::jrnl2::JournalRunState::run()"]
+ * JS_RUNNING->JS_STOPPING [label="stop()" URL="\ref qpid::jrnl2::JournalRunState::stop()"]
+ * JS_STOPPING->JS_STOPPED [label="stopped()" URL="\ref qpid::jrnl2::JournalRunState::stopped()"]
+ * }
+ * \enddot
+ */
+class JournalRunState : public State<journalState_t>
+{
+public:
+ /**
+ * \brief Default constructor, setting internal state to JS_NONE.
+ */
+ JournalRunState();
+
+ /**
+ * \brief Constructor allowing the internal state to be preset to any valid value of journalState_t.
+ *
+ * \param s State value to which the internal state of this new instance should be initialized.
+ */
+ JournalRunState(const journalState_t s);
+
+ /**
+ * \brief Copy constructor.
+ * \param s Instance from which this new instance state should be copied.
+ */
+ JournalRunState(const JournalRunState& s);
+
+ /**
+ * \brief Virtual destructor
+ */
+ virtual ~JournalRunState();
+
+ // State change functions
+
+ /**
+ * \brief Changes the journal state from JS_NONE to JS_INITIALIZING.
+ *
+ * \throws JournalException with JERR_ILLEGALJRNLSTATECHANGE if current state makes it illegal to change
+ * to JS_INITIALIZING according to the state machine semantics.
+ */
+ void initialize();
+
+ /**
+ * \brief Changes the journal state from JS_NONE to JS_RECOVERING_PHASE_1.
+ * \throws JournalException with JERR_ILLEGALJRNLSTATECHANGE if current state makes it illegal to change
+ * to JS_RECOVERING_PHASE_1 according to the state machine semantics.
+ */
+ void recoverPhase1();
+
+ /**
+ * \brief Changes the journal state from JS_RECOVERING_PHASE_1 to JS_RECOVERING_PHASE_2.
+ *
+ * \throws JournalException with JERR_ILLEGALJRNLSTATECHANGE if current state makes it illegal to change
+ * to JS_RECOVERING_PHASE_2 according to the state machine semantics.
+ */
+ void recoverPhase2();
+
+ /**
+ * \brief Changes the journal state from JS_INITIALIZING or JS_RECOVERING_PHASE_2 to JS_RUNNING.
+ *
+ * \throws JournalException with JERR_ILLEGALJRNLSTATECHANGE if current state makes it illegal to change
+ * to JS_RUNNING according to the state machine semantics.
+ */
+ void run();
+
+ /**
+ * \brief Changes the journal state from JS_RUNNING to JS_STOPPING.
+ *
+ * \throws JournalException with JERR_ILLEGALJRNLSTATECHANGE if current state makes it illegal to change
+ * to JS_STOPPING according to the state machine semantics.
+ */
+ void stop();
+
+ /**
+ * \brief Changes the journal state from JS_STOPPING to JS_STOPPED.
+ *
+ * \throws JournalException with JERR_ILLEGALJRNLSTATECHANGE if current state makes it illegal to change
+ * to JS_STOPPED according to the state machine semantics.
+ */
+ void stopped();
+
+ /**
+ * \brief Get the current state in a string form.
+ *
+ * \returns String representation of internal state m_state as returned by static function s_toStr().
+ */
+ const char* getAsStr() const;
+
+ /**
+ * \brief Static helper function to convert a numeric journalState_t type to a string representation.
+ */
+ static const char* s_toStr(const journalState_t s);
+
+protected:
+ /**
+ * \brief Set (or change) the value of m_state. This function implements the state machine checks for
+ * legal state change transitions, and throw an exception if an illegal state transition is requested.
+ *
+ * \param s State to which this machine should be changed.
+ * \throws JournalException with JERR_ILLEGALJRNLSTATECHANGE if current state makes it illegal to change
+ * to the requested state \a s.
+ */
+ void set(const journalState_t s);
+
+};
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+#endif // qpid_asyncStore_jrnl2_JournalRunState_h_
diff --git a/cpp/src/qpid/asyncStore/jrnl2/RecordHeader.cpp b/cpp/src/qpid/asyncStore/jrnl2/RecordHeader.cpp
new file mode 100644
index 0000000000..3317eb8438
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/RecordHeader.cpp
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file RecordHeader.cpp
+ */
+
+#include "RecordHeader.h"
+
+#include "Configuration.h"
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+RecordHeader::RecordHeader() :
+ m_magic(0),
+ m_version(0),
+ m_bigEndianFlag(0),
+ m_flags(0),
+ m_recordId(0)
+{}
+
+RecordHeader::RecordHeader(const uint32_t magic,
+ const uint8_t version,
+ const uint64_t recordId,
+ const bool overwriteIndicator) :
+ m_magic(magic),
+ m_version(version),
+ m_bigEndianFlag(Configuration::s_endianValue),
+ m_flags(overwriteIndicator ? HDR_OVERWRITE_INDICATOR_MASK : 0),
+ m_recordId(recordId)
+{}
+
+RecordHeader::RecordHeader(const RecordHeader& rh) :
+ m_magic(rh.m_magic),
+ m_version(rh.m_version),
+ m_bigEndianFlag(rh.m_bigEndianFlag),
+ m_flags(rh.m_flags),
+ m_recordId(rh.m_recordId)
+{}
+
+RecordHeader::~RecordHeader()
+{}
+
+void
+RecordHeader::copy(const RecordHeader& rh)
+{
+ m_magic = rh.m_magic;
+ m_version = rh.m_version;
+ m_bigEndianFlag = rh.m_bigEndianFlag;
+ m_flags = rh.m_flags;
+ m_recordId = rh.m_recordId;
+}
+
+void
+RecordHeader::reset()
+{
+ m_magic = 0;
+ m_version = 0;
+ m_bigEndianFlag = 0;
+ m_flags = 0;
+ m_recordId = 0;
+}
+
+bool
+RecordHeader::getOverwriteIndicator() const
+{
+ return m_flags & HDR_OVERWRITE_INDICATOR_MASK;
+}
+
+void
+RecordHeader::setOverwriteIndicator(const bool owi)
+{
+ m_flags = owi ?
+ m_flags | HDR_OVERWRITE_INDICATOR_MASK :
+ m_flags & (~HDR_OVERWRITE_INDICATOR_MASK);
+}
+
+//static
+uint64_t
+RecordHeader::getHeaderSize()
+{
+ return static_cast<uint64_t>(sizeof(RecordHeader));
+}
+
+uint32_t
+RecordHeader::getCheckSum(uint32_t initialValue) const
+{
+ uint32_t cs = initialValue;
+ for (unsigned char* p = (unsigned char*)this;
+ p < (unsigned char*)this + getHeaderSize() + getBodySize();
+ p++) {
+ cs ^= (uint32_t)(*p);
+ bool carry = cs & uint32_t(0x80000000);
+ cs <<= 1;
+ if (carry) cs++;
+ }
+ return cs;
+}
+
+}}} // namespace qpid::asyncStore::jrnl2
diff --git a/cpp/src/qpid/asyncStore/jrnl2/RecordHeader.h b/cpp/src/qpid/asyncStore/jrnl2/RecordHeader.h
new file mode 100644
index 0000000000..9a350b5b84
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/RecordHeader.h
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file RecordHeader.h
+ *
+ * List of journal record structs:
+ * struct DequeueHeader
+ * struct EnqueueHeader
+ * struct EventHeader
+ * struct FileHeader
+ * struct RecordHeader <-- This file
+ * struct RecordTail
+ * struct TransactionHeader
+ *
+ * Overview of journal record structs:
+ *
+ * <pre>
+ * +------------+ +--------------+
+ * | RecordTail | | RecordHeader |
+ * +------------+ | (abstract) |
+ * +--------------+
+ * ^
+ * |
+ * +----------------+---------+-------+-------------------+
+ * | | | |
+ * +------------+ +-------------+ +---------------+ +-------------------+
+ * | FileHeader | | EventHeader | | DequeueHeader | | TransactionHeader |
+ * +------------+ +-------------+ +---------------+ +-------------------+
+ * ^
+ * |
+ * +---------------+
+ * | EnqueueHeader |
+ * +---------------+
+ * </pre>
+ */
+
+#ifndef qpid_asyncStore_jrnl2_RecordHeader_h_
+#define qpid_asyncStore_jrnl2_RecordHeader_h_
+
+#include <stdint.h> // uint8_t, uint16_t, uint32_t, uint64_t
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+#pragma pack(1)
+
+/**
+ * \brief Struct for data common to the head of all journal files and records.
+ *
+ * This block of data includes identification for the file type, the encoding
+ * version, an endian indicator and a record ID.
+ *
+ * File layout in binary format (16 bytes):
+ * <pre>
+ * 0x0 0x7
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * 0x00 | m_magic | v | e | m_flags |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * 0x08 | m_recordId |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * </pre>
+ * <table>
+ * <tr>
+ * <td>v</td>
+ * <td>file version [ <code>_version</code> ] (If the format or encoding of
+ * this file changes, then this number should be incremented)</td>
+ * </tr>
+ * <tr>
+ * <td>e</td>
+ * <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for
+ * little endian, <b>true</b> (0x01) for big endian</td>
+ * </tr>
+ * </table>
+ */
+class RecordHeader
+{
+public:
+ uint32_t m_magic; ///< File type identifier (magic number)
+ uint8_t m_version; ///< File encoding version
+ uint8_t m_bigEndianFlag; ///< Flag for determining endianness
+ uint16_t m_flags; ///< User and system flags
+ uint64_t m_recordId; ///< Record ID (rotating 64-bit counter)
+
+ /**
+ * \brief Mask for the record header flags field m_flags which is used to indicate that this record is part
+ * of a cycle opposite to the previous cycle.
+ *
+ * In order to identify which files and records were written last, the journal uses an overwrite indicator
+ * (OWI) flag in the headers of all records. This flag changes between \b true and \b false on consecutive runs
+ * through a circular buffer composed of several files. Looking for the point in the journal where the value of
+ * the OWI changes from that in the file header of the first journal file marks the last write point in the
+ * journal.
+ *
+ * This mask is used to isolate the OWI in the m_flags field of the record and file headers.
+ */
+ static const uint16_t HDR_OVERWRITE_INDICATOR_MASK = 0x1;
+
+ /**
+ * \brief Default constructor, which sets all values to 0.
+ */
+ RecordHeader();
+
+ /**
+ * \brief Convenience constructor which initializes values during construction.
+ *
+ * \param magic Magic for this record
+ * \param version Version of this record
+ * \param recordId Record identifier for this record
+ * \param overwriteIndicator Overwrite indicator for this record
+ */
+ RecordHeader(const uint32_t magic,
+ const uint8_t version,
+ const uint64_t recordId,
+ const bool overwriteIndicator);
+
+ /**
+ * \brief Copy constructor.
+ */
+ RecordHeader(const RecordHeader& rh);
+
+ /**
+ * \brief Virtual destructor
+ */
+ virtual ~RecordHeader();
+
+ /**
+ * \brief Convenience copy method.
+ */
+ void copy(const RecordHeader& rh);
+
+ /**
+ * \brief Resets all fields to default values (mostly 0).
+ */
+ virtual void reset();
+
+ /**
+ * \brief Return the value of the Overwrite Indicator for this record.
+ *
+ * \returns true if the Overwrite Indicator flag is set, false otherwise.
+ */
+ bool getOverwriteIndicator() const;
+
+ /**
+ * \brief Set the value of the Overwrite Indicator for this record
+ */
+ void setOverwriteIndicator(const bool owi);
+
+ /**
+ * \brief Return the header size of this record in bytes. Must be implemented by
+ * subclasses.
+ *
+ * \returns Size of record header in bytes.
+ */
+ static uint64_t getHeaderSize();
+
+ /**
+ * \brief Return the body (data) size of this record in bytes. Must be implemented
+ * by subclasses.
+ *
+ * \returns Size of record body in bytes.
+ */
+ virtual uint64_t getBodySize() const = 0;
+
+ /**
+ * \brief Return total size of this record in bytes, being the sum of the header,
+ * xid (if present), data (if present) and tail (if present). Must be implemented
+ * by subclasses.
+ *
+ * \returns The size of the entire record, including header, body (xid and data,
+ * if present) and record tail (if persent) in bytes.
+ */
+ virtual uint64_t getRecordSize() const = 0;
+
+ /// \todo TODO - Is this the right place for encode/decode fns?
+ ///**
+ // * \brief Encode (write) this record instance into the buffer pointed to by the buffer
+ // * pointer. Must be implemented by subclasses.
+ // */
+ //virtual uint64_t encode(char* bufferPtr,
+ // const uint64_t bufferSize,
+ // const uint64_t encodeOffset = 0) = 0;
+
+ /**
+ * \brief Return a uint32_t checksum for the header and body content of this record.
+ *
+ * \param initialValue The initial (or seed) value of the checksum.
+ *
+ * \returns Checksum for header and body of record. Tail (if any) is excluded.
+ */
+ uint32_t getCheckSum(uint32_t initialValue = 0) const;
+
+};
+
+#pragma pack()
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+#endif // qpid_asyncStore_jrnl2_RecordHeader_h_
diff --git a/cpp/src/qpid/asyncStore/jrnl2/RecordIdCounter.h b/cpp/src/qpid/asyncStore/jrnl2/RecordIdCounter.h
new file mode 100644
index 0000000000..5fe2c01429
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/RecordIdCounter.h
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file RecordIdCounter.h
+ */
+
+#ifndef qpid_asyncStore_jrnl2_RecordCounter_h_
+#define qpid_asyncStore_jrnl2_RecordCounter_h_
+
+#include "AtomicCounter.h"
+
+#include <stdint.h> // uint64_t
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+typedef uint64_t recordId_t; ///< Integral type used to represent the Record Id (RID).
+typedef AtomicCounter<recordId_t> RecordIdCounter_t; ///< Counter to increment record ids
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+#endif // qpid_asyncStore_jrnl2_RecordCounter_h_
diff --git a/cpp/src/qpid/asyncStore/jrnl2/RecordTail.cpp b/cpp/src/qpid/asyncStore/jrnl2/RecordTail.cpp
new file mode 100644
index 0000000000..fe33d47b13
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/RecordTail.cpp
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file RecordTail.cpp
+ */
+
+
+#include "RecordTail.h"
+
+#include "RecordHeader.h"
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+RecordTail::RecordTail() :
+ m_xMagic(0xffffffff),
+ m_checkSum(0),
+ m_recordId(0)
+{}
+
+RecordTail::RecordTail(const uint32_t xMagic,
+ const uint32_t checkSum,
+ const uint64_t recordId) :
+ m_xMagic(xMagic),
+ m_checkSum(checkSum),
+ m_recordId(recordId)
+{}
+
+RecordTail::RecordTail(const RecordHeader& rh) :
+ m_xMagic(~rh.m_magic),
+ m_checkSum(rh.getCheckSum()),
+ m_recordId(rh.m_recordId)
+{}
+
+RecordTail::RecordTail(const RecordTail& rt) :
+ m_xMagic(rt.m_xMagic),
+ m_checkSum(rt.m_checkSum),
+ m_recordId(rt.m_recordId)
+{}
+
+void
+RecordTail::copy(const RecordTail& rt)
+{
+ m_xMagic = rt.m_xMagic;
+ m_checkSum = rt.m_checkSum;
+ m_recordId = rt.m_recordId;
+}
+
+void
+RecordTail::reset()
+{
+ m_xMagic = 0xffffffff;
+ m_checkSum = 0;
+ m_recordId = 0;
+}
+
+//static
+uint64_t
+RecordTail::getSize()
+{
+ return sizeof(RecordTail);
+}
+
+}}} // namespace qpid::asyncStore::jrnl2
diff --git a/cpp/src/qpid/asyncStore/jrnl2/RecordTail.h b/cpp/src/qpid/asyncStore/jrnl2/RecordTail.h
new file mode 100644
index 0000000000..a5b8afc576
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/RecordTail.h
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file RecordTail.h
+ *
+ * List of journal record structs:
+ * struct DequeueHeader
+ * struct EnqueueHeader
+ * struct EventHeader
+ * struct FileHeader
+ * struct RecordHeader
+ * struct RecordTail <-- This file
+ * struct TransactionHeader
+ *
+ * Overview of journal record structs:
+ *
+ * <pre>
+ * +------------+ +--------------+
+ * | RecordTail | | RecordHeader |
+ * +------------+ | (abstract) |
+ * +--------------+
+ * ^
+ * |
+ * +----------------+---------+-------+-------------------+
+ * | | | |
+ * +------------+ +-------------+ +---------------+ +-------------------+
+ * | FileHeader | | EventHeader | | DequeueHeader | | TransactionHeader |
+ * +------------+ +-------------+ +---------------+ +-------------------+
+ * ^
+ * |
+ * +---------------+
+ * | EnqueueHeader |
+ * +---------------+
+ * </pre>
+ */
+
+#ifndef qpid_asyncStore_jrnl2_RecordTail_h_
+#define qpid_asyncStore_jrnl2_RecordTail_h_
+
+#include <stdint.h> // uint32_t, uint64_t
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+class RecordHeader;
+
+#pragma pack(1)
+
+/**
+ * \brief Struct for the tail of all records which contain data and which must follow
+ * the data portion of that record.
+ *
+ * The magic number used here is the binary inverse (1's complement) of the magic
+ * used in the record header; this minimizes possible confusion with other headers
+ * that may be present during recovery. The tail is used with all records that have
+ * either XIDs or data - i.e. any size-variable content. Currently the only records
+ * that do NOT use the tail are non-transactional dequeues and filler records.
+ *
+ * Record layout in binary format (16 bytes):
+ * <pre>
+ * 0x0 0x7
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * 0x00 | m_xMagic | m_checkSum |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * 0x08 | m_recordId |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * </pre>
+ */
+class RecordTail
+{
+public:
+ uint32_t m_xMagic; ///< Binary inverse (1's complement) of hdr magic number
+ uint32_t m_checkSum; ///< Checksum for header and body of record
+ uint64_t m_recordId; ///< Record identifier matching that of the header for this record
+
+ /**
+ * \brief Default constructor, which sets most values to 0.
+ */
+ RecordTail();
+
+ /**
+ * \brief Convenience constructor which initializes values during construction.
+ *
+ * \param xMagic The inverse of the record header magic (ie ~rh._magic).
+ * \param checkSum The checksum for this record header and body.
+ * \param recordId The record identifier matching the record header.
+ */
+ RecordTail(const uint32_t xMagic,
+ const uint32_t checkSum,
+ const uint64_t recordId);
+
+ /**
+ * \brief Convenience constructor which initializes values during construction from existing RecordHeader
+ * instance.
+ *
+ * \param rh Header instance for which the RecordTail is to be created.
+ */
+ RecordTail(const RecordHeader& rh);
+
+ /**
+ * \brief Copy constructor.
+ *
+ * \param rt Instance to be copied.
+ */
+ RecordTail(const RecordTail& rt);
+
+ /**
+ * \brief Convenience copy method.
+ *
+ * \param rt Instance to be copied.
+ */
+ void copy(const RecordTail& rt);
+
+ /**
+ * \brief Resets all fields to default values (mostly 0).
+ */
+ void reset();
+
+ /**
+ * \brief Returns the size of the header in bytes.
+ */
+ static uint64_t getSize();
+
+};
+
+#pragma pack()
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+#endif // qpid_asyncStore_jrnl2_RecordTail_h_
diff --git a/cpp/src/qpid/asyncStore/jrnl2/ScopedLock.cpp b/cpp/src/qpid/asyncStore/jrnl2/ScopedLock.cpp
new file mode 100644
index 0000000000..4acba534ef
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/ScopedLock.cpp
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file ScopedLock.cpp
+ */
+
+#include "ScopedLock.h"
+
+#include "JournalError.h"
+
+#include <cerrno> // EBUSY
+#include <cstring> // std::strerror
+#include <sstream> // std::ostringstream
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+// --- ScopedMutex ---
+
+ScopedMutex::ScopedMutex()
+{
+ PTHREAD_CHK(::pthread_mutex_init(&m_mutex, 0), "::pthread_mutex_init", "ScopedMutex", "ScopedMutex");
+}
+
+ScopedMutex::~ScopedMutex()
+{
+ PTHREAD_CHK(::pthread_mutex_destroy(&m_mutex), "::pthread_mutex_destroy", "ScopedMutex", "~ScopedMutex");
+}
+
+::pthread_mutex_t*
+ScopedMutex::get() const
+{
+ return &m_mutex;
+}
+
+
+// --- ScopedMutexContainer ---
+
+ScopedMutexContainer::ScopedMutexContainer(const ScopedMutex& sm) :
+ m_scopedMutexRef(sm)
+{}
+
+::pthread_mutex_t* ScopedMutexContainer::get() const
+{
+ return m_scopedMutexRef.get();
+}
+
+
+// --- ScopedLock ---
+
+ScopedLock::ScopedLock(const ScopedMutex& sm) :
+ ScopedMutexContainer(sm)
+{
+ PTHREAD_CHK(::pthread_mutex_lock(m_scopedMutexRef.get()), "::pthread_mutex_lock", "ScopedLock", "ScopedLock");
+}
+
+ScopedLock::~ScopedLock()
+{
+ PTHREAD_CHK(::pthread_mutex_unlock(m_scopedMutexRef.get()), "::pthread_mutex_unlock", "ScopedLock", "~ScopedLock");
+}
+
+
+// --- ScopedTryLock ---
+
+ScopedTryLock::ScopedTryLock(const ScopedMutex& sm) :
+ ScopedMutexContainer(sm),
+ m_lockedFlag(false)
+{
+ int ret = ::pthread_mutex_trylock(m_scopedMutexRef.get());
+ m_lockedFlag = (ret == 0); // check if lock obtained
+ if (!m_lockedFlag && ret != EBUSY) {
+ PTHREAD_CHK(ret, "::pthread_mutex_trylock", "ScopedTryLock", "ScopedTryLock");
+ }
+}
+
+ScopedTryLock::~ScopedTryLock()
+{
+ if (m_lockedFlag)
+ PTHREAD_CHK(::pthread_mutex_unlock(m_scopedMutexRef.get()), "::pthread_mutex_unlock", "ScopedTryLock",
+ "~ScopedTryLock");
+}
+
+bool
+ScopedTryLock::isLocked() const
+{
+ return m_lockedFlag;
+}
+
+
+// --- ScopedConditionVariable ---
+/*
+
+ScopedConditionVariable::ScopedConditionVariable()
+{
+ PTHREAD_CHK(::pthread_cond_init(&m_cv, 0), "pthread_cond_init", "ScopedConditionVariable",
+ "ScopedConditionVariable");
+}
+
+ScopedConditionVariable::~ScopedConditionVariable()
+{
+ PTHREAD_CHK(::pthread_cond_destroy(&m_cv), "pthread_cond_destroy", "ScopedConditionVariable",
+ "~ScopedConditionVariable");
+}
+
+void
+ScopedConditionVariable::wait(ScopedLock& sl)
+{
+ PTHREAD_CHK(::pthread_cond_wait(&m_cv, sl.get()), "pthread_cond_wait", "ScopedConditionVariable", "wait");
+}
+
+void
+ScopedConditionVariable::notify_one()
+{
+ PTHREAD_CHK(::pthread_cond_signal(&m_cv), "pthread_cond_signal", "ScopedConditionVariable", "notify_one");
+}
+
+void
+ScopedConditionVariable::notify_all()
+{
+ PTHREAD_CHK(::pthread_cond_broadcast(&m_cv), "pthread_cond_broadcast", "ScopedConditionVariable",
+ "notify_all");
+}
+*/
+
+}}} // namespace qpid::asyncStore::jrnl2
diff --git a/cpp/src/qpid/asyncStore/jrnl2/ScopedLock.h b/cpp/src/qpid/asyncStore/jrnl2/ScopedLock.h
new file mode 100644
index 0000000000..a420850c0b
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/ScopedLock.h
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file ScopedLock.h
+ */
+
+#ifndef qpid_asyncStore_jrnl2_ScopedLock_h_
+#define qpid_asyncStore_jrnl2_ScopedLock_h_
+
+#include <pthread.h>
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+class ScopedMutex
+{
+public:
+ ScopedMutex();
+ virtual ~ScopedMutex();
+ ::pthread_mutex_t* get() const;
+protected:
+ mutable ::pthread_mutex_t m_mutex;
+};
+
+
+
+class ScopedMutexContainer
+{
+public:
+ ScopedMutexContainer(const ScopedMutex& sm);
+ ::pthread_mutex_t* get() const;
+protected:
+ const ScopedMutex& m_scopedMutexRef; ///< Reference to ScopedMutex instance being locked.
+};
+
+
+
+class ScopedLock : public ScopedMutexContainer
+{
+public:
+ ScopedLock(const ScopedMutex& sm);
+ virtual ~ScopedLock();
+};
+
+
+
+class ScopedTryLock : public ScopedMutexContainer
+{
+public:
+ ScopedTryLock(const ScopedMutex& sm);
+ virtual ~ScopedTryLock();
+ bool isLocked() const;
+protected:
+ bool m_lockedFlag;
+};
+
+
+
+/*
+class ScopedConditionVariable
+{
+public:
+ ScopedConditionVariable();
+ ~ScopedConditionVariable();
+ void wait(ScopedLock& sl);
+ void notify_one();
+ void notify_all();
+protected:
+ mutable ::pthread_cond_t m_cv;
+};
+*/
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+#endif // qpid_asyncStore_jrnl2_ScopedLock_h_
diff --git a/cpp/src/qpid/asyncStore/jrnl2/State.h b/cpp/src/qpid/asyncStore/jrnl2/State.h
new file mode 100644
index 0000000000..3a4354760a
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/State.h
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file State.h
+ */
+
+#ifndef qpid_asyncStore_jrnl2_State_h_
+#define qpid_asyncStore_jrnl2_State_h_
+
+#include "Streamable.h"
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+/**
+ * \brief Utility class to manage a simple state machine defined by an enumeration \a E.
+ *
+ * It is necessary that enumeration E has its default / initial value coded as value 0. This enumeration would
+ * be typically coded as follows:
+ * \code
+ * typedef enum {STATE1=0, STATE2, STATE3} myStates_t;
+ * class myStateMachine : State<myStates_t> {
+ * protected:
+ * void set(myStates_t s) { ... } // Implement state machine logic and transition checking here
+ * public:
+ * // State transition verbs
+ * void action1() { set(STATE2); }
+ * void action2() { set(STATE3); }
+ * };
+ * \endcode
+ *
+ * For child classes, the state machine transition logic is implemented in set(const E), and the state change
+ * verbs are implemented by calling set() with values from E. A call to reset() will return the state to the
+ * state numerically equal to 0, irrespective of the current state or state transition logic.
+ *
+ * Function getAsStr() returns the current state as a string, and is usually implemented by calling a local static
+ * function which translates the values of E into c-string literals.
+ */
+template <class E> class State : public Streamable {
+public:
+ /**
+ * \brief Default constructor, which sets the internal state m_state to the state in E which is numerically
+ * equal to 0.
+ */
+ State<E>() :
+ Streamable(),
+ m_state(E(0))
+ {}
+
+ /**
+ * \brief Constructor which sets the internal state m_state to the value \a s.
+ */
+ State<E>(const E s) :
+ Streamable(),
+ m_state(s)
+ {}
+
+ /**
+ * \brief Copy constructor, which sets the internal state m_state to the same value as that of the copied
+ * instance \a s.
+ */
+ State<E>(const State<E>& s) :
+ Streamable(),
+ m_state(s.m_state)
+ {}
+
+ /**
+ * \brief Virtual destructor
+ */
+ virtual ~State()
+ {}
+
+ /**
+ * \brief Get the internal state m_state as an enumeration of type E.
+ *
+ * \returns The internal state m_state.
+ */
+ virtual E get() const
+ {
+ return m_state;
+ }
+
+ /**
+ * \brief Get the internal state m_state as a c-string.
+ *
+ * \returns Internal state m_state as a c-string.
+ */
+ virtual const char* getAsStr() const = 0;
+
+ /**
+ * \brief Reset the internal state m_state to the value of \a s or, if not supplied, to the state in E which is
+ * numerically equal to 0.
+ *
+ * \param s The state to which this instance should be reset. The default value is the member of E with a
+ * numerical value of 0.
+ *
+ * \note This call ignores the state transition logic in set() and forces the state to 0. This is not intended
+ * as a normal part of the state machine operation, but rather for reusing a state machine instance or in
+ * conditions where you need to force a change over the state machine logic. For cases where the normal state
+ * machine logic returns the state to its initial point, a verb should be created which uses set() and is
+ * subjected to the transition logic and checking of set().
+ */
+ virtual void reset(const E s = E(0))
+ {
+ m_state = s;
+ }
+
+ /**
+ * \brief Stream the string representation of the internal state m_state.
+ *
+ * \param os Stream to which the string representation of the internal state m_state will be sent.
+ */
+ virtual void toStream(std::ostream& os = std::cout) const
+ {
+ os << getAsStr();
+ }
+
+protected:
+ E m_state; ///< Local state of this state machine instance.
+
+ /**
+ * \brief Set (or change) the value of m_state. This function must implement the state machine checks for
+ * legal state change transitions, and throw an exception if an illegal state transition is requested.
+ *
+ * \param s State to which this machine should be changed.
+ */
+ virtual void set(const E s) = 0;
+
+ /**
+ * \brief Compare the state \a s with the current state.
+ *
+ * \returns \b true if \a s is the same as internal state m_state, \b false otherwise.
+ */
+ virtual bool is(const E s) const
+ {
+ return m_state == s;
+ }
+
+};
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+#endif // qpid_asyncStore_jrnl2_State_h_
diff --git a/cpp/src/qpid/asyncStore/jrnl2/Streamable.cpp b/cpp/src/qpid/asyncStore/jrnl2/Streamable.cpp
new file mode 100644
index 0000000000..0ee86223b1
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/Streamable.cpp
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file Streamable.cpp
+ */
+
+#include "Streamable.h"
+
+#include <sstream>
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+Streamable::~Streamable()
+{}
+
+std::string
+Streamable::toString() const
+{
+ std::ostringstream oss;
+ toStream(oss);
+ return oss.str();
+}
+
+std::ostream&
+operator<<(std::ostream& os, const Streamable& s)
+{
+ s.toStream(os);
+ return os;
+}
+
+std::ostream&
+operator<<(std::ostream& os, const Streamable* sPtr)
+{
+ sPtr->toStream(os);
+ return os;
+}
+
+}}} // namespace qpid::asyncStore::jrnl2
diff --git a/cpp/src/qpid/asyncStore/jrnl2/Streamable.h b/cpp/src/qpid/asyncStore/jrnl2/Streamable.h
new file mode 100644
index 0000000000..45a15efda5
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/Streamable.h
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file Streamable.h
+ */
+
+#ifndef qpid_asyncStore_jrnl2_Streamable_h_
+#define qpid_asyncStore_jrnl2_Streamable_h_
+
+#include <iostream>
+#include <string>
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+/**
+ * \brief Abstract class which provides the mechanisms to stream
+ *
+ * An abstract class which provides stream functions. The toStream() function must be implemented by subclasses,
+ * and is used by the remaining functions. For convenience, toString() returns a std::string object.
+ */
+class Streamable
+{
+public:
+ /**
+ * \brief Virtual destructor
+ */
+ virtual ~Streamable();
+
+ /**
+ * \brief Stream some representation of the object to an output stream.
+ *
+ * \param os Output stream to which the class data is to be streamed.
+ */
+ virtual void toStream(std::ostream& os = std::cout) const = 0;
+
+ /**
+ * \brief Creates a string representation of the test parameters.
+ *
+ * Convenience feature which creates and returns a std::string object containing the content of toStream().
+ *
+ * \returns Content of toStream()
+ */
+ std::string toString() const;
+
+ /**
+ * \brief Stream the object to an output stream
+ *
+ * \param os Stream to which output is to be sent.
+ * \param s Object instance to be streamed.
+ * \returns Original stream passed into parameter \a os.
+ */
+ friend std::ostream& operator<<(std::ostream& os, const Streamable& s);
+
+ /**
+ * \brief Stream the object to an output stream through an object pointer
+ *
+ * \param os Stream to which output is to be sent.
+ * \param sPtr Pointer to object instance to be streamed.
+ * \returns Original stream passed into parameter \a os.
+ */
+ friend std::ostream& operator<<(std::ostream& os, const Streamable* sPtr);
+
+};
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+#endif // qpid_asyncStore_jrnl2_Streamable_h_
diff --git a/cpp/src/qpid/asyncStore/jrnl2/TransactionHeader.cpp b/cpp/src/qpid/asyncStore/jrnl2/TransactionHeader.cpp
new file mode 100644
index 0000000000..9de9f2656c
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/TransactionHeader.cpp
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TransactionHeader.cpp
+ */
+
+#include "RecordTail.h"
+#include "TransactionHeader.h"
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+TransactionHeader::TransactionHeader() :
+ RecordHeader(),
+ m_xidSize(0)
+{}
+
+TransactionHeader::TransactionHeader(const uint32_t magic,
+ const uint8_t version,
+ const uint64_t recordId,
+ const uint64_t xidSize,
+ const bool overwriteIndicator) :
+ RecordHeader(magic, version, recordId, overwriteIndicator),
+ m_xidSize(xidSize)
+{}
+
+TransactionHeader::TransactionHeader(const TransactionHeader& th) :
+ RecordHeader(th),
+ m_xidSize(th.m_xidSize)
+{}
+
+TransactionHeader::~TransactionHeader()
+{}
+
+void
+TransactionHeader::copy(const TransactionHeader& th)
+{
+ RecordHeader::copy(th);
+ m_xidSize = th.m_xidSize;
+}
+
+void
+TransactionHeader::reset()
+{
+ RecordHeader::reset();
+ m_xidSize = 0;
+}
+
+//static
+uint64_t
+TransactionHeader::getHeaderSize()
+{
+ return sizeof(TransactionHeader);
+}
+
+uint64_t
+TransactionHeader::getBodySize() const
+{
+ return m_xidSize;
+}
+
+uint64_t
+TransactionHeader::getRecordSize() const
+{
+ // By definition, TransactionRecords must always have an xid, hence a record
+ // tail as well. No check on body size required in this case.
+ return getHeaderSize() + getBodySize() + RecordTail::getSize();
+}
+
+}}} // namespace qpid::asyncStore::jrnl2
diff --git a/cpp/src/qpid/asyncStore/jrnl2/TransactionHeader.h b/cpp/src/qpid/asyncStore/jrnl2/TransactionHeader.h
new file mode 100644
index 0000000000..9bc0ee9e9b
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/jrnl2/TransactionHeader.h
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TransactionHeader.h
+ *
+ * List of journal record structs:
+ * struct DequeueHeader
+ * struct EnqueueHeader
+ * struct EventHeader
+ * struct FileHeader
+ * struct RecordHeader
+ * struct RecordTail
+ * struct TransactionHeader <-- This file
+ *
+ * Overview of journal record structs:
+ *
+ * <pre>
+ * +------------+ +--------------+
+ * | RecordTail | | RecordHeader |
+ * +------------+ | (abstract) |
+ * +--------------+
+ * ^
+ * |
+ * +----------------+---------+-------+-------------------+
+ * | | | |
+ * +------------+ +-------------+ +---------------+ +-------------------+
+ * | FileHeader | | EventHeader | | DequeueHeader | | TransactionHeader |
+ * +------------+ +-------------+ +---------------+ +-------------------+
+ * ^
+ * |
+ * +---------------+
+ * | EnqueueHeader |
+ * +---------------+
+ * </pre>
+ */
+
+#ifndef qpid_asyncStore_jrnl2_TransactionHeader_h_
+#define qpid_asyncStore_jrnl2_TransactionHeader_h_
+
+#include "RecordHeader.h"
+
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+
+#pragma pack(1)
+
+/**
+ * \brief Struct for transaction commit and abort records.
+ *
+ * Struct for DTX commit and abort records. Only the magic distinguishes between them. Since
+ * this record must be used in the context of a valid XID, the xidsize field must not be zero.
+ * Immediately following this record is the XID itself which is xidsize bytes long, followed by
+ * a rec_tail.
+ *
+ * Note that this record had its own rid distinct from the rids of the record(s) making up the
+ * transaction it is committing or aborting.
+ *
+ * Record layout in binary format (24 bytes):
+ * <pre>
+ * 0x0 0x7
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ -+
+ * 0x00 | _magic | v | e | _flags | |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ | struct RecordHeader
+ * 0x08 | _recordId | |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ -+
+ * 0x10 | _xidSize |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * </pre>
+ * <table>
+ * <tr>
+ * <td>v</td>
+ * <td>file version [ <code>_version</code> ] (If the format or encoding of
+ * this file changes, then this number should be incremented)</td>
+ * </tr>
+ * <tr>
+ * <td>e</td>
+ * <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for
+ * little endian, <b>true</b> (0x01) for big endian</td>
+ * </tr>
+ * </table>
+ */
+class TransactionHeader : public RecordHeader
+{
+public:
+ uint64_t m_xidSize; ///< XID size
+
+ /**
+ * \brief Default constructor, which sets all values to 0.
+ */
+ TransactionHeader();
+
+ /**
+ * \brief Convenience constructor which initializes values during construction.
+ *
+ * \param magic The magic for this record
+ * \param version Version of this record
+ * \param recordId Record identifier for this record
+ * \param xidSize Size of the transaction (or distributed transaction) ID for this record
+ * \param overwriteIndicator Flag indicating the present value of the overwrite indicator when writing this
+ * record
+ */
+ TransactionHeader(const uint32_t magic,
+ const uint8_t version,
+ const uint64_t recordId,
+ const uint64_t xidSize,
+ const bool overwriteIndicator);
+
+ /**
+ * \brief Copy constructor
+ *
+ * \param th Instance to be copied
+ */
+ TransactionHeader(const TransactionHeader& th);
+
+ /**
+ * \brief Virtual destructor
+ */
+ virtual ~TransactionHeader();
+
+ /**
+ * \brief Convenience copy method.
+ */
+ inline void copy(const TransactionHeader& th);
+
+ /**
+ * \brief Reset this record to default values (mostly 0)
+ */
+ inline void reset();
+
+ /**
+ * \brief Return the header size of this record in bytes.
+ *
+ * \returns Size of record header in bytes.
+ */
+ static uint64_t getHeaderSize();
+
+ /**
+ * \brief Return the body (data) size of this record in bytes.
+ *
+ * \returns Size of record body in bytes.
+ */
+ uint64_t getBodySize() const;
+
+ /**
+ * \brief Return total size of this record in bytes, being in the case of the dequeue record the size of the
+ * header, the size of the body (xid only) and the size of the tail.
+ */
+ inline uint64_t getRecordSize() const;
+
+};
+
+#pragma pack()
+
+}}} // namespace qpid::asyncStore::jrnl2
+
+#endif // qpid_asyncStore_jrnl2_TransactionHeader_h_