diff options
Diffstat (limited to 'cpp/src/qpid/asyncStore')
64 files changed, 7560 insertions, 0 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.cpp b/cpp/src/qpid/asyncStore/AsyncOperation.cpp new file mode 100644 index 0000000000..87d3713b96 --- /dev/null +++ b/cpp/src/qpid/asyncStore/AsyncOperation.cpp @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file AsyncOperation.cpp + */ + +#include "AsyncOperation.h" + +#include "qpid/Exception.h" +#include "qpid/broker/BrokerContext.h" // for delete in d'tor + +#include <sstream> + +namespace qpid { +namespace asyncStore { + +AsyncOperation::AsyncOperation() : + m_op(NONE), + m_targetHandle(), + m_dataSrc(0), + m_txnHandle(0), + m_resCb(0), + m_brokerCtxt(0) +{} + +AsyncOperation::AsyncOperation(const opCode op, + const qpid::broker::IdHandle* th, + const qpid::broker::ResultCallback resCb, + qpid::broker::BrokerContext* brokerCtxt) : + m_op(op), + m_targetHandle(th), + m_dataSrc(0), + m_txnHandle(0), + m_resCb(resCb), + m_brokerCtxt(brokerCtxt) +{} + +AsyncOperation::AsyncOperation(const opCode op, + const qpid::broker::IdHandle* th, + const qpid::broker::DataSource* dataSrc, + const qpid::broker::ResultCallback resCb, + qpid::broker::BrokerContext* brokerCtxt) : + m_op(op), + m_targetHandle(th), + m_dataSrc(dataSrc), + m_txnHandle(0), + m_resCb(resCb), + m_brokerCtxt(brokerCtxt) +{} + +AsyncOperation::AsyncOperation(const opCode op, + const qpid::broker::IdHandle* th, + const qpid::broker::TxnHandle* txnHandle, + const qpid::broker::ResultCallback resCb, + qpid::broker::BrokerContext* brokerCtxt) : + m_op(op), + m_targetHandle(th), + m_dataSrc(0), + m_txnHandle(txnHandle), + m_resCb(resCb), + m_brokerCtxt(brokerCtxt) +{} + +AsyncOperation::AsyncOperation(const opCode op, + const qpid::broker::IdHandle* th, + const qpid::broker::DataSource* dataSrc, + const qpid::broker::TxnHandle* txnHandle, + const qpid::broker::ResultCallback resCb, + qpid::broker::BrokerContext* brokerCtxt) : + m_op(op), + m_targetHandle(th), + m_dataSrc(dataSrc), + m_txnHandle(txnHandle), + m_resCb(resCb), + m_brokerCtxt(brokerCtxt) +{} + +AsyncOperation::~AsyncOperation() +{} + +const char* +AsyncOperation::getOpStr() const +{ + return getOpStr(m_op); +} + +//static +const char* +AsyncOperation::getOpStr(const opCode op) +{ + switch (op) { + case NONE: return "<none>"; + case TXN_PREPARE: return "TXN_PREPARE"; + case TXN_COMMIT: return "TXN_COMMIT"; + case TXN_ABORT: return "TXN_ABORT"; + case CONFIG_CREATE: return "CONFIG_CREATE"; + case CONFIG_DESTROY: return "CONFIG_DESTROY"; + case QUEUE_CREATE: return "QUEUE_CREATE"; + case QUEUE_FLUSH: return "QUEUE_FLUSH"; + case QUEUE_DESTROY: return "QUEUE_DESTROY"; + case EVENT_CREATE: return "EVENT_CREATE"; + case EVENT_DESTROY: return "EVENT_DESTROY"; + case MSG_ENQUEUE: return "MSG_ENQUEUE"; + case MSG_DEQUEUE: return "MSG_DEQUEUE"; + } + std::ostringstream oss; + oss << "AsyncStore: AsyncOperation::getOpStr(): Unknown op-code \"" << op << "\""; + throw qpid::Exception(oss.str()); +} + +}} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.h b/cpp/src/qpid/asyncStore/AsyncOperation.h new file mode 100644 index 0000000000..756b393b90 --- /dev/null +++ b/cpp/src/qpid/asyncStore/AsyncOperation.h @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file AsyncOperation.h + */ + +#ifndef qpid_asyncStore_AsyncOperation_h_ +#define qpid_asyncStore_AsyncOperation_h_ + +#include "qpid/broker/AsyncStore.h" +#include "qpid/broker/IdHandle.h" + +namespace qpid { +namespace asyncStore { + +class AsyncOperation { +public: + typedef enum {NONE=0, + TXN_PREPARE, + TXN_COMMIT, + TXN_ABORT, + CONFIG_CREATE, + CONFIG_DESTROY, + QUEUE_CREATE, + QUEUE_FLUSH, + QUEUE_DESTROY, + EVENT_CREATE, + EVENT_DESTROY, + MSG_ENQUEUE, + MSG_DEQUEUE + } opCode; + + AsyncOperation(); + AsyncOperation(const opCode op, + const qpid::broker::IdHandle* th, + const qpid::broker::ResultCallback resCb, + qpid::broker::BrokerContext* brokerCtxt); + AsyncOperation(const opCode op, + const qpid::broker::IdHandle* th, + const qpid::broker::DataSource* dataSrc, + const qpid::broker::ResultCallback resCb, + qpid::broker::BrokerContext* brokerCtxt); + AsyncOperation(const opCode op, + const qpid::broker::IdHandle* th, + const qpid::broker::TxnHandle* txnHandle, + const qpid::broker::ResultCallback resCb, + qpid::broker::BrokerContext* brokerCtxt); + AsyncOperation(const opCode op, + const qpid::broker::IdHandle* th, + const qpid::broker::DataSource* dataSrc, + const qpid::broker::TxnHandle* txnHandle, + const qpid::broker::ResultCallback resCb, + qpid::broker::BrokerContext* brokerCtxt); + virtual ~AsyncOperation(); + const char* getOpStr() const; + static const char* getOpStr(const opCode op); + + opCode m_op; + const qpid::broker::IdHandle* m_targetHandle; + const qpid::broker::DataSource* m_dataSrc; + const qpid::broker::TxnHandle* m_txnHandle; + const qpid::broker::ResultCallback m_resCb; + qpid::broker::BrokerContext* m_brokerCtxt; +}; + +}} // namespace qpid::asyncStore + +#endif // qpid_asyncStore_AsyncOperation_h_ diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp new file mode 100644 index 0000000000..3bdb2c3b98 --- /dev/null +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file AsyncStoreImpl.cpp + */ + +#include "AsyncStoreImpl.h" + +#include "AsyncOperation.h" + +#include "qpid/broker/ConfigHandle.h" +#include "qpid/broker/EnqueueHandle.h" +#include "qpid/broker/EventHandle.h" +#include "qpid/broker/MessageHandle.h" +#include "qpid/broker/QueueHandle.h" +#include "qpid/broker/TxnHandle.h" + +#include <boost/intrusive_ptr.hpp> + +namespace qpid { +namespace asyncStore { + +AsyncStoreImpl::AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller, + const AsyncStoreOptions& opts) : + m_poller(poller), + m_opts(opts), + m_runState(), + m_operations(m_poller) +{} + +AsyncStoreImpl::~AsyncStoreImpl() +{} + +void +AsyncStoreImpl::initialize() +{} + +uint64_t +AsyncStoreImpl::getNextRid() +{ + return m_ridCntr.next(); +} + +void +AsyncStoreImpl::initManagement(qpid::broker::Broker* /*broker*/) +{} + +qpid::broker::TxnHandle +AsyncStoreImpl::createTxnHandle(const std::string& xid) +{ + return qpid::broker::TxnHandle(new TxnHandleImpl(xid)); +} + +qpid::broker::ConfigHandle +AsyncStoreImpl::createConfigHandle() +{ + return qpid::broker::ConfigHandle(new ConfigHandleImpl()); +} + +qpid::broker::QueueHandle +AsyncStoreImpl::createQueueHandle(const std::string& name, + const qpid::types::Variant::Map& opts) +{ + return qpid::broker::QueueHandle(new QueueHandleImpl(name, opts)); +} + +qpid::broker::EventHandle +AsyncStoreImpl::createEventHandle(qpid::broker::QueueHandle& queueHandle, + const std::string& key) +{ + return qpid::broker::EventHandle(new EventHandleImpl(queueHandle, key)); +} + +qpid::broker::MessageHandle +AsyncStoreImpl::createMessageHandle(const qpid::broker::DataSource* dataSrc) + +{ + return qpid::broker::MessageHandle(new MessageHandleImpl(dataSrc)); +} + +qpid::broker::EnqueueHandle +AsyncStoreImpl::createEnqueueHandle(qpid::broker::MessageHandle& msgHandle, + qpid::broker::QueueHandle& queueHandle) +{ + return qpid::broker::EnqueueHandle(new EnqueueHandleImpl(msgHandle, queueHandle)); +} + +void +AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt) +{ + AsyncOperation* op = new AsyncOperation(AsyncOperation::TXN_PREPARE, + dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), + resultCb, + brokerCtxt); + m_operations.submit(op); +} + +void +AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt) +{ + AsyncOperation* op = new AsyncOperation(AsyncOperation::TXN_COMMIT, + dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), + resultCb, + brokerCtxt); + m_operations.submit(op); +} + +void +AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt) +{ + AsyncOperation* op = new AsyncOperation(AsyncOperation::TXN_ABORT, + dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), + resultCb, + brokerCtxt); + m_operations.submit(op); +} + +void +AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle, + const qpid::broker::DataSource* dataSrc, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt) +{ + AsyncOperation* op = new AsyncOperation(AsyncOperation::CONFIG_CREATE, + dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle), + dataSrc, + resultCb, + brokerCtxt); + m_operations.submit(op); +} + +void +AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt) +{ + AsyncOperation* op = new AsyncOperation(AsyncOperation::CONFIG_DESTROY, + dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle), + resultCb, + brokerCtxt); + m_operations.submit(op); +} + +void +AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle, + const qpid::broker::DataSource* dataSrc, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt) +{ + AsyncOperation* op = new AsyncOperation(AsyncOperation::QUEUE_CREATE, + dynamic_cast<qpid::broker::IdHandle*>(&queueHandle), + dataSrc, + resultCb, + brokerCtxt); + m_operations.submit(op); +} + +void +AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt) +{ + AsyncOperation* op = new AsyncOperation(AsyncOperation::QUEUE_DESTROY, + dynamic_cast<qpid::broker::IdHandle*>(&queueHandle), + resultCb, + brokerCtxt); + m_operations.submit(op); +} + +void +AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt) +{ + AsyncOperation* op = new AsyncOperation(AsyncOperation::QUEUE_FLUSH, + dynamic_cast<qpid::broker::IdHandle*>(&queueHandle), + resultCb, + brokerCtxt); + m_operations.submit(op); +} + +void +AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle, + const qpid::broker::DataSource* dataSrc, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt) +{ + AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_CREATE, + dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), + dataSrc, + resultCb, + brokerCtxt); + m_operations.submit(op); +} + +void +AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle, + const qpid::broker::DataSource* dataSrc, + qpid::broker::TxnHandle& txnHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt) +{ + AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_CREATE, + dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), + dataSrc, + &txnHandle, + resultCb, + brokerCtxt); + m_operations.submit(op); +} + +void +AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt) +{ + AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_DESTROY, + dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), + resultCb, + brokerCtxt); + m_operations.submit(op); +} + +void +AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, + qpid::broker::TxnHandle& txnHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt) +{ + AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_DESTROY, + dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), + &txnHandle, + resultCb, + brokerCtxt); + m_operations.submit(op); +} + +void +AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt) +{ + AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_ENQUEUE, + dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), + resultCb, + brokerCtxt); + m_operations.submit(op); +} + +void +AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, + qpid::broker::TxnHandle& txnHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt) +{ + AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_ENQUEUE, + dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), + &txnHandle, + resultCb, + brokerCtxt); + m_operations.submit(op); +} + +void +AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt) +{ + AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_DEQUEUE, + dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), + resultCb, + brokerCtxt); + m_operations.submit(op); +} + +void +AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle, + qpid::broker::TxnHandle& txnHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt) +{ + AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_DEQUEUE, + dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), + &txnHandle, + resultCb, + brokerCtxt); + m_operations.submit(op); +} + +int +AsyncStoreImpl::loadContent(qpid::broker::MessageHandle& /*msgHandle*/, + qpid::broker::QueueHandle& /*queueHandle*/, + char* /*data*/, + uint64_t /*offset*/, + const uint64_t /*length*/) +{ + return 0; +} + +}} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h new file mode 100644 index 0000000000..c33e9030fe --- /dev/null +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file AsyncStoreImpl.h + */ + +#ifndef qpid_asyncStore_AsyncStoreImpl_h_ +#define qpid_asyncStore_AsyncStoreImpl_h_ + +#include "AsyncStoreOptions.h" +#include "RunState.h" +#include "OperationQueue.h" + +#include "qpid/asyncStore/jrnl2/RecordIdCounter.h" +#include "qpid/broker/AsyncStore.h" +#include "qpid/sys/Poller.h" + +namespace qpid { + +namespace broker { +class Broker; +} // namespace qpid::broker + +namespace asyncStore { + +class AsyncStoreImpl: public qpid::broker::AsyncStore { +public: + AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller, + const AsyncStoreOptions& opts); + virtual ~AsyncStoreImpl(); + void initialize(); + uint64_t getNextRid(); + + // Management + + void initManagement(qpid::broker::Broker* broker); + + // AsyncStore interface + + qpid::broker::TxnHandle createTxnHandle(const std::string& xid=std::string()); + qpid::broker::ConfigHandle createConfigHandle(); + qpid::broker::QueueHandle createQueueHandle(const std::string& name, + const qpid::types::Variant::Map& opts); + qpid::broker::EventHandle createEventHandle(qpid::broker::QueueHandle& queueHandle, + const std::string& key=std::string()); + qpid::broker::MessageHandle createMessageHandle(const qpid::broker::DataSource* dataSrc); + qpid::broker::EnqueueHandle createEnqueueHandle(qpid::broker::MessageHandle& msgHandle, + qpid::broker::QueueHandle& queueHandle); + + void submitPrepare(qpid::broker::TxnHandle& txnHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt); + void submitCommit(qpid::broker::TxnHandle& txnHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt); + void submitAbort(qpid::broker::TxnHandle& txnHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt); + + void submitCreate(qpid::broker::ConfigHandle& cfgHandle, + const qpid::broker::DataSource* dataSrc, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt); + void submitDestroy(qpid::broker::ConfigHandle& cfgHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt); + + void submitCreate(qpid::broker::QueueHandle& queueHandle, + const qpid::broker::DataSource* dataSrc, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt); + void submitDestroy(qpid::broker::QueueHandle& queueHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt); + void submitFlush(qpid::broker::QueueHandle& queueHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt); + + void submitCreate(qpid::broker::EventHandle& eventHandle, + const qpid::broker::DataSource* dataSrc, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt); + void submitCreate(qpid::broker::EventHandle& eventHandle, + const qpid::broker::DataSource* dataSrc, + qpid::broker::TxnHandle& txnHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt); + void submitDestroy(qpid::broker::EventHandle& eventHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt); + void submitDestroy(qpid::broker::EventHandle& eventHandle, + qpid::broker::TxnHandle& txnHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt); + + void submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt); + void submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, + qpid::broker::TxnHandle& txnHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt); + void submitDequeue(qpid::broker::EnqueueHandle& enqHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt); + void submitDequeue(qpid::broker::EnqueueHandle& enqHandle, + qpid::broker::TxnHandle& txnHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerContext* brokerCtxt); + + // Legacy - Restore FTD message, is NOT async! + virtual int loadContent(qpid::broker::MessageHandle& msgHandle, + qpid::broker::QueueHandle& queueHandle, + char* data, + uint64_t offset, + const uint64_t length); + +protected: + boost::shared_ptr<qpid::sys::Poller> m_poller; + AsyncStoreOptions m_opts; + RunState m_runState; + OperationQueue m_operations; + qpid::asyncStore::jrnl2::RecordIdCounter_t m_ridCntr; +}; + +}} // namespace qpid::asyncStore + +#endif // qpid_asyncStore_AsyncStoreImpl_h_ diff --git a/cpp/src/qpid/asyncStore/AsyncStoreOptions.cpp b/cpp/src/qpid/asyncStore/AsyncStoreOptions.cpp new file mode 100644 index 0000000000..939d65ddf9 --- /dev/null +++ b/cpp/src/qpid/asyncStore/AsyncStoreOptions.cpp @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file AsyncStoreOptions.cpp + */ + +#include "AsyncStoreOptions.h" + +namespace qpid { +namespace asyncStore { + +AsyncStoreOptions::AsyncStoreOptions(const std::string& name): + qpid::Options(name), + m_storeDir(getDefaultStoreDir()) +{ + addOptions() + ("store-dir", qpid::optValue(m_storeDir, "DIR"), + "Store directory location for persistence (instead of using --data-dir value). " + "Required if --no-data-dir is also used.") + ; +} + +AsyncStoreOptions::AsyncStoreOptions(const std::string& storeDir, + const std::string& name) : + qpid::Options(name), + m_storeDir(storeDir) +{} + +AsyncStoreOptions::~AsyncStoreOptions() +{} + +void +AsyncStoreOptions::printVals(std::ostream& os) const +{ + os << "ASYNC STORE OPTIONS:" << std::endl; + os << " Store directory location for persistence [store-dir]: \"" << m_storeDir << "\"" << std::endl; +} + +void +AsyncStoreOptions::validate() +{} + +//static +std::string& AsyncStoreOptions::getDefaultStoreDir() +{ + static std::string s_defaultStoreDir = "/tmp"; + return s_defaultStoreDir; +} + +}} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/AsyncStoreOptions.h b/cpp/src/qpid/asyncStore/AsyncStoreOptions.h new file mode 100644 index 0000000000..bc1b6a2f9f --- /dev/null +++ b/cpp/src/qpid/asyncStore/AsyncStoreOptions.h @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file AsyncStoreOptions.h + */ + +#ifndef qpid_asyncStore_AsyncStoreOptions_h_ +#define qpid_asyncStore_AsyncStoreOptions_h_ + +#include "qpid/asyncStore/jrnl2/Streamable.h" + +#include "qpid/Options.h" + +#include <string> + +namespace qpid { +namespace broker { +class Options; +} +namespace asyncStore { + +class AsyncStoreOptions : public qpid::Options +{ +public: + AsyncStoreOptions(const std::string& name="Async Store Options"); + AsyncStoreOptions(const std::string& storeDir, + const std::string& name="Async Store Options"); + virtual ~AsyncStoreOptions(); + void printVals(std::ostream& os) const; + void validate(); + + std::string m_storeDir; + +protected: + // Static initialization race condition avoidance with static instance of Plugin class (using construct-on-first-use idiom). + static std::string& getDefaultStoreDir(); +}; + +}} // namespace qpid::asyncStore + +#endif // qpid_asyncStore_AsyncStoreOptions_h_ diff --git a/cpp/src/qpid/asyncStore/ConfigHandleImpl.cpp b/cpp/src/qpid/asyncStore/ConfigHandleImpl.cpp new file mode 100644 index 0000000000..64e2e848fa --- /dev/null +++ b/cpp/src/qpid/asyncStore/ConfigHandleImpl.cpp @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file ConfigHandleImpl.cpp + */ + +#include "ConfigHandleImpl.h" + +#include "qpid/messaging/PrivateImplRef.h" + +namespace qpid { +namespace asyncStore { + +ConfigHandleImpl::ConfigHandleImpl() +{} + +ConfigHandleImpl::~ConfigHandleImpl() +{} + +}} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/ConfigHandleImpl.h b/cpp/src/qpid/asyncStore/ConfigHandleImpl.h new file mode 100644 index 0000000000..17069ec21c --- /dev/null +++ b/cpp/src/qpid/asyncStore/ConfigHandleImpl.h @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file ConfigHandleImpl.h + */ + +#ifndef qpid_asyncStore_ConfigHandleImpl_h_ +#define qpid_asyncStore_ConfigHandleImpl_h_ + +#include "qpid/RefCounted.h" + +namespace qpid { +namespace asyncStore { + +class ConfigHandleImpl : public virtual qpid::RefCounted +{ +public: + ConfigHandleImpl(); + virtual ~ConfigHandleImpl(); +}; + +}} // namespace qpid::asyncStore + +#endif // qpid_asyncStore_ConfigHandleImpl_h_ diff --git a/cpp/src/qpid/asyncStore/EnqueueHandleImpl.cpp b/cpp/src/qpid/asyncStore/EnqueueHandleImpl.cpp new file mode 100644 index 0000000000..0e291def78 --- /dev/null +++ b/cpp/src/qpid/asyncStore/EnqueueHandleImpl.cpp @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file EnqueueHandleImpl.cpp + */ + +#include "EnqueueHandleImpl.h" + +#include "qpid/messaging/PrivateImplRef.h" + +namespace qpid { +namespace asyncStore { + +EnqueueHandleImpl::EnqueueHandleImpl(qpid::broker::MessageHandle& msgHandle, + qpid::broker::QueueHandle& queueHandle) : + m_msgHandle(msgHandle), + m_queueHandle(queueHandle) +{} + +EnqueueHandleImpl::~EnqueueHandleImpl() +{} + +}} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/EnqueueHandleImpl.h b/cpp/src/qpid/asyncStore/EnqueueHandleImpl.h new file mode 100644 index 0000000000..82f3e5c47e --- /dev/null +++ b/cpp/src/qpid/asyncStore/EnqueueHandleImpl.h @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file EnqueueHandleImpl.h + */ + +#ifndef qpid_asyncStore_EnqueueHandleImpl_h_ +#define qpid_asyncStore_EnqueueHandleImpl_h_ + +#include "qpid/RefCounted.h" + +namespace qpid { + +namespace broker { +class MessageHandle; +class QueueHandle; +} + +namespace asyncStore { + +class EnqueueHandleImpl : public virtual qpid::RefCounted +{ +public: + EnqueueHandleImpl(qpid::broker::MessageHandle& msgHandle, + qpid::broker::QueueHandle& queueHandle); + virtual ~EnqueueHandleImpl(); +protected: + qpid::broker::MessageHandle& m_msgHandle; + qpid::broker::QueueHandle& m_queueHandle; +}; + +}} // namespace qpid::asyncStore + +#endif // qpid_asyncStore_EnqueueHandleImpl_h_ diff --git a/cpp/src/qpid/asyncStore/EventHandleImpl.cpp b/cpp/src/qpid/asyncStore/EventHandleImpl.cpp new file mode 100644 index 0000000000..05b98d1e5d --- /dev/null +++ b/cpp/src/qpid/asyncStore/EventHandleImpl.cpp @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file EventHandleImpl.cpp + */ + +#include "EventHandleImpl.h" + +#include "qpid/messaging/PrivateImplRef.h" + +namespace qpid { +namespace asyncStore { + +EventHandleImpl::EventHandleImpl(qpid::broker::QueueHandle& queueHandle, + const std::string& key) : + m_queueHandle(queueHandle), + m_key(key) +{} + +EventHandleImpl::~EventHandleImpl() +{} + +const std::string& +EventHandleImpl::getKey() const +{ + return m_key; +} + +}} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/EventHandleImpl.h b/cpp/src/qpid/asyncStore/EventHandleImpl.h new file mode 100644 index 0000000000..9ed841b8c0 --- /dev/null +++ b/cpp/src/qpid/asyncStore/EventHandleImpl.h @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file EventHandleImpl.h + */ + +#ifndef qpid_asyncStore_EventHandleImpl_h_ +#define qpid_asyncStore_EventHandleImpl_h_ + +#include "qpid/RefCounted.h" + +namespace qpid { + +namespace broker { +class QueueHandle; +} + +namespace asyncStore { + +class EventHandleImpl : public virtual qpid::RefCounted +{ +public: + EventHandleImpl(qpid::broker::QueueHandle& queueHandle, + const std::string& key = std::string()); + virtual ~EventHandleImpl(); + const std::string& getKey() const; +protected: + qpid::broker::QueueHandle& m_queueHandle; + std::string m_key; +}; + +}} // namespace qpid::asyncStore + +#endif // qpid_asyncStore_EventHandleImpl_h_ diff --git a/cpp/src/qpid/asyncStore/MessageHandleImpl.cpp b/cpp/src/qpid/asyncStore/MessageHandleImpl.cpp new file mode 100644 index 0000000000..cea039221a --- /dev/null +++ b/cpp/src/qpid/asyncStore/MessageHandleImpl.cpp @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MessageHandleImpl.cpp + */ + +#include "MessageHandleImpl.h" + +#include "qpid/messaging/PrivateImplRef.h" + +namespace qpid { +namespace asyncStore { + +MessageHandleImpl::MessageHandleImpl(const qpid::broker::DataSource* dataSrc) : + m_dataSrc(dataSrc) +{} + +MessageHandleImpl::~MessageHandleImpl() +{} + +}} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/MessageHandleImpl.h b/cpp/src/qpid/asyncStore/MessageHandleImpl.h new file mode 100644 index 0000000000..0907ac5e65 --- /dev/null +++ b/cpp/src/qpid/asyncStore/MessageHandleImpl.h @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MessageHandleImpl.h + */ + +#ifndef qpid_asyncStore_MessageHandleImpl_h_ +#define qpid_asyncStore_MessageHandleImpl_h_ + +#include "qpid/RefCounted.h" + +namespace qpid { + +namespace broker { +class DataSource; +} + +namespace asyncStore { + +class MessageHandleImpl : public virtual qpid::RefCounted +{ +public: + MessageHandleImpl(const qpid::broker::DataSource* dataSrc); + virtual ~MessageHandleImpl(); +protected: + const qpid::broker::DataSource* m_dataSrc; +}; + +}} // namespace qpid::asyncStore + +#endif // qpid_asyncStore_MessageHandleImpl_h_ diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp new file mode 100644 index 0000000000..298d6d3061 --- /dev/null +++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file OperationQueue.cpp + */ + +#include "OperationQueue.h" + +#include "qpid/broker/BrokerContext.h" + +namespace qpid { +namespace asyncStore { + +OperationQueue::OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller) : + m_opQueue(boost::bind(&OperationQueue::handle, this, _1), poller) +{ + m_opQueue.start(); +} + +OperationQueue::~OperationQueue() +{ + m_opQueue.stop(); +} + +void +OperationQueue::submit(const AsyncOperation* op) +{ +//std::cout << "***** OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush; + m_opQueue.push(op); +} + +// protected +OperationQueue::OpQueue::Batch::const_iterator +OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) +{ + for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { +//std::cout << "##### OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush; + if ((*i)->m_resCb) { + ((*i)->m_resCb)(new qpid::broker::AsyncResult, (*i)->m_brokerCtxt); + } else { + delete (*i)->m_brokerCtxt; + } + delete (*i); + } + return e.end(); +} + +}} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/OperationQueue.h b/cpp/src/qpid/asyncStore/OperationQueue.h new file mode 100644 index 0000000000..8a79684262 --- /dev/null +++ b/cpp/src/qpid/asyncStore/OperationQueue.h @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file OperationQueue.h + */ + +#ifndef qpid_asyncStore_OperationQueue_h_ +#define qpid_asyncStore_OperationQueue_h_ + +#include "AsyncOperation.h" + +#include "qpid/broker/AsyncStore.h" +#include "qpid/sys/PollableQueue.h" + +namespace qpid { +namespace asyncStore { + +class OperationQueue +{ +public: + OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller); + virtual ~OperationQueue(); + void submit(const AsyncOperation* op); + +protected: + typedef qpid::sys::PollableQueue<const AsyncOperation*> OpQueue; + OpQueue m_opQueue; + + OpQueue::Batch::const_iterator handle(const OpQueue::Batch& e); +}; + +}} // namespace qpid::asyncStore + +#endif // qpid_asyncStore_OperationQueue_h_ diff --git a/cpp/src/qpid/asyncStore/Plugin.cpp b/cpp/src/qpid/asyncStore/Plugin.cpp new file mode 100644 index 0000000000..4f35e8cd2a --- /dev/null +++ b/cpp/src/qpid/asyncStore/Plugin.cpp @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Plugin.cpp + */ + +#include "Plugin.h" + +#include "qpid/broker/Broker.h" + +namespace qpid { +namespace broker { + +void +Plugin::earlyInitialize(Target& target) +{ + Broker* broker = dynamic_cast<Broker*>(&target); + if (!broker) return; + DataDir& dataDir = broker->getDataDir (); + if (m_options.m_storeDir.empty ()) + { + if (!dataDir.isEnabled ()) + throw Exception ("asyncStore: If --data-dir is blank or --no-data-dir is specified, --store-dir must be present."); + + m_options.m_storeDir = dataDir.getPath (); + } + m_store.reset(new qpid::asyncStore::AsyncStoreImpl(broker->getPoller(), m_options)); + boost::shared_ptr<qpid::broker::AsyncStore> brokerAsyncStore(m_store); + broker->setAsyncStore(brokerAsyncStore); + boost::function<void()> fn = boost::bind(&Plugin::finalize, this); + target.addFinalizer(fn); + QPID_LOG(info, "asyncStore: Initialized using path " << m_options.m_storeDir); +} + +void +Plugin::initialize(Target& target) +{ + Broker* broker = dynamic_cast<Broker*>(&target); + if (!broker || !m_store) return; + + // Not done in earlyInitialize as the Broker::isInCluster test won't work there. + if (broker->isInCluster()) { + QPID_LOG(info, "asyncStore: Part of cluster: Disabling management instrumentation"); + } else { + QPID_LOG(info, "asyncStore: Enabling management instrumentation"); + m_store->initManagement(broker); + } +} + +void +Plugin::finalize() +{ + m_store.reset(); +} + +qpid::Options* +Plugin::getOptions() +{ + return &m_options; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/asyncStore/Plugin.h b/cpp/src/qpid/asyncStore/Plugin.h new file mode 100644 index 0000000000..cbb0bfdadb --- /dev/null +++ b/cpp/src/qpid/asyncStore/Plugin.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Plugin.h + */ + +#ifndef qpid_broker_Plugin_h_ +#define qpid_broker_Plugin_h_ + +#include "AsyncStoreImpl.h" +#include "AsyncStoreOptions.h" + +#include "qpid/Plugin.h" + +namespace qpid { +class Options; +namespace broker { + +class Plugin : public qpid::Plugin +{ +public: + virtual void earlyInitialize(Target& target); + virtual void initialize(Target& target); + void finalize(); + virtual qpid::Options* getOptions(); +protected: + boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> m_store; + qpid::asyncStore::AsyncStoreOptions m_options; +}; + +static Plugin instance; // Static initialization. + +}} // namespace qpid::broker + +#endif // qpid_broker_Plugin_h_ diff --git a/cpp/src/qpid/asyncStore/QueueHandleImpl.cpp b/cpp/src/qpid/asyncStore/QueueHandleImpl.cpp new file mode 100644 index 0000000000..523a31ba7b --- /dev/null +++ b/cpp/src/qpid/asyncStore/QueueHandleImpl.cpp @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file QueueHandleImpl.cpp + */ + +#include "QueueHandleImpl.h" + +#include "qpid/messaging/PrivateImplRef.h" + +namespace qpid { +namespace asyncStore { + +QueueHandleImpl::QueueHandleImpl(const std::string& name, + const qpid::types::Variant::Map& opts) : + m_name(name), + m_opts(opts) +{} + +QueueHandleImpl::~QueueHandleImpl() +{} + +const std::string& +QueueHandleImpl::getName() const +{ + return m_name; +} + +}} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/QueueHandleImpl.h b/cpp/src/qpid/asyncStore/QueueHandleImpl.h new file mode 100644 index 0000000000..9046a33877 --- /dev/null +++ b/cpp/src/qpid/asyncStore/QueueHandleImpl.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file QueueHandleImpl.h + */ + +#ifndef qpid_asyncStore_QueueHandleImpl_h_ +#define qpid_asyncStore_QueueHandleImpl_h_ + +#include "qpid/RefCounted.h" +#include "qpid/types/Variant.h" + +#include <string> + +namespace qpid { +namespace asyncStore { + +class QueueHandleImpl : public virtual qpid::RefCounted +{ +public: + QueueHandleImpl(const std::string& name, + const qpid::types::Variant::Map& opts); + virtual ~QueueHandleImpl(); + + const std::string& getName() const; + +protected: + const std::string m_name; + const qpid::types::Variant::Map& m_opts; + +}; + +}} // namespace qpid::asyncStore + +#endif // qpid_asyncStore_QueueHandleImpl_h_ diff --git a/cpp/src/qpid/asyncStore/README b/cpp/src/qpid/asyncStore/README new file mode 100644 index 0000000000..4590bfea62 --- /dev/null +++ b/cpp/src/qpid/asyncStore/README @@ -0,0 +1,34 @@ +This folder contains source code for the new async store module. It implements the interface +described in qpid/broker/AsyncStore.h. + +NOTE: This is NOT the current (old) async store module implemented through the sync interface +in qpid/broker/MessageStoere.h. That store will continue to be kept separately at +https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp (or if no checkin credentials, then +read-only at http://anonsvn.jboss.org/repos/rhmessaging/store/trunk/cpp). + +Layout +------ + +qpid + | + +---asyncStore (Interface layer with broker) + | + +---jrnl2 (Async journal) + + +The jrnl2 directory contains the asynchronous journal code, and contains all the journal +file management and low-level writing and reading. This code knows only about writing +opaque data blobs to disk in records which may or may not contain a transaction identifier. +Records may be enqueued (written) or dequeued (which writes to disk the logical deletion of an +earlier enqueued record). + +The upper asyncStore directory contains the implementation of the broker's AsyncStore +interface and the necessary logic to interface this to the jrnl2 code. It also provides +transaction handling and recovery logic. + +Building +-------- +Currently, this module is a Linux-only module, owing to the dependency of the jrnl2 layer +on the Linux-only libaio library. + +Only cmake is supported. diff --git a/cpp/src/qpid/asyncStore/RunState.cpp b/cpp/src/qpid/asyncStore/RunState.cpp new file mode 100644 index 0000000000..3605a0c2e1 --- /dev/null +++ b/cpp/src/qpid/asyncStore/RunState.cpp @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file RunState.cpp + */ + +#include "RunState.h" + +#include "qpid/Exception.h" + +#include <sstream> + +namespace qpid { +namespace asyncStore { + +RunState::RunState() : + qpid::asyncStore::jrnl2::State<RunState_t>() +{} + +RunState::RunState(const RunState_t s) : + qpid::asyncStore::jrnl2::State<RunState_t>(s) +{} + +RunState::RunState(const RunState& s) : + qpid::asyncStore::jrnl2::State<RunState_t>(s) +{} + +RunState::~RunState() +{} + +void +RunState::setInitializing() +{ + set(RS_INITIALIZING); +} + +void +RunState::setRestoring() +{ + set(RS_RESTORING); +} + +void +RunState::setRunning() +{ + set(RS_RUNNING); +} + +void +RunState::setStopping() +{ + set(RS_STOPPING); +} + +void +RunState::setStopped() +{ + set(RS_STOPPED); +} + +const char* +RunState::getAsStr() const +{ + return s_toStr(m_state); +} + +//static +const char* +RunState::s_toStr(const RunState_t s) +{ + switch (s) { + case RS_NONE: + return "WR_NONE"; + case RS_INITIALIZING: + return "RS_INITIALIZING"; + case RS_RESTORING: + return "RS_RESTORING"; + case RS_RUNNING: + return "RS_RUNNING"; + case RS_STOPPING: + return "RS_STOPPING"; + case RS_STOPPED: + return "RS_STOPPED"; + default: + std::ostringstream oss; + oss << "<unknown state (" << "s" << ")>"; + return oss.str().c_str(); + } +} + +void +RunState::set(const RunState_t s) +{ + // State transition logic: set stateError to true if an invalid transition is attempted + bool stateTransitionError = false; + switch (m_state) { + case RS_NONE: + if (s != RS_INITIALIZING && s != RS_RESTORING) { + stateTransitionError = true; + } + break; + case RS_INITIALIZING: + case RS_RESTORING: + if (s != RS_RUNNING) { + stateTransitionError = true; + } + break; + case RS_RUNNING: + if (s != RS_STOPPING) { + stateTransitionError = true; + } + break; + case RS_STOPPING: + if (s != RS_STOPPED) { + stateTransitionError = true; + } + break; + case RS_STOPPED: // Cannot move out of this state except with reset() + stateTransitionError = true; + break; + } + + if (stateTransitionError) { + std::ostringstream oss; + oss << "RunState::set() is in state " << m_state << " and cannot be moved to state " << s << "."; + throw qpid::Exception(oss.str()); + } + m_state = s; +} + +}} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/RunState.h b/cpp/src/qpid/asyncStore/RunState.h new file mode 100644 index 0000000000..959e47449d --- /dev/null +++ b/cpp/src/qpid/asyncStore/RunState.h @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file RunState.h + */ + +#ifndef qpid_asyncStore_RunState_h_ +#define qpid_asyncStore_RunState_h_ + +#include "qpid/asyncStore/jrnl2/State.h" + +namespace qpid { +namespace asyncStore { + +/** + * RS_NONE + * / \ + * setInitializing() / \ setRestoring() + * / \ + * V V + * RS_INITIALIZING RS_RESTORING + * \ / + * setRunning() \ / setRunning() + * \ / + * V V + * RS_RUNNING + * | + * | setStopping() + * V + * RS_STOPPING + * | + * | setStopped() + * V + * RS_STOPPED + */ +typedef enum { + RS_NONE = 0, + RS_INITIALIZING, + RS_RESTORING, + RS_RUNNING, + RS_STOPPING, + RS_STOPPED +} RunState_t; + +class RunState: public qpid::asyncStore::jrnl2::State<RunState_t> +{ +public: + RunState(); + RunState(const RunState_t s); + RunState(const RunState& s); + virtual ~RunState(); + void setInitializing(); + void setRestoring(); + void setRunning(); + void setStopping(); + void setStopped(); + virtual const char* getAsStr() const; + static const char* s_toStr(const RunState_t s); +protected: + virtual void set(const RunState_t s); + +}; + +}} // namespace qpid::asyncStore + +#endif // qpid_asyncStore_RunState_h_ diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp new file mode 100644 index 0000000000..7ce01f881c --- /dev/null +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TxnHandleImpl.cpp + */ + +#include "TxnHandleImpl.h" + +#include "qpid/messaging/PrivateImplRef.h" + +#include <uuid/uuid.h> + +namespace qpid { +namespace asyncStore { + +TxnHandleImpl::TxnHandleImpl(const std::string& xid) : + m_xid(xid), + m_tpcFlag(!xid.empty()) +{ + if (m_xid.empty()) { // create a local xid from a random uuid + uuid_t uuid; + ::uuid_generate_random(uuid); + char uuidStr[37]; // 36-char uuid + trailing '\0' + ::uuid_unparse(uuid, uuidStr); +// m_xid.assign(uuidStr); + } +} + +TxnHandleImpl::~TxnHandleImpl() +{} + +const std::string& +TxnHandleImpl::getXid() const +{ + return m_xid; +} + +bool +TxnHandleImpl::is2pc() const +{ + return m_tpcFlag; +} + +}} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.h b/cpp/src/qpid/asyncStore/TxnHandleImpl.h new file mode 100644 index 0000000000..d001fd1916 --- /dev/null +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.h @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TxnHandleImpl.h + */ + +#ifndef qpid_asyncStore_TxnHandleImpl_h_ +#define qpid_asyncStore_TxnHandleImpl_h_ + +#include "qpid/RefCounted.h" + +#include <string> + +namespace qpid { +namespace asyncStore { + +class TxnHandleImpl : public virtual qpid::RefCounted +{ +public: + TxnHandleImpl(const std::string& xid = std::string()); + virtual ~TxnHandleImpl(); + const std::string& getXid() const; + bool is2pc() const; +protected: + std::string m_xid; + bool m_tpcFlag; +}; + +}} // namespace qpid::asyncStore + +#endif // qpid_asyncStore_TxnHandleImpl_h_ diff --git a/cpp/src/qpid/asyncStore/jrnl2/AioCallback.h b/cpp/src/qpid/asyncStore/jrnl2/AioCallback.h new file mode 100644 index 0000000000..952448fd72 --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/AioCallback.h @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + /** + * \file AioCallback.h + */ + +#ifndef qpid_asyncStore_jrnl2_AioCallback_h_ +#define qpid_asyncStore_jrnl2_AioCallback_h_ + +#include <stdint.h> // uint16_t +#include <vector> + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +class DataToken; + +/** + * \brief This pure virtual class provides an interface through which asyncronous operation completion calls may + * be made. + */ +class AioCallback +{ +public: + /** + * \brief Virtual destructor + */ + virtual ~AioCallback() {} + + /** + * \brief Callback function through which asynchronous IO write operation completion notifications are sent. + */ + virtual void writeAioCompleteCallback(std::vector<DataToken*>& dataTokenList) = 0; + + /** + * \brief Callback function through which asynchronous IO read operation completion notifications are sent. + */ + virtual void readAioCompleteCallback(std::vector<uint16_t>& buffPageCtrlBlkIndexList) = 0; + +}; + +}}} // namespace qpid::asyncStore::jrnl2 + +#endif // qpid_asyncStore_jrnl2_AioCallback_h_ + diff --git a/cpp/src/qpid/asyncStore/jrnl2/AsyncJournal.cpp b/cpp/src/qpid/asyncStore/jrnl2/AsyncJournal.cpp new file mode 100644 index 0000000000..979b91579f --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/AsyncJournal.cpp @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file AsyncJournal.cpp + */ + +#include "AsyncJournal.h" + +#include "AioCallback.h" +#include "DataToken.h" + +// --- temp code --- +#include <fstream> +#include <iostream> +// --- end temp code --- + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +std::string +g_ioResAsString(const jrnlOpRes /*res*/) +{ + /// \todo TODO - provide implementation + return ".[g_ioResAsString]."; +} + +AsyncJournal::AsyncJournal(const std::string& jrnlId, + const std::string& jrnlDir, + const std::string& baseFileName) : + m_jrnlId(jrnlId), + m_jrnlDir(jrnlDir), + m_baseFileName(baseFileName), + m_jrnlParamsPtr(0), + m_aioCallbackPtr(0) +{ + m_writeDataTokens.reserve(s_listSizeThreshold); + m_callbackDataTokens.reserve(s_listSizeThreshold); +} + +std::string +AsyncJournal::getId() const +{ + return m_jrnlId; +} + +JournalDirectory +AsyncJournal::getJournalDir() const +{ + return m_jrnlDir; +} + +std::string +AsyncJournal::getJournalDirName() const +{ + return m_jrnlDir.getFqName(); +} + +std::string +AsyncJournal::getBaseFileName() const +{ + return m_baseFileName; +} + +const JournalRunState& +AsyncJournal::getState() const +{ + return m_jrnlState; +} + +const JournalParameters* +AsyncJournal::getParameters() const +{ + return m_jrnlParamsPtr; +} + +void +AsyncJournal::initialize(const JournalParameters* jpPtr, + AioCallback* const aiocbPtr) +{ + m_jrnlParamsPtr = jpPtr; + m_aioCallbackPtr = aiocbPtr; + // --- temp code --- + m_jrnlDir.create(); + /// --- end temp code --- +} + +jrnlOpRes +AsyncJournal::enqueue(DataToken* dtokPtr, + const void* /*dataPtr*/, + const std::size_t /*dataLen*/, + const void* /*tidPtr*/, + const std::size_t /*tidLen*/, + const bool /*transientFlag*/) +{ + dtokPtr->getDataOpState().enqueue(); + // --- temp code --- + { // --- START OF CRITICAL SECTION --- + ScopedLock l(m_writeDataTokensLock); + m_writeDataTokens.push_back(dtokPtr); + if (m_writeDataTokens.size() >= s_listSizeThreshold) { + flushNoLock(); + } + } // --- END OF CRITICAL SECTION --- + //processCompletedAioWriteEvents(0); + // --- end temp code --- + return 0; +} + +jrnlOpRes +AsyncJournal::dequeue(DataToken* const dtokPtr, + const void* /*tidPtr*/, + const std::size_t /*tidLen*/) +{ + dtokPtr->getDataOpState().dequeue(); + dtokPtr->setDequeueRecordId(dtokPtr->getRecordId()); + // --- temp code --- + { // --- START OF CRITICAL SECTION --- + ScopedLock l(m_writeDataTokensLock); + m_writeDataTokens.push_back(dtokPtr); + if (m_writeDataTokens.size() >= s_listSizeThreshold) { + flushNoLock(); + } + } // --- END OF CRITICAL SECTION --- + //processCompletedAioWriteEvents(0); + // --- end temp code --- + return 0; +} + +jrnlOpRes +AsyncJournal::commit() +{ + /// \todo TODO - provide implementation + return 0; +} + +jrnlOpRes +AsyncJournal::abort() +{ + /// \todo TODO - provide implementation + return 0; +} + +jrnlOpRes +AsyncJournal::flush() +{ + // --- temp code --- + // --- START OF CRITICAL SECTION --- + ScopedTryLock l(m_writeDataTokensLock); + if (l.isLocked()) { + return flushNoLock(); + } + return 0; + // --- END OF CRITICAL SECTION --- + // --- end temp code --- +} + +// protected +jrnlOpRes +AsyncJournal::flushNoLock() +{ + // --- temp code --- + // Normally the page would be written to disk using libaio here (still to do). + uint32_t cnt = 0UL; + ScopedLock l(m_callbackDataTokensLock); + while (!m_writeDataTokens.empty()) { + m_callbackDataTokens.push_back(m_writeDataTokens.back()); + m_writeDataTokens.pop_back(); + ++cnt; + } + return 0; + // --- end temp code --- +} + +jrnlOpRes +AsyncJournal::sync(const double timeout) +{ + // --- temp code --- + // --- START OF CRITICAL SECTION --- + ScopedTryLock l(m_writeDataTokensLock); + if (l.isLocked()) { + return syncNoLock(timeout); + } + return 0; + // --- END OF CRITICAL SECTION --- + // --- end temp code --- +} + +// protected +jrnlOpRes +AsyncJournal::syncNoLock(const double /*timeout*/) +{ + // --- temp code --- + if (m_callbackDataTokens.size()) { + processCompletedAioWriteEvents(); + } + return 0; + // --- end temp code --- +} + +void +AsyncJournal::processCompletedAioWriteEvents(const double /*timeout*/) +{ + // --- temp code --- + // --- START OF CRITICAL SECTION 1 --- + ScopedLock l1(m_callbackDataTokensLock); + m_aioCallbackPtr->writeAioCompleteCallback(m_callbackDataTokens); + // --- END OF CRITICAL SECTION 1 --- + // --- end temp code --- +} + +}}} // namespace qpid::asyncStore::jrnl2 + + diff --git a/cpp/src/qpid/asyncStore/jrnl2/AsyncJournal.h b/cpp/src/qpid/asyncStore/jrnl2/AsyncJournal.h new file mode 100644 index 0000000000..845d131a49 --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/AsyncJournal.h @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file AsyncJournal.h + */ + +#ifndef qpid_asyncStore_jrnl2_AsyncJournal_h_ +#define qpid_asyncStore_jrnl2_AsyncJournal_h_ + +#include "JournalDirectory.h" +#include "JournalRunState.h" +#include "ScopedLock.h" + +#include <string> +#include <stdint.h> // uint64_t, uint32_t, etc. + +// --- temp code --- +#include <vector> +// --- end temp code --- + +namespace qpid { ///< Namespace for top-level qpid domain +namespace asyncStore { ///< Namespace for AsyncStore code +namespace jrnl2 { ///< Namespace for AsyncStore journal v.2 code. + +class AioCallback; +class DataToken; +class JournalParameters; + +/** + * \brief Type to return results from journal operations. + * \todo TODO - decide if this is the right place to expose these codes and flags. Also express ioRes as flags. + */ +typedef uint64_t jrnlOpRes; + +const jrnlOpRes RHM_IORES_ENQCAPTHRESH = 0x1; ///< Error flag indicating an enqueue capacity threshold was reached. +const jrnlOpRes RHM_IORES_BUSY = 0x2; ///< Error flag indicating that a call could not be completed because the Store is busy. + +/** + * \brief Global function to convert a return code into a textual representation. + */ +std::string g_ioResAsString(const jrnlOpRes /*res*/); + +/** + * \brief Top level of a single journal instance, which is usually deployed on a per-queue basis. + * + * Each journal has its own set of journal files and when recovered will result in restored data (messages) being + * sent to the queue to which this instance is connected. + */ +class AsyncJournal +{ +public: + /** + * \brief Constructor, which creates a single instance of a journal. + * + * \param jrnlId Journal identifier (JID), typically the name of the queue with which this journal + * is associated. + * \param jrnlDir Absolute path to the directory in which the journal will be placed. If the path does not exist, + * it will be created. + * \param baseFileName The base name of all files in the journal directory. + */ + AsyncJournal(const std::string& jrnlId, + const std::string& jrnlDir, + const std::string& baseFileName); + + // Get functions + + /** + * \brief Get the journal identifier (JID). + * + * \returns Journal identifier (JID) of this journal instance. + */ + std::string getId() const; + + /** + * \brief Get the URI or directory where this journal is deployed (writes its files). + * + * \returns JournalDirectory instance controlling where this journal instance is deployed. + */ + JournalDirectory getJournalDir() const; + + /** + * \brief Get the URI or directory where this journal is deployed (writes its files). + * + * \returns URI or directory (as a std::string) where this journal instance is deployed. + */ + std::string getJournalDirName() const; + + /** + * \brief Get the base file name used for all journal and metadata files associated with this journal instance. + * + * \returns String containing the base file name used for all files associated with this journal instance. + */ + std::string getBaseFileName() const; + + /** + * \brief Get the journal state object, which can be queried for the journal state. + * + * \returns \c const reference to the JournalSate instance associated with this journal instance. + */ + const JournalRunState& getState() const; + + /** + * \brief Return the Journal parameter object which contains the journal options and settings. It may be + * queried directly to obtain the options and settings. + * + * \returns Pointer to the JournalParameters instance associated with this journal instance. + */ + const JournalParameters* getParameters() const; + + // Data ops + + /** + * \brief Initialize the journal and its files, making it ready for use. + * + * \param jpPtr Pointer to an instance of JournalParameters containing the journal options and settings. + * \param aiocbPtr Pointer to an instance of AioCallback, whcih sets the broker AIO callback functions. + * + * \todo TODO: Make this call async for large/slow ops + */ + void initialize(const JournalParameters* jpPtr, + AioCallback* const aiocbPtr); + + /** + * \brief Enqueue data of size dataLen and pointed to by dataPtr. If transactional, then tidPtr points to + * the transaction id and tidLen indicates its size. The DataToken instance must be kept in the service of this + * data record until it has been fully dequeued. + * + * \param dtokPtr Pointer to the DataToken object of a previouisly enqueued data record. + * \param dataPtr Pointer to data to be stored (enqueued). If \b NULL, when parameter \a dataLen > 0, then this + * is assumed to be an externally stored data record. + * \param dataLen Length of the data pointed to + * \param tidPtr Pointer to the transaction ID for this operation. If \b NULL together with \a tidLen, then + * the operation is non-transactional. + * \param tidLen Size of the transaction ID pointed to in parameter \a tidPtr. Must be 0 when \a tidPtr is + * \b NULL to make non-transactional. + * \param transientFlag If \b true, sets a flag indicating that on recover, this record is to be ignored and + * treated as if transient (rather than durable). + * + * \returns Return code for this operation. A zero value (0x0) indicates success, a non-zero value indicates + * an error has occurred or an issue is present. + */ + jrnlOpRes enqueue(DataToken* dtokPtr, + const void* dataPtr, // if null and dataLen > 0, extern assumed + const std::size_t dataLen, + const void* tidPtr, // if null and tidLen == 0, non transactional + const std::size_t tidLen, + const bool transientFlag); + + /** + * \brief Dequeue the data record previously enqueued using the DataToken object dtokPtr. + * + * \param dtokPtr Pointer to the DataToken object of a previouisly enqueued data record. + * \param tidPtr Pointer to the transaction ID for this operation. If \b NULL together with \a tidLen, then + * the operation is non-transactional. + * \param tidLen Size of the transaction ID pointed to in parameter \a tidPtr. Must be 0 when \a tidPtr is + * \b NULL to make non-transactional. + * \returns Return code for this operation. A zero value (0x0) indicates success, a non-zero value indicates + * an error has occurred or an issue is present. + */ + jrnlOpRes dequeue(DataToken* const dtokPtr, + const void* tidPtr, // if null and tidLen == 0, non transactional + const std::size_t tidLen); + + /** + * \brief Commit the transaction Id (XID) used for previoius enqueue(s) and/or dequeue(s). + * + * \return Return code for this operation. A zero value (0x0) indicates success, a non-zero value indicates + * an error has occurred or an issue is present. + * + * \todo TODO: Create and add an XID type as a parameter to this call. + */ + jrnlOpRes commit(); + + /** + * \brief Abort (roll back) the transaction Id (XID) used for previoius enqueue(s) and/or dequeue(s). + * + * \return Return code for this operation. A zero value (0x0) indicates success, a non-zero value indicates + * an error has occurred or an issue is present. + * + * \todo TODO: Create and add an XID type as a parameter to this call. + */ + jrnlOpRes abort(); + + // AIO ops and status + + /** + * \brief Flush all unwritten buffered records for this journal. + */ + jrnlOpRes flush(); + + /** + * \brief Wait until all AIOs outstanding at the last flush() have returned for this journal. + * + * It is assumed that flush() will have been previously called. This call by definition blocks until \a timeout + * has elapsed or all AIO operations outstanding at the last flush() call have returned. A zero timeout implies + * an indefinite block. + */ + jrnlOpRes sync(const double timeout = 0.0); + + /** + * \brief Search for and process completed AIO write events. + * + * \param timeout Maximum time to wait for completion, otherwise return without performing any work. + * + * \todo TODO: This may become obsolete if epoll is used instead. + * \todo TODO: Should this return the number of completed AIO events? + */ + void processCompletedAioWriteEvents(const double timeout = 0.0); + +protected: + std::string m_jrnlId; ///< Identifier for this journal instance (JID), typically queue name. + JournalDirectory m_jrnlDir; ///< Directory in which this journal is deployed. + std::string m_baseFileName; ///< Base file name used for all journal files belonging to this instance. + JournalRunState m_jrnlState; ///< Journal state manager, controls the state of this journal. + const JournalParameters* m_jrnlParamsPtr; ///< Journal options and parameters associated with this journal. + AioCallback* m_aioCallbackPtr; ///< Pointers to the broker's callback functions for AIO completion callbacks. + + // --- temp code --- + static const uint32_t s_listSizeThreshold = 250; ///< [TEMP CODE] Number of data records at which a flush will occur. + std::vector<DataToken*> m_writeDataTokens; ///< [TEMP CODE] List of data tokens held before a flush. + std::vector<DataToken*> m_callbackDataTokens; ///< [TEMP CODE] List of data tokens ready for callbacks. + ScopedMutex m_writeDataTokensLock; ///< [TEMP CODE] Lock to protect the write token list. + ScopedMutex m_callbackDataTokensLock; ///< [TEMP CODE] Lock to protect the callback token list. + // --- end temp code --- + + /** + * \brief Internal-use flush call which operates without taking a lock. + */ + jrnlOpRes flushNoLock(); + + /** + * \brief Internal-use sync call which operates without taking a lock. + */ + jrnlOpRes syncNoLock(const double timeout); + +}; + +}}} // namespace qpid::asyncStore::jrnl2 + +#endif // qpid_asyncStore_jrnl2_AsyncJournal_h_ + diff --git a/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h b/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h new file mode 100644 index 0000000000..b13caa5462 --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file AtomicCounter.h + */ + +#ifndef qpid_asyncStore_jrnl2_AtomicCounter_h_ +#define qpid_asyncStore_jrnl2_AtomicCounter_h_ + +#include "ScopedLock.h" + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +/** + * \brief Template integral atomic counter class which provides a next() operation to increment then return the + * updated count of integral type T. This operation is performed under a thread lock. + */ +template <class T> class AtomicCounter +{ +public: + /** + * \brief Constructor with an option to set an inital value for the counter. + */ + AtomicCounter(T initialValue = T(0)) : + m_cnt(initialValue) + {} + + /** + * \brief Destructor + */ + virtual ~AtomicCounter() + {} + + /** + * \brief Increment, then return new value of the internal counter. This is thread-safe and takes a lock + * in order to perform the increment. + * + * \note This is a non-zero counter. By default, the counter starts with the value 0, and consequently the + * first call to next() will return 1. Upon overflow, the counter will be incremented twice so as to avoid + * returning the value 0. + */ + virtual T next() + { + // --- START OF CRITICAL SECTION --- + ScopedLock l(m_mutex); + while (!++m_cnt) ; // Cannot return 0x0 if m_cnt should overflow + return m_cnt; + } // --- END OF CRITICAL SECTION --- + +protected: + T m_cnt; ///< Internal count value + ScopedMutex m_mutex; ///< Internal lock used to increment the counter + +}; + +}}} // namespace qpid::asyncStore::jrnl2 + +#endif // qpid_asyncStore_jrnl2_AtomicCounter_h_ diff --git a/cpp/src/qpid/asyncStore/jrnl2/Configuration.h b/cpp/src/qpid/asyncStore/jrnl2/Configuration.h new file mode 100644 index 0000000000..5e5aa176dd --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/Configuration.h @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Configuration.h + */ + +#ifndef qpid_asyncStore_jrnl2_Configuration_h_ +#define qpid_asyncStore_jrnl2_Configuration_h_ + +#include <stdint.h> // uint8_t + +#if defined(__i386__) /* little endian, 32 bits */ + #define JRNL_LITTLE_ENDIAN +// #define JRNL_32_BIT +#elif defined(__PPC__) || defined(__s390__) /* big endian, 32 bits */ + #define JRNL_BIG_ENDIAN +// #define JRNL_32_BIT +#elif defined(__ia64__) || defined(__x86_64__) || defined(__alpha__) /* little endian, 64 bits */ + #define JRNL_LITTLE_ENDIAN +// #define JRNL_64_BIT +#elif defined(__powerpc64__) || defined(__s390x__) /* big endian, 64 bits */ + #define JRNL_BIG_ENDIAN +// #define JRNL_64_BIT +#else + #error Unable to determine endianness +#endif + + +/** +* <b>Rule:</b> Data block size (JRNL_DBLK_SIZE) MUST be a power of 2 such that +* <pre> +* JRNL_DBLK_SIZE * JRNL_SBLK_SIZE == n * 512 (n = 1,2,3...) +* </pre> +* (The disk softblock size is 512 for Linux kernels >= 2.6) +*/ +//#define JRNL_DBLK_SIZE 128 ///< Data block size in bytes (CANNOT BE LESS THAN 32!) +//#define JRNL_SBLK_SIZE 4 ///< Disk softblock size in multiples of JRNL_DBLK_SIZE +//#define JRNL_MIN_FILE_SIZE 128 ///< Min. jrnl file size in sblks (excl. FileHeader) +//#define JRNL_MAX_FILE_SIZE 4194304 ///< Max. jrnl file size in sblks (excl. FileHeader) +//#define JRNL_MIN_NUM_FILES 4 ///< Min. number of journal files +//#define JRNL_MAX_NUM_FILES 64 ///< Max. number of journal files +//#define JRNL_ENQ_THRESHOLD 80 ///< Percent full when enqueue connection will be closed +// +//#define JRNL_RMGR_PAGE_SIZE 128 ///< Journal page size in softblocks +//#define JRNL_RMGR_PAGES 16 ///< Number of pages to use in wmgr +// +//#define JRNL_WMGR_DEF_PAGE_SIZE 64 ///< Journal write page size in softblocks (default) +//#define JRNL_WMGR_DEF_PAGES 32 ///< Number of pages to use in wmgr (default) +// +//#define JRNL_WMGR_MAXDTOKPP 1024 ///< Max. dtoks (data blocks) per page in wmgr +//#define JRNL_WMGR_MAXWAITUS 100 ///< Max. wait time (us) before submitting AIO +// +//#define JRNL_INFO_EXTENSION "jinf" ///< Extension for journal info files +//#define JRNL_DATA_EXTENSION "jdat" ///< Extension for journal data files +//#define RHM_JDAT_TXA_MAGIC 0x614d4852 ///< ("RHMa" in little endian) Magic for dtx abort hdrs +//#define RHM_JDAT_TXC_MAGIC 0x634d4852 ///< ("RHMc" in little endian) Magic for dtx commit hdrs +//#define RHM_JDAT_DEQ_MAGIC 0x644d4852 ///< ("RHMd" in little endian) Magic for deq rec hdrs +//#define RHM_JDAT_ENQ_MAGIC 0x654d4852 ///< ("RHMe" in little endian) Magic for enq rec hdrs +//#define RHM_JDAT_FILE_MAGIC 0x664d4852 ///< ("RHMf" in little endian) Magic for file hdrs +//#define RHM_JDAT_EMPTY_MAGIC 0x784d4852 ///< ("RHMx" in little endian) Magic for empty dblk +//#define RHM_JDAT_VERSION 0x01 ///< Version (of file layout) +//#define RHM_CLEAN_CHAR 0xff ///< Char used to clear empty space on disk + +#define RHM_LENDIAN_FLAG 0x0 ///< Value of little endian flag on disk +#define RHM_BENDIAN_FLAG 0x1 ///< Value of big endian flag on disk + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +/** + * \brief Class aggregating the journal and store hard-coded configuration values. + */ +class Configuration +{ +public: + static const uint8_t s_endianValue = +#if defined(JRNL_LITTLE_ENDIAN) + RHM_LENDIAN_FLAG; +#elif defined(JRNL_BIG_ENDIAN) + RHM_BENDIAN_FLAG; +#else +# error Unknown endianness +#endif +}; + +}}} // namespace qpid::asyncStore::jrnl2 + +#endif // qpid_asyncStore_jrnl2_Configuration_h_ diff --git a/cpp/src/qpid/asyncStore/jrnl2/DataOpState.cpp b/cpp/src/qpid/asyncStore/jrnl2/DataOpState.cpp new file mode 100644 index 0000000000..79d1133cf6 --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/DataOpState.cpp @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file DataOpState.cpp + */ + +#include "DataOpState.h" + +#include "JournalError.h" + +#include <sstream> // std::ostringstream + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +DataOpState::DataOpState() : + State<opState_t>() +{} + +DataOpState::DataOpState(const opState_t s) : + State<opState_t>(s) +{} + +DataOpState::DataOpState(const DataOpState& s) : + State<opState_t>(s) +{} + +DataOpState::~DataOpState() +{} + +void +DataOpState::enqueue() +{ + set(OP_ENQUEUE); +} + +void +DataOpState::dequeue() +{ + set(OP_DEQUEUE); +} + +const char* +DataOpState::getAsStr() const +{ + return s_toStr(m_state); +} + +// static +const char* +DataOpState::s_toStr(const opState_t s) +{ + switch (s) { + case OP_NONE: + return "OP_NONE"; + case OP_ENQUEUE: + return "OP_ENQUEUE"; + case OP_DEQUEUE: + return "OP_DEQUEUE"; + default: + std::ostringstream oss; + oss << "<unknown state (" << "s" << ")>"; + return oss.str().c_str(); + } +} + +// protected +void +DataOpState::set(const opState_t s) +{ + // State transition logic: set stateError to true if an invalid transition is attempted + bool stateTransitionError = false; + switch(m_state) { + case OP_NONE: + if (s != OP_ENQUEUE) stateTransitionError = true; + break; + case OP_ENQUEUE: + if (s != OP_DEQUEUE) stateTransitionError = true; + break; + case OP_DEQUEUE: // Cannot move out of this state except with reset() + stateTransitionError = true; + break; + } + if (stateTransitionError) { + THROW_STATE_EXCEPTION(JournalError::JERR_MSGOPSTATE, s_toStr(s), s_toStr(m_state), "DataOpState", "set"); + } + m_state = s; +} + +}}} // namespace qpid::asyncStore::jrnl2 diff --git a/cpp/src/qpid/asyncStore/jrnl2/DataOpState.h b/cpp/src/qpid/asyncStore/jrnl2/DataOpState.h new file mode 100644 index 0000000000..492f0b3f6b --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/DataOpState.h @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file DataOpState.h + */ + +#ifndef qpid_asyncStore_jrnl2_DataOpState_h_ +#define qpid_asyncStore_jrnl2_DataOpState_h_ + +#include "State.h" + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +/** + * \brief Enumeration of valid data record enqueue/dequeue states. + */ +typedef enum { + OP_NONE = 0, ///< Data record has not been processed by async store + OP_ENQUEUE, ///< Data record has been enqueued + OP_DEQUEUE ///< Data record has been dequeued +} opState_t; + +/** + * \brief State machine to control the data record operational state (ie none -> enqueued -> dequeued). + * + * The following state diagram shows valid data record operational state transitions: + * \dot + * digraph DataOpState { + * node [fontname=Helvetica, fontsize=8]; + * OP_NONE [URL="\ref OP_NONE"] + * OP_ENQUEUE [URL="\ref OP_ENQUEUE"] + * OP_DEQUEUE [URL="\ref OP_DEQUEUE"] + * + * edge [fontname=Helvetica, fontsize=8] + * OP_NONE->OP_ENQUEUE [label=" enqueue()" URL="\ref qpid::jrnl2::DataOpState::enqueue()"] + * OP_ENQUEUE->OP_DEQUEUE [label=" dequeue()" URL="\ref qpid::jrnl2::DataOpState::enqueue()"] + * } + * \enddot + */ +class DataOpState: public State<opState_t> +{ +public: + /** + * \brief Default constructor, setting internal state to OP_NONE. + */ + DataOpState(); + + /** + * \brief Constructor allowing the internal state to be preset to any valid value of opState_t. + * + * \param s State value to which the internal state of this new instance should be initialized. + */ + DataOpState(const opState_t s); + + /** + * \brief Copy constructor. + * + * \param s Instance from which this new instance state should be copied. + */ + DataOpState(const DataOpState& s); + + /** + * \brief Virtual destructor + */ + virtual ~DataOpState(); + + /** + * \brief Changes the data record operational state from OP_NONE to OP_ENQUEUE. + * + * \throws JournalException with JERR_ILLEGALDTOKOPSTATECHANGE if current state makes it illegal to change + * to OP_ENQUEUE according to the state machine semantics. + */ + void enqueue(); + + /** + * \brief Changes the data record operational state from OP_ENQUEUE to OP_DEQUEUE. + * + * \throws JournalException with JERR_ILLEGALDTOKOPSTATECHANGE if current state makes it illegal to change + * to OP_DEQUEUE according to the state machine semantics. + */ + void dequeue(); + + /** + * \brief Get the current state in a string form. + * + * \returns String representation of internal state m_state as returned by static function s_toStr(). + */ + const char* getAsStr() const; + + /** + * \brief Static helper function to convert a numeric opState_t type to a string representation. + */ + static const char* s_toStr(const opState_t s); + +protected: + /** + * \brief Set (or change) the value of m_state. This function implements the state machine checks for + * legal state change transitions, and throw an exception if an illegal state transition is requested. + * + * \param s State to which this machine should be changed. + * \throws JournalException with JERR_ILLEGALDTOKOPSTATECHANGE if current state makes it illegal to change + * to the requested state \a s. + */ + void set(const opState_t s); + +}; + +}}} // namespace qpid::asyncStore::jrnl2 + +#endif // qpid_asyncStore_jrnl2_DataOpState_h_ diff --git a/cpp/src/qpid/asyncStore/jrnl2/DataToken.cpp b/cpp/src/qpid/asyncStore/jrnl2/DataToken.cpp new file mode 100644 index 0000000000..cd4969a7fc --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/DataToken.cpp @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file DataToken.cpp + */ + +#include "DataToken.h" + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +DataToken::DataToken() : + m_dataOpState(), + m_recordId(s_recordIdCounter.next()), + m_externalRecordIdFlag(false) +{} + +DataToken::DataToken(const recordId_t rid) : + m_dataOpState(), + m_recordId(rid), + m_externalRecordIdFlag(true) +{} + +DataToken::~DataToken() {} + +const DataOpState& +DataToken::getDataOpState() const +{ + return m_dataOpState; +} + +DataOpState& +DataToken::getDataOpState() +{ + return m_dataOpState; +} + +const DataWrComplState& +DataToken::getDataWrComplState() const +{ + return m_dataWrComplState; +} + +DataWrComplState& +DataToken::getDataWrComplState() +{ + return m_dataWrComplState; +} + +bool +DataToken::isTransient() const +{ + return m_transientFlag; +} + +bool +DataToken::isExternal() const +{ + return m_externalFlag; +} + +const std::string& +DataToken::getExternalLocation() const +{ + return m_externalLocation; +} + +recordId_t +DataToken::getRecordId() const +{ + return m_recordId; +} + +bool +DataToken::isRecordIdExternal() const +{ + return m_externalRecordIdFlag; +} + +recordId_t +DataToken::getDequeueRecordId() const +{ + return m_dequeueRecordId; +} + +void +DataToken::setRecordId(const recordId_t rid) +{ + m_recordId = rid; + m_externalRecordIdFlag = true; +} + +void +DataToken::setDequeueRecordId(const recordId_t drid) +{ + m_dequeueRecordId = drid; +} + +void +DataToken::toStream(std::ostream& os) const +{ + /// \todo TODO: Implementation required + os << "status string"; +} + +// private static +RecordIdCounter_t DataToken::s_recordIdCounter; + +}}} // namespace qpid::asyncStore::jrnl2 diff --git a/cpp/src/qpid/asyncStore/jrnl2/DataToken.h b/cpp/src/qpid/asyncStore/jrnl2/DataToken.h new file mode 100644 index 0000000000..2058cc75ad --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/DataToken.h @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file DataToken.h + */ + +#ifndef qpid_jrnl2_asyncStore_DataToken_h_ +#define qpid_jrnl2_asyncStore_DataToken_h_ + +#include "DataOpState.h" +#include "DataWrComplState.h" +#include "RecordIdCounter.h" + +#include <string> +#include <stdint.h> // uint64_t + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +/** + * \brief Contains the state and metadata for each record handled by the async store. + * + * The DataToken keeps the state of the data record through the AIO write and recovery process. It may also + * be used to set certain properties of the data record. + */ +class DataToken : public Streamable +{ +public: + /** + * \brief Default constructor, which internally assigns a unique record Id (RID). + */ + DataToken(); + + /** + * \brief Constructor which accepts an external record id (RID). No check is made of the uniqueness of this RID. + * + * \param rid The externally assigned record Id, which must be unique across this journal instance within the + * window presented by the journal files. + */ + DataToken(const recordId_t rid); + + /** + * \brief Virtual destructor + */ + virtual ~DataToken(); + + // Get functions + + /** + * \brief Get the const DataOpState state manager object for this data token. + * + * \return \c const state of the data record associated with this DataToken instance. + */ + const DataOpState& getDataOpState() const; + + /** + * \brief Get the DataOpState state manager object for this data token. + * + * \return Non -\c const (modifiable) state of the data record associated with this DataToken instance. + */ + DataOpState& getDataOpState(); + + /** + * \brief Get the const DataWrComplState state manager object for this data token. + * + * \return \c const state of the data record associated with this DataToken instance. + */ + const DataWrComplState& getDataWrComplState() const; + + /** + * \brief Get the DataWrComplState state manager object for this data token. + * + * \return Non -\c const (modifiable) state of the data record associated with this DataToken instance. + */ + DataWrComplState& getDataWrComplState(); + + /** + * \brief Check if this data is marked as transient (will not be recovered). + * + * \return \c true if the data is transient, \c false otherwise. + */ + bool isTransient() const; + + /** + * \brief Check if this data is marked as external (i.e. has its content stored outside the store journal). + * + * \return \c true if the data is external (has its content stored outside the journal), \c false otherwise. + */ + bool isExternal() const; + + /** + * \brief Return the external storage location of the data if it is marked as external. This may be a path + * or URL. + * + * \return Location or URI pointing to the location of the storage for this data record if it is marked external. + */ + const std::string& getExternalLocation() const; + + /** + * \brief Return the Record Id (RID) for this data record. + * + * \return Record Id (RID) for this data record. + */ + recordId_t getRecordId() const; + + /** + * \brief Checks if the Record Id assigned to this data record was supplied externally. + * + * \return \c true if the data record had its Record Id (RID) assigned externally from the store, \c false + * otherwise. + */ + bool isRecordIdExternal() const; + + /** + * \brief Returns the recordId of the dequeuing record (if dequeued) + * + * \return The Record Id of the dequeue record which dequeues this record, or 0x0 if it has not been dequeued + * (or cannot be dequeued). + */ + recordId_t getDequeueRecordId() const; + + // Set functions + + /** + * \brief Set the record Id (RID) for this record. No check is made of the uniqueness of this RID. + */ + void setRecordId(const recordId_t rid); + + /** + * \brief Set the record Id (RID) of the record which dequeues this record. + */ + void setDequeueRecordId(const recordId_t drid); + + // Debug aid(s) + + /** + * \brief Stream a formatted summary of the state of this DataToken object. + */ + virtual void toStream(std::ostream& os) const; + +protected: + DataOpState m_dataOpState; ///< Data operational state (none, enqueued, dequeued) + DataWrComplState m_dataWrComplState; ///< Data write completion state (none, part, complete) + bool m_transientFlag; ///< True if the data record is transient (eg for Flow-To-Disk) + bool m_externalFlag; ///< True if the data record is stored externally from the store + std::string m_externalLocation; ///< Location of data record when stored externally + recordId_t m_recordId; ///< Record Id which is unique to this store instance + bool m_externalRecordIdFlag; ///< True if the record id was set through this token + recordId_t m_dequeueRecordId; ///< Record Id of the dequeue record for this data record + +private: + static RecordIdCounter_t s_recordIdCounter; ///< Static instance keeps record Ids unique across system + +}; + +}}} // namespace qpid::asyncStore::jrnl2 + +#endif // qpid_jrnl2_asyncStore_DataToken_h_ diff --git a/cpp/src/qpid/asyncStore/jrnl2/DataWrComplState.cpp b/cpp/src/qpid/asyncStore/jrnl2/DataWrComplState.cpp new file mode 100644 index 0000000000..c6cedc4782 --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/DataWrComplState.cpp @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file DataWrComplState.cpp + */ + +#include "DataWrComplState.h" + +#include "JournalError.h" + +#include <sstream> // std::ostringstream + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +DataWrComplState::DataWrComplState() : + State<wrComplState_t>() +{} + +DataWrComplState::DataWrComplState(const wrComplState_t s) : + State<wrComplState_t>(s) +{} + +DataWrComplState::DataWrComplState(const DataWrComplState& s) : + State<wrComplState_t>(s) +{} + +DataWrComplState::~DataWrComplState() +{} + +void +DataWrComplState::complete() +{ + set(WR_COMPLETE); +} + +void +DataWrComplState::partComplete() +{ + set(WR_PART); +} + +const char* +DataWrComplState::getAsStr() const +{ + return s_toStr(m_state); +} + +// static +const char* +DataWrComplState::s_toStr(const wrComplState_t s) +{ + switch (s) { + case WR_NONE: + return "WR_NONE"; + case WR_PART: + return "WR_PART"; + case WR_COMPLETE: + return "WR_COMPLETE"; + default: + std::ostringstream oss; + oss << "<unknown state (" << "s" << ")>"; + return oss.str().c_str(); + } +} + +// protected +void +DataWrComplState::set(const wrComplState_t s) +{ + // State transition logic: set stateError to true if an invalid transition is attempted + bool stateTransitionError = false; + switch(m_state) { + case WR_NONE: + if (s != WR_PART && s != WR_COMPLETE) stateTransitionError = true; + break; + case WR_PART: + if (s != WR_COMPLETE) stateTransitionError = true; + break; + case WR_COMPLETE: // Cannot move out of this state except with reset() + stateTransitionError = true; + break; + } + if (stateTransitionError) { + THROW_STATE_EXCEPTION(JournalError::JERR_MSGWRCMPLSTATE, s_toStr(s), s_toStr(m_state), "DataWrComplState", + "set"); + } + m_state = s; +} + +}}} // namespace qpid::asyncStore::jrnl2 diff --git a/cpp/src/qpid/asyncStore/jrnl2/DataWrComplState.h b/cpp/src/qpid/asyncStore/jrnl2/DataWrComplState.h new file mode 100644 index 0000000000..691b9c7b82 --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/DataWrComplState.h @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file DataWrComplState.h + */ + +#ifndef qpid_asyncStore_jrnl2_DataWrComplState_h_ +#define qpid_asyncStore_jrnl2_DataWrComplState_h_ + +#include "State.h" + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +/** + * \brief Enumeration of valid data record write completion states. + */ +typedef enum { + WR_NONE = 0, ///< Data record has not been written into async store buffer + WR_PART, ///< Data record has been partly written to store buffer + WR_COMPLETE ///< Data record write to store buffer is complete +} wrComplState_t; + +/** + * \brief State machine for the data record write completion state - ie whether the data record has been written + * to the store buffer. + * + * The following state diagram shows valid write buffer states and transitions: + * \dot + * digraph wrComplState_t { + * node [fontname=Helvetica, fontsize=8]; + * WR_NONE [URL="\ref WR_NONE"] + * WR_PART [URL="\ref WR_PART"] + * WR_COMPLETE [URL="\ref WR_COMPLETE"] + * + * edge [fontname=Helvetica, fontsize=8] + * WR_NONE->WR_PART [label=" partComplete()" URL="\ref qpid::jrnl2::DataWrComplState::partComplete()"] + * WR_NONE->WR_COMPLETE [label=" complete()" URL="\ref qpid::jrnl2::DataWrComplState::complete()"] + * WR_PART->WR_COMPLETE [label=" complete()" URL="\ref qpid::jrnl2::DataWrComplState::complete()"] + * } + * \enddot + */ +class DataWrComplState: public State<wrComplState_t> { +public: + /** + * \brief Default constructor, setting internal state to WR_NONE. + */ + DataWrComplState(); + + /** + * \brief Constructor allowing the internal state to be preset to any valid value of wrComplState_t. + * + * \param s State value to which the internal state of this new instance should be initialized. + */ + DataWrComplState(const wrComplState_t s); + + /** + * \brief Copy constructor. + * + * \param s Instance from which this new instance state should be copied. + */ + DataWrComplState(const DataWrComplState& s); + + /** + * \brief Virtual destructor + */ + virtual ~DataWrComplState(); + + /** + * \brief Changes the data record operational state from WR_NONE or WR_PART to WR_COMPLETE. + * + * \throws JournalException with JERR_ILLEGALDTOKOPSTATECHANGE if current state makes it illegal to change + * to WR_COMPLETE according to the state machine semantics. + */ + void complete(); + + /** + * \brief Changes the data record operational state from WR_NONE to WR_PART. + * + * \throws JournalException with JERR_ILLEGALDTOKOPSTATECHANGE if current state makes it illegal to change + * to WR_PART according to the state machine semantics. + */ + void partComplete(); + + /** + * \brief Get the current state in a string form. + * + * \returns String representation of internal state m_state as returned by static function s_toStr(). + */ + const char* getAsStr() const; + + /** + * \brief Static helper function to convert a numeric wrComplState_t type to a string representation. + */ + static const char* s_toStr(const wrComplState_t s); + +protected: + /** + * \brief Set (or change) the value of m_state. This function implements the state machine checks for + * legal state change transitions, and throw an exception if an illegal state transition is requested. + * + * \param s State to which this machine should be changed. + * \throws JournalException with JERR_ILLEGALDTOKOPSTATECHANGE if current state makes it illegal to change + * to the requested state \a s. + */ + void set(const wrComplState_t s); + +}; + +}}} // namespace qpid::asyncStore::jrnl2 + +#endif // qpid_asyncStore_jrnl2_DataWrComplState_h_ diff --git a/cpp/src/qpid/asyncStore/jrnl2/DequeueHeader.cpp b/cpp/src/qpid/asyncStore/jrnl2/DequeueHeader.cpp new file mode 100644 index 0000000000..bf8ef5fc7f --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/DequeueHeader.cpp @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file DequeueHeader.cpp + */ + +#include "DequeueHeader.h" + +#include "RecordTail.h" + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +DequeueHeader::DequeueHeader() : + RecordHeader(), + m_dequeuedRecordId(0), + m_xidSize(0) +{} + +DequeueHeader::DequeueHeader(const uint32_t magic, + const uint8_t version, + const uint64_t recordId, + const uint64_t dequeuedRecordId, + const uint64_t xidSize, + const bool overwriteIndicator, + const bool tplCommitOnTxnComplFlag) : + RecordHeader(magic, version, recordId, overwriteIndicator), + m_dequeuedRecordId(dequeuedRecordId), + m_xidSize(xidSize) +{ + setTplCommitOnTxnComplFlag(tplCommitOnTxnComplFlag); +} + +DequeueHeader::DequeueHeader(const DequeueHeader& dh) : + RecordHeader(dh), + m_dequeuedRecordId(dh.m_dequeuedRecordId), + m_xidSize(dh.m_xidSize) +{} + +DequeueHeader::~DequeueHeader() +{} + +void +DequeueHeader::copy(const DequeueHeader& dh) +{ + RecordHeader::copy(dh); + m_dequeuedRecordId = dh.m_dequeuedRecordId; + m_xidSize = dh.m_xidSize; +} + +void +DequeueHeader::reset() +{ + RecordHeader::reset(); + m_dequeuedRecordId = 0; + m_xidSize = 0; +} + +bool +DequeueHeader::getTplCommitOnTxnComplFlag() const +{ + return m_flags & DEQ_HDR_TPL_COMMIT_ON_TXN_COMPL_MASK; +} + +void +DequeueHeader::setTplCommitOnTxnComplFlag(const bool commitOnTxnCompl) +{ + m_flags = commitOnTxnCompl ? + m_flags | DEQ_HDR_TPL_COMMIT_ON_TXN_COMPL_MASK : + m_flags & (~DEQ_HDR_TPL_COMMIT_ON_TXN_COMPL_MASK); +} + +//static +uint64_t +DequeueHeader::getHeaderSize() +{ + return static_cast<uint64_t>(sizeof(DequeueHeader)); +} + +uint64_t +DequeueHeader::getBodySize() const +{ + return m_xidSize; +} + +uint64_t +DequeueHeader::getRecordSize() const +{ + return getHeaderSize() + (getBodySize() > 0LL ? + getBodySize() + RecordTail::getSize() : + 0); +} + +}}} // namespace qpid::asyncStore::jrnl2 diff --git a/cpp/src/qpid/asyncStore/jrnl2/DequeueHeader.h b/cpp/src/qpid/asyncStore/jrnl2/DequeueHeader.h new file mode 100644 index 0000000000..deb23915f7 --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/DequeueHeader.h @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file DequeueHeader.h + * + * List of journal record structs: + * struct DequeueHeader <-- This file + * struct EnqueueHeader + * struct EventHeader + * struct FileHeader + * struct RecordHeader + * struct RecordTail + * struct TransactionHeader + * + * Overview of journal record structs: + * + * <pre> + * +------------+ +--------------+ + * | RecordTail | | RecordHeader | + * +------------+ | (abstract) | + * +--------------+ + * ^ + * | + * +----------------+---------+-------+-------------------+ + * | | | | + * +------------+ +-------------+ +---------------+ +-------------------+ + * | FileHeader | | EventHeader | | DequeueHeader | | TransactionHeader | + * +------------+ +-------------+ +---------------+ +-------------------+ + * ^ + * | + * +---------------+ + * | EnqueueHeader | + * +---------------+ + * </pre> + */ + +#ifndef qpid_asyncStore_jrnl2_DequeueHeader_h_ +#define qpid_asyncStore_jrnl2_DequeueHeader_h_ + +#include "RecordHeader.h" + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +#pragma pack(1) + +/** + * \brief Struct for dequeue record header. This record marks a previous enqueue as + * logically deleted. + * + * The dequeue record conceptually marks a previous enqueue record as dequeued, + * or logically deleted. This is achieved by recording the record ID (rid) of + * the enqueue record being deleted in the m_dequeueRecordId field. Note that this + * rid is distinct from the rid assigned to the dequeue record itself in m_recordId. + * + * Dequeue records do not carry data. However, if it is transactional (and thus has + * a transaction ID (xid), then this record will be terminated by a RecordTail struct. + * If, on the other hand, this record is non-transactional, then the rec_tail + * is absent. + * + * Note that this record had its own rid distinct from the rid of the record it is dequeuing. + * The rid field below is the rid of the dequeue record itself; the deq-rid field is the rid of a + * previous enqueue record being dequeued by this record. + * + * Record layout in binary format (32 bytes): + * <pre> + * 0x0 0x7 + * +-----+-----+-----+-----+-----+-----+-----+-----+ -+ + * 0x00 | m_magic | v | e | m_flags | | + * +-----+-----+-----+-----+-----+-----+-----+-----+ | struct RecordHeader + * 0x08 | m_recordId | | + * +-----+-----+-----+-----+-----+-----+-----+-----+ -+ + * 0x10 | m_dequeuedRecordId | + * +-----+-----+-----+-----+-----+-----+-----+-----+ + * 0x18 | m_xidSize | + * +-----+-----+-----+-----+-----+-----+-----+-----+ + * </pre> + * <table> + * <tr> + * <td>v</td> + * <td>file version [ <code>_version</code> ] (If the format or encoding of + * this file changes, then this number should be incremented)</td> + * </tr> + * <tr> + * <td>e</td> + * <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for + * little endian, <b>true</b> (0x01) for big endian</td> + * </tr> + * </table> + * <pre> + * Non-transactional: + * +---------+ + * | dequeue | + * | header | + * +---------+ + * <-- 32 ---> + * + * Transactional: + * +---------+------------------------+--------+ + * | dequeue | XID | record | + * | header | | tail | + * +---------+------------------------+--------+ + * <-- 32 ---><----- m_xidSize ------><-- 16 --> + * </pre> + */ +class DequeueHeader : public RecordHeader +{ +public: + uint64_t m_dequeuedRecordId; ///< Record ID of dequeued record + uint64_t m_xidSize; ///< XID size + + /** + * \brief Mask for the record header flags field m_flags which is used in dequeue records in the Transaction + * Prepared List (TPL) to indicate that a closed transaction resulted in a commit (if the flag is set) + * or an abort (if the flag is not set). + */ + static const uint16_t DEQ_HDR_TPL_COMMIT_ON_TXN_COMPL_MASK = 0x10; + + /** + * \brief Default constructor, which sets all values to 0. + */ + DequeueHeader(); + + /** + * \brief Convenience constructor which initializes values during construction. + * + * \param magic The magic for this record + * \param version Version of this record + * \param recordId Record identifier for this record + * \param dequeuedRecordId Record identifier of the record being dequeued by this record + * \param xidSize Size of the transaction (or distributed transaction) ID for this record + * \param overwriteIndicator Flag indicating the present value of the overwrite indicator when writing this + * record + * \param tplCommitOnTxnComplFlag + */ + DequeueHeader(const uint32_t magic, + const uint8_t version, + const uint64_t recordId, + const uint64_t dequeuedRecordId, + const uint64_t xidSize, + const bool overwriteIndicator, + const bool tplCommitOnTxnComplFlag = false); + + /** + * \brief Copy constructor + * + * \param dh Instance to be copied + */ + DequeueHeader(const DequeueHeader& dh); + + /** + * \brief Virtual destructor + */ + virtual ~DequeueHeader(); + + /** + * \brief Convenience copy method. + */ + void copy(const DequeueHeader& dh); + + /** + * \brief Reset this record to default values (mostly 0) + */ + void reset(); + + /** + * \brief Return the value of the tplCommitOnTxnComplFlag for this record. This flag is used only within the + * TPL, and if set, indicates that the transaction was closed using a commit. If not set, the transaction was + * closed using an abort. This is used during recovery of the transactions in the store. + * + * \returns \b true if the tplCommitOnTxnComplFlag flag for this record is set, \b false otherwise. + */ + bool getTplCommitOnTxnComplFlag() const; + + /** + * \brief Set the value of the tplCommitOnTxnComplFlag for this record. This is only used in the TPL, and is + * ignored elsewhere. + * + * \param commitOnTxnCompl The value to be set in the tplCommitOnTxnComplFlag. If \b true, the transaction was + * closed with a commit; if \b false, with an abort. + */ + void setTplCommitOnTxnComplFlag(const bool commitOnTxnCompl); + + /** + * \brief Return the header size of this record in bytes. + * + * \returns Size of record header in bytes. + */ + static uint64_t getHeaderSize(); + + /** + * \brief Return the body (xid and data) size of this record in bytes. + * + * \returns Size of record body in bytes. + */ + uint64_t getBodySize() const; + + /** + * \brief Return total size of this record in bytes, being in the case of the dequeue record the size of the + * header, the size of the body (xid only) and the size of the tail. + * + * \returns Total size of record in bytes. + */ + uint64_t getRecordSize() const; + +}; + +#pragma pack() + +}}} // namespace qpid::asyncStore::jrnl2 + +#endif // qpid_asyncStore_jrnl2_DequeueHeader_h_ diff --git a/cpp/src/qpid/asyncStore/jrnl2/EnqueueHeader.cpp b/cpp/src/qpid/asyncStore/jrnl2/EnqueueHeader.cpp new file mode 100644 index 0000000000..213821ebe2 --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/EnqueueHeader.cpp @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file EnqueueHeader.cpp + */ + +#include "EnqueueHeader.h" + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +EnqueueHeader::EnqueueHeader() : + EventHeader() +{} + +EnqueueHeader::EnqueueHeader(const uint32_t magic, + const uint8_t version, + const uint64_t recordId, + const uint64_t xidSize, + const uint64_t dataSize, + const bool overwriteIndicator, + const bool transient, + const bool external) : + EventHeader(magic, version, recordId, xidSize, dataSize, overwriteIndicator) +{ + setTransientFlag(transient); + setExternalFlag(external); +} + +EnqueueHeader::EnqueueHeader(const EnqueueHeader& eh) : + EventHeader(eh) +{} + +EnqueueHeader::~EnqueueHeader() +{} + +bool +EnqueueHeader::getTransientFlag() const +{ + return m_flags & ENQ_HDR_TRANSIENT_MASK; +} + +void +EnqueueHeader::setTransientFlag(const bool transient) +{ + m_flags = transient ? + m_flags | ENQ_HDR_TRANSIENT_MASK : + m_flags & (~ENQ_HDR_TRANSIENT_MASK); +} + +bool +EnqueueHeader::getExternalFlag() const +{ + return m_flags & ENQ_HDR_EXTERNAL_MASK; +} + +void +EnqueueHeader::setExternalFlag(const bool external) +{ + m_flags = external ? + m_flags | ENQ_HDR_EXTERNAL_MASK : + m_flags & (~ENQ_HDR_EXTERNAL_MASK); +} + +//static +uint64_t +EnqueueHeader::getHeaderSize() +{ + return sizeof(EnqueueHeader); +} + +}}} // namespace qpid::asyncStore::jrnl2 diff --git a/cpp/src/qpid/asyncStore/jrnl2/EnqueueHeader.h b/cpp/src/qpid/asyncStore/jrnl2/EnqueueHeader.h new file mode 100644 index 0000000000..4d6dc9cfed --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/EnqueueHeader.h @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file EnqueueHeader.h + * + * List of journal record structs: + * struct DequeueHeader + * struct EnqueueHeader <-- This file + * struct EventHeader + * struct FileHeader + * struct RecordHeader + * struct RecordTail + * struct TransactionHeader + * + * Overview of journal record structs: + * + * <pre> + * +------------+ +--------------+ + * | RecordTail | | RecordHeader | + * +------------+ | (abstract) | + * +--------------+ + * ^ + * | + * +----------------+---------+-------+-------------------+ + * | | | | + * +------------+ +-------------+ +---------------+ +-------------------+ + * | FileHeader | | EventHeader | | DequeueHeader | | TransactionHeader | + * +------------+ +-------------+ +---------------+ +-------------------+ + * ^ + * | + * +---------------+ + * | EnqueueHeader | + * +---------------+ + * </pre> + */ + +#ifndef qpid_asyncStore_jrnl2_EnqueueHeader_h_ +#define qpid_asyncStore_jrnl2_EnqueueHeader_h_ + +#include "EventHeader.h" + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +#pragma pack(1) + +/** + * \brief Struct for enqueue record header. This record stores message data to the + * journal. + * + * Enqueue records record the content of messages for possible later recovery, and + * are so-called because they correspond with the event of enqueuing the record on + * a queue. + * + * In addition to the fields inherited from RecordHeader, this struct includes both the + * transaction id (xid) and data blob sizes. + * + * This header precedes the enqueued message data in journal files, and unless there is + * no xid and no data (both with 0 length), is followed by a RecordTail. + * + * Record layout in binary format (32 bytes): + * <pre> + * 0x0 0x7 + * +-----+-----+-----+-----+-----+-----+-----+-----+ -+ + * 0x00 | m_magic | v | e | m_flags | | + * +-----+-----+-----+-----+-----+-----+-----+-----+ | struct RecordHeader + * 0x08 | m_recordId | | + * +-----+-----+-----+-----+-----+-----+-----+-----+ -+ + * 0x10 | m_xidSize | + * +-----+-----+-----+-----+-----+-----+-----+-----+ + * 0x18 | m_dataSize | + * +-----+-----+-----+-----+-----+-----+-----+-----+ + * </pre> + * <table> + * <tr> + * <td>v</td> + * <td>file version [ <code>_version</code> ] (If the format or encoding of + * this file changes, then this number should be incremented)</td> + * </tr> + * <tr> + * <td>e</td> + * <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for + * little endian, <b>true</b> (0x01) for big endian</td> + * </tr> + * </table> + * <pre> + * No content (ie m_xidSize==0 AND m_dataSize==0): + * +---------+ + * | enqueue | + * | header | + * +---------+ + * <-- 32 ---> + * + * With content: + * +---------+----------------+---------------------------+--------+ + * | enqueue | XID | Data | record | + * | header | | | tail | + * +---------+----------------+---------------------------+--------+ + * <-- 32 ---><-- m_xidSize --><------ m_dataSize -------><-- 16 --> + * </pre> + */ +class EnqueueHeader : public EventHeader +{ +public: + /** + * \brief Mask for the record header flags field m_flags which is used to indicate that a record is transient + * (if the flag is set) or durable (if the flag is not set). + */ + static const uint16_t ENQ_HDR_TRANSIENT_MASK = 0x10; + + /** + * \brief Mask for the record header flags field m_flags which is used to indicate that a record is using + * external storage for its data content (if the flag is set). If the flag is not set, then the data content + * is stored in the journal itself. + */ + static const uint16_t ENQ_HDR_EXTERNAL_MASK = 0x20; + + /** + * \brief Default constructor, which sets all values to 0. + */ + EnqueueHeader(); + + /** + * \brief Convenience constructor which initializes values during construction. + * + * \param magic The magic for this record + * \param version Version of this record + * \param recordId Record identifier for this record + * \param xidSize Size of the transaction (or distributed transaction) ID for this record + * \param dataSize Size of the opaque data block for this record + * \param overwriteIndicator Flag indicating the present value of the overwrite indicator when writing this + * record + * \param transient Flag indicating that this record is transient (ie to be discarded on recovery) + * \param external Flag indicating that this record's data is stored externally to the journal, the data portion + * of the record identifies the storage location. + */ + EnqueueHeader(const uint32_t magic, + const uint8_t version, + const uint64_t recordId, + const uint64_t xidSize, + const uint64_t dataSize, + const bool overwriteIndicator, + const bool transient = false, + const bool external = false); + + /** + * \brief Copy constructor + * + * \param eh Instance to be copied + */ + EnqueueHeader(const EnqueueHeader& eh); + + /** + * \brief Virtual destructor + */ + virtual ~EnqueueHeader(); + + /** + * \brief Return the value of the Transient flag for this record. If set, this record is ignored during + * recovery. + * + * \returns true if the Transient flag for this record is set, false otherwise. + */ + bool getTransientFlag() const; + + /** + * \brief Set the value of the Transient flag for this record. + * + * \param transient The value to be set in the transient flag. + */ + void setTransientFlag(const bool transient = true); + + /** + * \brief Return the value of the External flag for this record. If set, this record data is not within the + * journal but external to it. The data part of this record contains the location of the stored data. + * + * \returns true if the Transient flag for this record is set, false otherwise. + */ + bool getExternalFlag() const; + + /** + * \brief Set the value of the External flag for this record. + * + * \param external The value to be set in the External flag. + */ + void setExternalFlag(const bool external = true); + + /** + * \brief Return the header size of this record in bytes. + * + * \returns Size of record header in bytes. + */ + static uint64_t getHeaderSize(); + +}; + +#pragma pack() + +}}} // namespace qpid::asyncStore::jrnl2 + +#endif // qpid_asyncStore_jrnl2_EnqueueHeader_h_ diff --git a/cpp/src/qpid/asyncStore/jrnl2/EventHeader.cpp b/cpp/src/qpid/asyncStore/jrnl2/EventHeader.cpp new file mode 100644 index 0000000000..374d161af9 --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/EventHeader.cpp @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file EventHeader.cpp + */ + +#include "EventHeader.h" + +#include "RecordTail.h" + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +EventHeader::EventHeader() : + RecordHeader(), + m_xidSize(0), + m_dataSize(0) +{} + +EventHeader::EventHeader(const uint32_t magic, + const uint8_t version, + const uint64_t recordId, + const uint64_t xidSize, + const uint64_t dataSize, + const bool overwriteIndicator) : + RecordHeader(magic, version, recordId, overwriteIndicator), + m_xidSize(xidSize), + m_dataSize(dataSize) +{} + +EventHeader::EventHeader(const EventHeader& eh) : + RecordHeader(eh), + m_xidSize(eh.m_xidSize), + m_dataSize(eh.m_dataSize) +{} + +EventHeader::~EventHeader() +{} + +void +EventHeader::copy(const EventHeader& e) +{ + RecordHeader::copy(e); + m_xidSize = e.m_xidSize; + m_dataSize = e.m_dataSize; +} + +void +EventHeader::reset() +{ + RecordHeader::reset(); + m_xidSize = 0; + m_dataSize = 0; +} + +//static +uint64_t +EventHeader::getHeaderSize() +{ + return sizeof(EventHeader); +} + +uint64_t +EventHeader::getBodySize() const +{ + return m_xidSize + m_dataSize; +} + +uint64_t +EventHeader::getRecordSize() const +{ + return getHeaderSize() + (getBodySize() ? + getBodySize() + RecordTail::getSize() : + 0); +} + +}}} // namespace qpid::asyncStore::jrnl2 diff --git a/cpp/src/qpid/asyncStore/jrnl2/EventHeader.h b/cpp/src/qpid/asyncStore/jrnl2/EventHeader.h new file mode 100644 index 0000000000..c4f48c1022 --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/EventHeader.h @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file EventHeader.h + * + * List of journal record structs: + * struct DequeueHeader + * struct EnqueueHeader + * struct EventHeader <-- This file + * struct FileHeader + * struct RecordHeader + * struct RecordTail + * struct TransactionHeader + * + * Overview of journal record structs: + * + * <pre> + * +------------+ +--------------+ + * | RecordTail | | RecordHeader | + * +------------+ | (abstract) | + * +--------------+ + * ^ + * | + * +----------------+---------+-------+-------------------+ + * | | | | + * +------------+ +-------------+ +---------------+ +-------------------+ + * | FileHeader | | EventHeader | | DequeueHeader | | TransactionHeader | + * +------------+ +-------------+ +---------------+ +-------------------+ + * ^ + * | + * +---------------+ + * | EnqueueHeader | + * +---------------+ + * </pre> + */ + +#ifndef qpid_asyncStore_jrnl2_EventHeader_h_ +#define qpid_asyncStore_jrnl2_EventHeader_h_ + +#include "RecordHeader.h" + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +#pragma pack(1) + +/** + * \brief Struct for an event record header. Event records can be used to record + * system events in the store. + * + * The EventHeader record type may be used to store events into the journal which do + * not constitute data content but changes of state in the broker. These can be + * recovered and used to set appropriate state in the broker. + * + * This record is almost identical to EnqueueRecord, but without the flags. It + * precedes the xid and stored event data in journal files, and unless there is + * no xid and no data (both with 0 length), is followed by a RecordTail. + * + * \todo TODO: I am uncertain at this time whether it is necessary to set an XID on an event + * record, but in case, I have left this feature in. In any event, there is only a + * 1 byte size penalty in the header size for doing so. + * + * Record layout in binary format (32 bytes): + * <pre> + * 0x0 0x7 + * +-----+-----+-----+-----+-----+-----+-----+-----+ -+ + * 0x00 | m_magic | v | e | m_flags | | + * +-----+-----+-----+-----+-----+-----+-----+-----+ | struct RecordHeader + * 0x08 | m_recordId | | + * +-----+-----+-----+-----+-----+-----+-----+-----+ -+ + * 0x10 | m_xidSize | + * +-----+-----+-----+-----+-----+-----+-----+-----+ + * 0x18 | m_dataSize | + * +-----+-----+-----+-----+-----+-----+-----+-----+ + * </pre> + * <table> + * <tr> + * <td>v</td> + * <td>file version [ <code>_version</code> ] (If the format or encoding of + * this file changes, then this number should be incremented)</td> + * </tr> + * <tr> + * <td>e</td> + * <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for + * little endian, <b>true</b> (0x01) for big endian</td> + * </tr> + * </table> + * <pre> + * No content (ie m_xidSize==0 AND m_dataSize==0): + * +---------+ + * | event | + * | header | + * +---------+ + * <-- 32 ---> + * + * With content: + * +---------+----------------+---------------------------+--------+ + * | event | XID | Event data | record | + * | header | | | tail | + * +---------+----------------+---------------------------+--------+ + * <-- 32 ---><-- m_xidSize --><------ m_dataSize -------><-- 16 --> + * </pre> + */ +class EventHeader : public RecordHeader +{ +public: + uint64_t m_xidSize; ///< XID size + uint64_t m_dataSize; ///< Record data size + + /** + * \brief Default constructor, which sets all values to 0. + */ + EventHeader(); + + /** + * \brief Convenience constructor which initializes values during construction. + * + * \param magic The magic for this record + * \param version Version of this record + * \param recordId Record identifier for this record + * \param xidSize Size of the transaction (or distributed transaction) ID for this record + * \param dataSize Size of the opaque data block for this record + * \param overwriteIndicator Flag indicating the present value of the overwrite indicator when writing this + * record + */ + EventHeader(const uint32_t magic, + const uint8_t version, + const uint64_t recordId, + const uint64_t xidSize, + const uint64_t dataSize, + const bool overwriteIndicator); + + /** + * \brief Copy constructor + * + * \param eh Instance to be copied + */ + EventHeader(const EventHeader& eh); + + /** + * \brief Virtual destructor + */ + virtual ~EventHeader(); + + /** + * \brief Convenience copy method. + */ + virtual void copy(const EventHeader& eh); + + /** + * \brief Reset this record to default values (mostly 0) + */ + virtual void reset(); + + /** + * \brief Return the header size of this record in bytes. + * + * \returns Size of record header in bytes. + */ + static uint64_t getHeaderSize(); + + /** + * \brief Return the body (data) size of this record in bytes. + * + * \returns Size of record body in bytes. + */ + virtual uint64_t getBodySize() const; + + /** + * \brief Return total size of this record in bytes, being in the case of the enqueue record the size of the + * header, the size of the body (xid and data) and the size of the tail. + * + * \returns Total size of record in bytes. + */ + virtual uint64_t getRecordSize() const; + +}; + +#pragma pack() + +}}} // namespace qpid::asyncStore::jrnl2 + +#endif // qpid_asyncStore_jrnl2_EventHeader_h_ diff --git a/cpp/src/qpid/asyncStore/jrnl2/FileHeader.cpp b/cpp/src/qpid/asyncStore/jrnl2/FileHeader.cpp new file mode 100644 index 0000000000..1b601c4cd7 --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/FileHeader.cpp @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file FileHeader.cpp + */ + +#include "FileHeader.h" + +#include "JournalError.h" + +#include <cerrno> +#include <cstring> +#include <ctime> +#include <sstream> + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +FileHeader::FileHeader() : + RecordHeader(), + m_physicalFileId(0), + m_logicalFileId(0), + m_firstRecordOffset(0), + m_timestampSeconds(0), + m_timestampNanoSeconds(0), + m_reserved(0) +{} + +FileHeader::FileHeader(const uint32_t magic, + const uint8_t version, + const uint64_t recordId, + const bool overwriteIndicator, + const uint16_t physicalFileId, + const uint16_t logicalFileId, + const uint64_t firstRecordOffset, + const bool setTimestampFlag) : + RecordHeader(magic, version, recordId, overwriteIndicator), + m_physicalFileId(physicalFileId), + m_logicalFileId(logicalFileId), + m_firstRecordOffset(firstRecordOffset), + m_timestampSeconds(0), + m_timestampNanoSeconds(0), + m_reserved(0) +{ + if (setTimestampFlag) setTimestamp(); +} + +FileHeader::FileHeader(const FileHeader& fh) : + RecordHeader(fh), + m_physicalFileId(fh.m_physicalFileId), + m_logicalFileId(fh.m_logicalFileId), + m_firstRecordOffset(fh.m_firstRecordOffset), + m_timestampSeconds(fh.m_timestampSeconds), + m_timestampNanoSeconds(fh.m_timestampNanoSeconds), + m_reserved(fh.m_reserved) +{} + +FileHeader::~FileHeader() +{} + +void +FileHeader::copy(const FileHeader& fh) +{ + RecordHeader::copy(fh); + m_physicalFileId = fh.m_physicalFileId; + m_logicalFileId = fh.m_logicalFileId; + m_firstRecordOffset = fh.m_firstRecordOffset; + m_timestampSeconds = fh.m_timestampSeconds; + m_timestampNanoSeconds = fh.m_timestampNanoSeconds; + m_reserved = fh.m_reserved; +} + +void +FileHeader::reset() +{ + RecordHeader::reset(); + m_physicalFileId = 0; + m_logicalFileId = 0; + m_firstRecordOffset = 0; + m_timestampSeconds = 0; + m_timestampNanoSeconds = 0; + m_reserved = 0; +} + +//static +uint64_t +FileHeader::getHeaderSize() +{ + return sizeof(FileHeader); +} + +uint64_t +FileHeader::getBodySize() const +{ + return 0; +} + +uint64_t +FileHeader::getRecordSize() const +{ + return getHeaderSize(); +} + +void +FileHeader::setTimestamp() +{ + /// \todo TODO: Standardize on method for getting time that does not require a context switch. + timespec ts; + if (::clock_gettime(CLOCK_REALTIME, &ts)) { + std::ostringstream oss; + oss << FORMAT_SYSERR(errno); + throw JournalError(JournalError::JERR_RTCLOCK, oss.str(), "FileHeader", "setTimestamp"); + } + setTimestamp(ts); +} + +void +FileHeader::setTimestamp(const timespec& ts) +{ + m_timestampSeconds = static_cast<uint64_t>(ts.tv_sec); + m_timestampNanoSeconds = static_cast<uint32_t>(ts.tv_nsec); +} + +}}} // namespace qpid::asyncStore::jrnl2 diff --git a/cpp/src/qpid/asyncStore/jrnl2/FileHeader.h b/cpp/src/qpid/asyncStore/jrnl2/FileHeader.h new file mode 100644 index 0000000000..2d7a17a90c --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/FileHeader.h @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file FileHeader.h + * + * List of journal record structs: + * struct DequeueHeader + * struct EnqueueHeader + * struct EventHeader + * struct FileHeader <-- This file + * struct RecordHeader + * struct RecordTail + * struct TransactionHeader + * + * Overview of journal record structs: + * + * <pre> + * +------------+ +--------------+ + * | RecordTail | | RecordHeader | + * +------------+ | (abstract) | + * +--------------+ + * ^ + * | + * +----------------+---------+-------+-------------------+ + * | | | | + * +------------+ +-------------+ +---------------+ +-------------------+ + * | FileHeader | | EventHeader | | DequeueHeader | | TransactionHeader | + * +------------+ +-------------+ +---------------+ +-------------------+ + * ^ + * | + * +---------------+ + * | EnqueueHeader | + * +---------------+ + * </pre> + */ + +#ifndef qpid_asyncStore_jrnl2_FileHeader_h_ +#define qpid_asyncStore_jrnl2_FileHeader_h_ + +#include "RecordHeader.h" + +struct timespec; + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +#pragma pack(1) + +/** + * \brief Struct for the data written to the head of all journal files. + * + * In addition to the fields inherited from RecordHeader, this struct includes the + * unique record ID for this record and an offset of the first record in the file. + * There is also a field for a nanosecond-resolution time stamp which records the + * time that this file was first written in each cycle. + * + * This header precedes all data in journal files and occupies the first complete + * block in the file. The fields in this record are updated as necessary each time + * the file is written or overwritten. + * + * File layout in binary format (48 bytes): + * <pre> + * 0x0 0x7 + * +-----+-----+-----+-----+-----+-----+-----+-----+ -+ + * 0x00 | m_magic | v | e | m_flags | | + * +-----+-----+-----+-----+-----+-----+-----+-----+ | struct RecordHeader + * 0x08 | m_recordId (used to show first rid in file) | | + * +-----+-----+-----+-----+-----+-----+-----+-----+ -+ + * 0x10 | m_physicalFileId | m_logicalFileId | + * +-----+-----+-----+-----+-----+-----+-----+-----+ + * 0x18 | m_firstRecordOffset | + * +-----+-----+-----+-----+-----+-----+-----+-----+ + * 0x20 | m_timestampSeconds | + * +-----+-----+-----+-----+-----+-----+-----+-----+ + * 0x28 |m_timestampNanoSeconds | m_reserved | + * +-----+-----+-----+-----+-----+-----+-----+-----+ + * </pre> + * <table> + * <tr> + * <td>v</td> + * <td>file version [ <code>_version</code> ] (If the format or encoding of + * this file changes, then this number should be incremented)</td> + * </tr> + * <tr> + * <td>e</td> + * <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for + * little endian, <b>true</b> (0x01) for big endian</td> + * </tr> + * </table> + * General journal file structure: + * <pre> + * <----------------------------- Journal File -----------------------------------> + * +--------+ +-----------------+ +---------+ +---------+ +-------------------+ + * | file | | continuation of | | first | | second | ... | last record | + * | header | | record from | | record | | record | | (or part thereof) | + * | | | prev. file (opt)| | in file | | in file | | in file | + * +--------+ +-----------------+ +---------+ +---------+ +-------------------+ + * ^ + * | + * m_firstRecordOffset -----------+ + * </pre> + */ +class FileHeader : public RecordHeader +{ +public: + uint32_t m_physicalFileId; ///< Physical file ID (pfid) + uint32_t m_logicalFileId; ///< Logical file ID (lfid) + uint64_t m_firstRecordOffset;///< First record offset (fro) + uint64_t m_timestampSeconds; ///< Timestamp of journal initialization, seconds component + uint32_t m_timestampNanoSeconds;///< Timestamp of journal initialization, nanoseconds component + uint32_t m_reserved; ///< Little-endian filler for uint32_t + + /** + * \brief Default constructor, which sets all values to 0. + */ + FileHeader(); + + /** + * \brief Convenience constructor which initializes values during construction. + * \param magic Magic for this record + * \param version Version of this record + * \param recordId RecordId for this record + * \param overwriteIndicator Overwrite indicator for this record + * \param physicalFileId Physical file ID (file number on disk) + * \param logicalFileId Logical file ID (file number as seen by circular file buffer) + * \param firstRecordOffset First record offset in bytes from beginning of file + * \param setTimestampFlag If true, causes the timestamp to be initialized with the current system time + */ + FileHeader(const uint32_t magic, + const uint8_t version, + const uint64_t recordId, + const bool overwriteIndicator, + const uint16_t physicalFileId, + const uint16_t logicalFileId, + const uint64_t firstRecordOffset, + const bool setTimestampFlag = false); + + /** + * \brief Copy constructor. + * \param fh FileHeader instance to be copied + */ + FileHeader(const FileHeader& fh); + + /** + * \brief Destructor. + */ + virtual ~FileHeader(); + + /** + * \brief Convenience copy method. + * \param fh FileHeader instance to be copied + */ + void copy(const FileHeader& fh); + + /** + * \brief Resets all fields to default values (mostly 0). + */ + void reset(); + + /** + * \brief Return the header size of this record in bytes. + * \returns Size of record header in bytes. + */ + static uint64_t getHeaderSize(); + + /** + * \brief Return the body (data) size of this record in bytes, which in the case of a FileHeader is always 0. + * \returns Size of record body in bytes. By definition, a FileHeader has no body. + */ + uint64_t getBodySize() const; + + /** + * \brief Return total size of this record in bytes, being in the case of the + * FileHeader the size of the header itself only. + * \returns Total size of record in bytes. + */ + uint64_t getRecordSize() const; + + /** + * \brief Gets the current time from the system clock and sets the timestamp in the struct. + */ + void setTimestamp(); + + /** + * \brief Sets the timestamp in the struct to the provided value (in seconds and nanoseconds). + * \param ts Timestamp from which the file header time stamp is to be copied + */ + void setTimestamp(const timespec& ts); + +}; + +#pragma pack() + +}}} // namespace qpid::asyncStore::jrnl2 + +#endif // qpid_asyncStore_jrnl2_FileHeader_h_ diff --git a/cpp/src/qpid/asyncStore/jrnl2/JournalDirectory.cpp b/cpp/src/qpid/asyncStore/jrnl2/JournalDirectory.cpp new file mode 100644 index 0000000000..0ec576805d --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/JournalDirectory.cpp @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file JournalDirectory.cpp + */ +#include "JournalDirectory.h" + +#include "JournalError.h" + +#include <cstring> +#include <dirent.h> +#include <errno.h> +#include <sstream> +#include <unistd.h> + +#include <sys/stat.h> + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +JournalDirectory::JournalDirectory(const std::string& fqName) : + m_fqName(fqName), + m_verified(false) +{} + +const +std::string JournalDirectory::getFqName() const +{ + return m_fqName; +} + +void +JournalDirectory::setFqName(const std::string newFqName, + const bool createNew, + const bool destroyExisting) +{ + if (m_fqName.compare(newFqName) != 0) { + if (destroyExisting) { + destroy(); + } + m_fqName = newFqName; + if (createNew) { + create(); + } + } +} + +// static +bool +JournalDirectory::s_exists(const std::string& fqName, + const bool checkIsWritable) +{ + struct stat buff; + if (::lstat(fqName.c_str(), &buff)) { + if (errno == ENOENT) // No such dir or file + return false; + HANDLE_SYS_ERROR(fqName, JournalError::JERR_STAT, "s_exists"); + } + CHK_ERROR(!S_ISDIR(buff.st_mode), fqName, JournalError::JERR_NOTADIR, "s_exists"); + if (checkIsWritable) { + return (buff.st_mode & (S_IWUSR | S_IWGRP | S_IWOTH)); + } + return true; +} + +// static +void +JournalDirectory::s_create(const std::string& fqName) +{ + std::size_t fdp = fqName.find_last_of('/'); + if (fdp != std::string::npos) { + std::string parent_dir = fqName.substr(0, fdp); + if (!s_exists(parent_dir)) { + s_create(parent_dir); + } + } + if (::mkdir(fqName.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH)) { + if (errno != EEXIST) { // Dir exists, ignore + HANDLE_SYS_ERROR(fqName, JournalError::JERR_MKDIR, "s_create"); + } + } +} + +void +JournalDirectory::create() +{ + s_create(m_fqName); + m_verified = true; +} + +//static +void +JournalDirectory::s_clear(const std::string& fqName, + const bool recursiveDelete) +{ + s_destroy(fqName, recursiveDelete, true); +} + +void +JournalDirectory::clear(const bool recursiveDelete) +{ + s_clear(m_fqName, recursiveDelete); +} + +// static +void +JournalDirectory::s_destroy(const std::string& fqName, + const bool recursiveDelete, + const bool childrenOnly) +{ + if (s_exists(fqName)) { + DIR* dir = ::opendir(fqName.c_str()); + if (dir) { + struct stat buff; + struct dirent* entry; + while ((entry = ::readdir(dir)) != 0) { + // Ignore . and .. + if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0) { + std::string fullName(fqName + "/" + entry->d_name); + CHK_SYS_ERROR(::lstat(fullName.c_str(), &buff), fullName, JournalError::JERR_STAT, "s_destroy"); + if (S_ISREG(buff.st_mode) || S_ISLNK(buff.st_mode)) { // This is a file or symlink + CHK_SYS_ERROR(::unlink(fullName.c_str()), fullName, JournalError::JERR_UNLINK, "s_destroy"); + } else if (S_ISDIR(buff.st_mode)) { // This is a directory + if (recursiveDelete) { + s_destroy(fullName); + } else { + HANDLE_ERROR(fullName, JournalError::JERR_DIRNOTEMPTY, "s_destroy"); + } + } else { + HANDLE_ERROR(fullName, JournalError::JERR_BADFTYPE, "s_destroy"); + } + } + } + CHK_SYS_ERROR(::closedir(dir), fqName, JournalError::JERR_CLOSEDIR, "s_destroy"); + if (!childrenOnly) { + CHK_SYS_ERROR(::rmdir(fqName.c_str()), fqName, JournalError::JERR_RMDIR, "s_destroy"); + } + } else { + HANDLE_SYS_ERROR(fqName, JournalError::JERR_OPENDIR, "s_destroy"); + } + } +} + +void +JournalDirectory::destroy(const bool recursiveDelete, + const bool childrenOnly) +{ + if (m_verified) { + s_destroy(m_fqName, recursiveDelete, childrenOnly); + m_verified = false; + } +} + +bool +JournalDirectory::isVerified() const +{ + return m_verified; +} + +}}} // namespace qpid::asyncStore::jrnl2 diff --git a/cpp/src/qpid/asyncStore/jrnl2/JournalDirectory.h b/cpp/src/qpid/asyncStore/jrnl2/JournalDirectory.h new file mode 100644 index 0000000000..444094ccb4 --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/JournalDirectory.h @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file JournalDirectory.h + */ + +#ifndef qpid_asyncStore_jrnl2_JournalDirectory_h_ +#define qpid_asyncStore_jrnl2_JournalDirectory_h_ + +#include <string> + +// --- Helper macros --- + +/** + * \brief Macro to handle the throwing of JournalExceptions. + * + * \param name Directory name + * \param err Static JournalError instance corresponding to the error to be thrown. + * \param fn C-string conaining throwing function name + */ +#define HANDLE_ERROR(name, err, fn) \ + std::ostringstream oss; \ + oss << "name=\"" << name << "\""; \ + throw qpid::asyncStore::jrnl2::JournalError(err, oss.str(), "JournalDirectory", fn) + +/** + * \brief Macro to both check a result \a ret for a non-zero value and then throw a JournalException if so. + * + * \param ret Int result value to be checked for non-zero value. + * \param name Directory name. + * \param err Static JournalError instance corresponding to the error to be thrown. + * \param fn C-string conaining throwing function name + */ +#define CHK_ERROR(ret, name, err, fn) \ + if (ret) { HANDLE_ERROR(name, err, fn); } + +/** + * \brief Macro to handle the throwing of JournalException in which errno contains the error code. + * + * \param name Directory name + * \param err Static JournalError instance corresponding to the error to be thrown. + * \param fn C-string conaining throwing function name + */ +#define HANDLE_SYS_ERROR(name, err, fn) \ + std::ostringstream oss; \ + oss << "name=\"" << name << "\": " << FORMAT_SYSERR(errno); \ + throw qpid::asyncStore::jrnl2::JournalError(err, oss.str(), "JournalDirectory", fn) + +/** + * \brief Macro to both check a result \a ret for a non-zero value, then throw a JournalException if so. The value + * of errno is read and incorporated into the error text. + * + * \param ret Int result value to be checked for non-zero value. + * \param name Directory name + * \param err Static JournalError instance corresponding to the error to be thrown. + * \param fn C-string conaining throwing function name + */ +#define CHK_SYS_ERROR(ret, name, err, fn) \ + if (ret) { HANDLE_SYS_ERROR(name, err, fn); } + + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +/** + * \brief Class which manages and controls the directory where a journal instance will be created and operated. + * + * This class can check for the existence of the target directory, and if it is not present, create it. It can + * also delete the directory (with all its contents). + */ +class JournalDirectory +{ +public: + /** + * \brief Constructor. + * + * \param fqName Name of directory to be checked for. May be an absolute or relative path. + * \throws JournalException with error JournalErrors::JERR_STAT if ::lstat() fails. + * \throws JournalException with error JournalErrors::JERR_NOTADIR if \a name exists, but is not a directory. + * \throws JournalException with error JournalErrors::JERR_UNLINK if ::unlink() fails + * \throws JournalException with error JournalErrors::JERR_DIRNOTEMPTY if \a recursiveDelete is \b false and + * directory \a name contains one or more directories. + * \throws JournalException with error JournalErrors::JERR_BADFTYPE if \a name is not a regular file, symlink + * or directory. + * \throws JournalException with error JournalErrors::JERR_CLOSEDIR if ::closedir() fails. + * \throws JournalException with error JournalErrors::JERR_RMDIR if ::rmdir() fails. + * \throws JournalException with error JournalErrors::JERR_OPENDIR if ::opendir() fails. + */ + JournalDirectory(const std::string& fqName); + + /** + * \brief Returns the name of the journal directory. + * + * \returns Name of the joruanl directory. + */ + const std::string getFqName() const; + + /** + * \brief Set a (new) journal directory location. The previous location is assumed to have been set in the + * constructor or a previous call to getName(). + * + * \param newFqName New directory (fully qualified name) + * \param createNew If \b true, will create the new directory immediately and setting m_verified to \b true, + * otherwise the name will be set, but no action taken to create or verify the name. + * \param destroyExisting If \b true, will destroy the old journal directory, deleting all its contents. If + * \b false, the existing directory will be left in tact. + * \throws JournalException with error JournalErrors::JERR_STAT if ::lstat() fails. + * \throws JournalException with error JournalErrors::JERR_NOTADIR if \a name exists, but is not a directory. + * \throws JournalException with error JournalErrors::JERR_UNLINK if ::unlink() fails + * \throws JournalException with error JournalErrors::JERR_BADFTYPE if \a name is not a regular file, symlink + * or directory. + * \throws JournalException with error JournalErrors::JERR_CLOSEDIR if ::closedir() fails. + * \throws JournalException with error JournalErrors::JERR_RMDIR if ::rmdir() fails. + * \throws JournalException with error JournalErrors::JERR_OPENDIR if ::opendir() fails. + */ + void setFqName(const std::string newFqName, + const bool createNew = true, + const bool destroyExisting = false); + + /** + * \brief Static helper class to check for the existence of the directory \a name. + * + * \param fqName Name of directory to be checked for. May be an absolute or relative path. + * \param checkIsWritable Also check if the directory is writable. If the directory exists but is not writable + * and this parameter is set \b true, then this test will fail. + * \returns \b true if the directory exists and if \b checkIsWritable is set to \b true, then also writable, + * \b false otherwise. + * \throws JournalException with error JournalErrors::JERR_STAT if ::lstat() fails. + * \throws JournalException with error JournalErrors::JERR_NOTADIR if \a name exists, but is not a directory. + */ + static bool s_exists(const std::string& fqName, + const bool checkIsWritable = true); + + /** + * \brief Static helper function to create a directory \a name. If parameter \a name is a path which includes + * one or more non-existent directories, then these will be recursively created as needed. + * + * \param fqName Name of directory to be created. May be an absolute or relative path. + * \throws JournalException with error JournalErrors::JERR_STAT if ::lstat() fails. + * \throws JournalException with error JournalErrors::JERR_NOTADIR if part of \a name exists, but is not a + * directory. + * \throws JournalException with error JournalErrors::JERR_MKDIR if ::mkdir() fails. + */ + static void s_create(const std::string& fqName); + + /** + * \brief Create the directory \a name supplied to the constructor. + * + * \throws JournalException with error JournalErrors::JERR_STAT if ::lstat() fails. + * \throws JournalException with error JournalErrors::JERR_NOTADIR if part of \a name exists, but is not a + * directory. + * \throws JournalException with error JournalErrors::JERR_MKDIR if ::mkdir() fails. + */ + void create(); + + /** + * \brief Static helper function to delete only the contents of directory \a name. + * + * \param fqName Name of directory whose contents are to be deleted. May be an absolute or relative path. + * \param recursiveDelete If \b true, then all subdirectories (if they exist) will also be deleted. + * \throws JournalException with error JournalErrors::JERR_STAT if ::lstat() fails. + * \throws JournalException with error JournalErrors::JERR_NOTADIR if \a name exists, but is not a directory. + * \throws JournalException with error JournalErrors::JERR_UNLINK if ::unlink() fails + * \throws JournalException with error JournalErrors::JERR_DIRNOTEMPTY if \a recursiveDelete is \b false and + * directory \a name contains one or more directories. + * \throws JournalException with error JournalErrors::JERR_BADFTYPE if \a name is not a regular file, symlink + * or directory. + * \throws JournalException with error JournalErrors::JERR_CLOSEDIR if ::closedir() fails. + * \throws JournalException with error JournalErrors::JERR_RMDIR if ::rmdir() fails. + * \throws JournalException with error JournalErrors::JERR_OPENDIR if ::opendir() fails. + */ + static void s_clear(const std::string& fqName, + const bool recursiveDelete = true); + + /** + * \brief Delete only the contents of directory \a name. + * + * \param recursiveDelete If \b true, then all subdirectories (if they exist) will also be deleted. + * \throws JournalException with error JournalErrors::JERR_STAT if ::lstat() fails. + * \throws JournalException with error JournalErrors::JERR_NOTADIR if \a name exists, but is not a directory. + * \throws JournalException with error JournalErrors::JERR_UNLINK if ::unlink() fails + * \throws JournalException with error JournalErrors::JERR_DIRNOTEMPTY if \a recursiveDelete is \b false and + * directory \a name contains one or more directories. + * \throws JournalException with error JournalErrors::JERR_BADFTYPE if \a name is not a regular file, symlink + * or directory. + * \throws JournalException with error JournalErrors::JERR_CLOSEDIR if ::closedir() fails. + * \throws JournalException with error JournalErrors::JERR_RMDIR if ::rmdir() fails. + * \throws JournalException with error JournalErrors::JERR_OPENDIR if ::opendir() fails. + */ + void clear(const bool recursiveDelete = true); + + /** + * \brief Static helper function to delete the contents of directory \a name, and by default, also the + * directory itself. + * + * \param fqName Name of directory to be deleted. May be an absolute or relative path. + * \param recursiveDelete If \b true, then all subdirectories (if they exist) will also be deleted. + * \param childrenOnly If \b true, then only the contents of directory \a name will be deleted and directory + * \a name will remain. Otherwise \a name itself will also be deleted. + * \throws JournalException with error JournalErrors::JERR_STAT if ::lstat() fails. + * \throws JournalException with error JournalErrors::JERR_NOTADIR if \a name exists, but is not a directory. + * \throws JournalException with error JournalErrors::JERR_UNLINK if ::unlink() fails + * \throws JournalException with error JournalErrors::JERR_DIRNOTEMPTY if \a recursiveDelete is \b false and + * directory \a name contains one or more directories. + * \throws JournalException with error JournalErrors::JERR_BADFTYPE if \a name is not a regular file, symlink + * or directory. + * \throws JournalException with error JournalErrors::JERR_CLOSEDIR if ::closedir() fails. + * \throws JournalException with error JournalErrors::JERR_RMDIR if ::rmdir() fails. + * \throws JournalException with error JournalErrors::JERR_OPENDIR if ::opendir() fails. + */ + static void s_destroy(const std::string& fqName, + const bool recursiveDelete = true, + const bool childrenOnly = false); + + /** + * \brief Delete the contents of directory \a name, and by default, also the directory itself. + * + * \param recursiveDelete If \b true, then all subdirectories (if they exist) will also be deleted. + * \param childrenOnly If \b true, then only the contents of directory \a name will be deleted and directory + * \a name will remain. Otherwise \a name itself will also be deleted. + * \throws JournalException with error JournalErrors::JERR_STAT if ::lstat() fails. + * \throws JournalException with error JournalErrors::JERR_NOTADIR if \a name exists, but is not a directory. + * \throws JournalException with error JournalErrors::JERR_UNLINK if ::unlink() fails + * \throws JournalException with error JournalErrors::JERR_DIRNOTEMPTY if \a recursiveDelete is \b false and + * directory \a name contains one or more directories. + * \throws JournalException with error JournalErrors::JERR_BADFTYPE if \a name is not a regular file, symlink + * or directory. + * \throws JournalException with error JournalErrors::JERR_CLOSEDIR if ::closedir() fails. + * \throws JournalException with error JournalErrors::JERR_RMDIR if ::rmdir() fails. + * \throws JournalException with error JournalErrors::JERR_OPENDIR if ::opendir() fails. + */ + void destroy(const bool recursiveDelete = true, + const bool childrenOnly = false); + + /** + * \brief Return the verified status of the store directory. + * + * \returns \b true if the directory exists and is available for use as a journal directory, \b false otherwise. + */ + bool isVerified() const; + +protected: + std::string m_fqName; ///< Name of directory (fully qualified name) + bool m_verified; ///< True when verified that it exists or is created + +}; + +}}} // namespace qpid::asyncStore::jrnl2 + +#endif // qpid_asyncStore_jrnl2_JournalDirectory_h_ + diff --git a/cpp/src/qpid/asyncStore/jrnl2/JournalError.cpp b/cpp/src/qpid/asyncStore/jrnl2/JournalError.cpp new file mode 100644 index 0000000000..3c64148b5b --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/JournalError.cpp @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file JournalError.cpp + */ + +#include "JournalError.h" + +#include <iomanip> // std::setfill(), std::setw() +#include <sstream> // std::ostringstream + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +JournalError::JournalError() throw () : + std::runtime_error(std::string()), + m_errorCode(0) +{ + formatWhatStr(); +} + +JournalError::JournalError(const uint32_t errorCode) throw () : + std::runtime_error(std::string()), + m_errorCode(errorCode) +{ + formatWhatStr(); +} + +JournalError::JournalError(const char* additionalInfo) throw () : + std::runtime_error(std::string()), + m_errorCode(0), + m_additionalInfo(additionalInfo) +{ + formatWhatStr(); +} + +JournalError::JournalError(const std::string& additionalInfo) throw () : + std::runtime_error(std::string()), + m_errorCode(0), + m_additionalInfo(additionalInfo) +{ + formatWhatStr(); +} + +JournalError::JournalError(const uint32_t errorCode, + const char* additionalInfo) throw () : + std::runtime_error(std::string()), + m_errorCode(errorCode), + m_additionalInfo(additionalInfo) +{ + formatWhatStr(); +} + +JournalError::JournalError(const uint32_t errorCode, + const std::string& additionalInfo) throw () : + std::runtime_error(std::string()), + m_errorCode(errorCode), + m_additionalInfo(additionalInfo) +{ + formatWhatStr(); +} + +JournalError::JournalError(const uint32_t errorCode, + const char* throwingClass, + const char* throwingFunction) throw () : + std::runtime_error(std::string()), + m_errorCode(errorCode), + m_throwingClass(throwingClass), + m_throwingFunction(throwingFunction) +{ + formatWhatStr(); +} + +JournalError::JournalError(const uint32_t errorCode, + const std::string& throwingClass, + const std::string& throwingFunction) throw () : + std::runtime_error(std::string()), + m_errorCode(errorCode), + m_throwingClass(throwingClass), + m_throwingFunction(throwingFunction) +{ + formatWhatStr(); +} + +JournalError::JournalError(const uint32_t errorCode, + const char* additionalInfo, + const char* throwingClass, + const char* throwingFunction) throw () : + std::runtime_error(std::string()), + m_errorCode(errorCode), + m_additionalInfo(additionalInfo), + m_throwingClass(throwingClass), + m_throwingFunction(throwingFunction) +{ + formatWhatStr(); +} + +JournalError::JournalError(const uint32_t errorCode, + const std::string& additionalInfo, + const std::string& throwingClass, + const std::string& throwingFunction) throw () : + std::runtime_error(std::string()), + m_errorCode(errorCode), + m_additionalInfo(additionalInfo), + m_throwingClass(throwingClass), + m_throwingFunction(throwingFunction) +{ + formatWhatStr(); +} + +JournalError::~JournalError() throw () +{} + +const char* +JournalError::what() const throw () +{ + return m_what.c_str(); +} + +uint32_t +JournalError::getErrorCode() const throw () +{ + return m_errorCode; +} + +const std::string +JournalError::getAdditionalInfo() const throw () +{ + return m_additionalInfo; +} + +const std::string +JournalError::getThrowingClass() const throw () +{ + return m_throwingClass; +} + +const std::string +JournalError::getThrowingFunction() const throw () +{ + return m_throwingFunction; +} + +void +JournalError::toStream(std::ostream& os) const +{ + os << what(); +} + +// protected +void +JournalError::formatWhatStr() throw () +{ + try { + const bool ai = !m_additionalInfo.empty(); + const bool tc = !m_throwingClass.empty(); + const bool tf = !m_throwingFunction.empty(); + std::ostringstream oss; + oss << className() << " 0x" << std::hex << std::setfill('0') << std::setw(4) << m_errorCode << " "; + if (tc) { + oss << m_throwingClass; + if (tf) { + oss << "::"; + } else { + oss << " "; + } + } + if (tf) { + oss << m_throwingFunction << "() "; + } + if (tc || tf) { + oss << "threw " << s_errorMessage(m_errorCode); + } + if (ai) { + oss << " (" << m_additionalInfo << ")"; + } + m_what.assign(oss.str()); + } catch (...) {} +} + +// protected +const char* +JournalError::className() +{ + return s_className; +} + +// static error code definitions +JournalError::errorMap_t JournalError::s_errorMap; +JournalError::errorMapCitr_t JournalError::s_errorMapIterator; + +// generic and system errors +const uint32_t JournalError::JERR_PTHREAD = 0x0001; +const uint32_t JournalError::JERR_RTCLOCK = 0x0002; + +// illegal states +const uint32_t JournalError::JERR_JRNLRUNSTATE = 0x0101; +const uint32_t JournalError::JERR_MSGOPSTATE = 0x0102; +const uint32_t JournalError::JERR_MSGWRCMPLSTATE = 0x0103; + +// file ops and file i/o +const uint32_t JournalError::JERR_STAT = 0x0200; +const uint32_t JournalError::JERR_OPENDIR = 0x0201; +const uint32_t JournalError::JERR_CLOSEDIR = 0x0202; +const uint32_t JournalError::JERR_MKDIR = 0x0203; +const uint32_t JournalError::JERR_RMDIR = 0x0204; +const uint32_t JournalError::JERR_UNLINK = 0x0205; +const uint32_t JournalError::JERR_NOTADIR = 0x0206; +const uint32_t JournalError::JERR_BADFTYPE = 0x0207; +const uint32_t JournalError::JERR_DIRNOTEMPTY = 0x0208; + +// static +const char* +JournalError::s_errorMessage(const uint32_t err_no) throw () +{ + s_errorMapIterator = s_errorMap.find(err_no); + if (s_errorMapIterator == s_errorMap.end()) + return "<Unknown error code>"; + return s_errorMapIterator->second; +} + +// private static +const char* JournalError::s_className = "qpid::asyncStore::jrnl2::Error"; +bool JournalError::s_initializedFlag = JournalError::s_initialize(); + +// private static +bool +JournalError::s_initialize() +{ + s_errorMap[JERR_PTHREAD] = "JERR_PTHREAD: pthread operation failure"; + s_errorMap[JERR_RTCLOCK] = "JERR_RTCLOCK: realtime clock operation failure"; + + s_errorMap[JERR_JRNLRUNSTATE] = "JERR_JRNLRUNSTATE: Illegal journal run state transition"; + s_errorMap[JERR_MSGOPSTATE] = "JERR_MSGOPSTATE: Illegal msg op state transition"; + s_errorMap[JERR_MSGWRCMPLSTATE] = "JERR_MSGWRCMPLSTATE: Illegal msg write completion state transition"; + + s_errorMap[JERR_STAT] = "JERR_STAT: Call to stat() (file status) failed"; + s_errorMap[JERR_OPENDIR] = "JERR_OPENDIR: Call to opendir() failed"; + s_errorMap[JERR_CLOSEDIR] = "JERR_CLOSEDIR: Call to closedir() failed"; + s_errorMap[JERR_MKDIR] = "JERR_MKDIR: Call to mkdir() failed"; + s_errorMap[JERR_RMDIR] = "JERR_RMDIR: Call to rmdir() failed"; + s_errorMap[JERR_UNLINK] = "JERR_UNLINK: Call to unlink() (file delete) failed"; + s_errorMap[JERR_NOTADIR] = "JERR_NOTADIR: Not a directory"; + s_errorMap[JERR_BADFTYPE] = "JERR_BADFTYPE: Bad file type"; + s_errorMap[JERR_DIRNOTEMPTY] = "JERR_DIRNOTEMPTY: Directory not empty"; + + return true; +} + +}}} // namespace qpid::asyncStore::jrnl2 diff --git a/cpp/src/qpid/asyncStore/jrnl2/JournalError.h b/cpp/src/qpid/asyncStore/jrnl2/JournalError.h new file mode 100644 index 0000000000..42f64ef794 --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/JournalError.h @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file JournalError.h + */ + +#ifndef qpid_asyncStore_jrnl2_JournalError_hpp_ +#define qpid_asyncStore_jrnl2_JournalError_hpp_ + +#include "Streamable.h" + +#include <map> +#include <stdexcept> // std::runtime_error +#include <stdint.h> // uint32_t + +// Macro definitions + +/** + * \brief Macro to retrieve and format the C errno value as a string. + * + * \param errno Value of errno to be formatted. + */ +#define FORMAT_SYSERR(errno) " errno=" << errno << " (" << std::strerror(errno) << ")" + +/** + * \brief Macro to check for a clean pthread creation and throwing a JournalException with code JERR_PTHREAD if + * thread creation failed. + * + * \param err Value or errno. + * \param pfn Name of system call that failed. + * \param cls Name of class in which function failed. + * \param fn Name of class function that failed. + */ +#define PTHREAD_CHK(err, pfn, cls, fn) if(err != 0) { \ + std::ostringstream oss; \ + oss << pfn << " failed: " << FORMAT_SYSERR(err); \ + throw qpid::asyncStore::jrnl2::JournalError(qpid::asyncStore::jrnl2::JournalError::JERR_PTHREAD, oss.str(), cls, fn); \ + } + +/** + * \brief Macro for throwing state-machine errors related to invalid state transitions. + * + * \param jerrno Error number to be thrown. + * \param target_st State into which the state machine was attemting to move. + * \param curr_st Current state of the state machine (making transition illegal). + * \param cls Name of class in which failure occurred. + * \param fn Name of class function that failed. + */ +#define THROW_STATE_EXCEPTION(jerrno, target_st, curr_st, cls, fn) { \ + std::ostringstream oss; \ + oss << cls << "::" << fn << "() in state " << curr_st << " cannot be moved to state " << target_st << "."; \ + throw qpid::asyncStore::jrnl2::JournalError(jerrno, oss.str(), cls, fn); \ + } + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +/** + * \brief Universal error and exception class for the async store. The class uses a set of error codes to indicate + * the error or failure condition. + */ +class JournalError : public std::runtime_error, public Streamable +{ +public: + /** + * \brief Default constructor. + */ + JournalError() throw (); + + /** + * \brief Constructor which accepts only the error code. + */ + JournalError(const uint32_t errorCode) throw (); + + /** + * \brief Constructor which accepts only additional info as a c-string. + */ + JournalError(const char* additionalInfo) throw (); + + /** + * \brief Constructor which accepts only additional info as a std::string. + */ + JournalError(const std::string& additionalInfo) throw (); + + /** + * \brief Constructor which accepts both the error code and additional info as a c-string. + */ + JournalError(const uint32_t errorCode, + const char* additionalInfo) throw (); + + /** + * \brief Constructor which accepts both the error code and additional info as a std::string. + */ + JournalError(const uint32_t errorCode, + const std::string& additionalInfo) throw (); + + + /** + * \brief Constructor which accepts both the error code and the throwing class name as a c-string. + */ + JournalError(const uint32_t errorCode, + const char* throwingClass, + const char* throwingFunction) throw (); + + /** + * \brief Constructor which accepts both the error code and the throwing class name as a std::string. + */ + JournalError(const uint32_t errorCode, + const std::string& throwingClass, + const std::string& throwingFunction) throw (); + + + /** + * \brief Constructor which accepts the error code, the throwing class and function as c-strings. + */ + JournalError(const uint32_t errorCode, + const char* additionalInfo, + const char* throwingClass, + const char* throwingFunction) throw (); + + /** + * \brief Constructor which accepts the error code, the throwing class and function as std::strings. + */ + JournalError(const uint32_t errorCode, + const std::string& additionalInfo, + const std::string& throwingClass, + const std::string& throwingFunction) throw (); + + + /** + * \brief Destructor guaranteed not to throw. + */ + virtual ~JournalError() throw (); + + /** + * \brief Overloaded std::exception::what() call which returns a string representation of the error or fault. + * + * \returns C-string representation of the error or fault. + */ + const char* what() const throw (); // override std::exception::what() + + /** + * \brief Get the error or fault code. + * + * \returns The error or fault code. + */ + uint32_t getErrorCode() const throw (); + + /** + * \brief Get additional error or fault info from the \c m_additionalInfo field. + * + * \returns Additional error or fault info. + */ + const std::string getAdditionalInfo() const throw (); + + /** + * \brief Get name of throwing class from the \c m_throwingClass field. + * + * \returns Name of throwing class. + */ + const std::string getThrowingClass() const throw (); + + /** + * \brief Get name of throwing function from \c m_throwingFunction field. + * + * \returns Name of throwing function. + */ + const std::string getThrowingFunction() const throw (); + + // -- Implementation of Streamable methods --- + + virtual void toStream(std::ostream&) const; + + // --- Static error codes --- + + // generic and system errors + static const uint32_t JERR_PTHREAD; ///< pthread operation failure + static const uint32_t JERR_RTCLOCK; ///< realtime clock operation failure + + // illegal states + static const uint32_t JERR_JRNLRUNSTATE; ///< Illegal journal run state transition + static const uint32_t JERR_MSGOPSTATE; ///< Illegal msg op state transition + static const uint32_t JERR_MSGWRCMPLSTATE; ///< Illegal msg write completion state transition + + // file ops and file i/o + static const uint32_t JERR_STAT; ///< Call to stat() or lstat() failed + static const uint32_t JERR_OPENDIR; ///< Call to opendir() failed + static const uint32_t JERR_CLOSEDIR; ///< Call to closedir() failed + static const uint32_t JERR_MKDIR; ///< Call to rmdir() failed + static const uint32_t JERR_RMDIR; ///< Call to rmdir() failed + static const uint32_t JERR_UNLINK; ///< Call to unlink() failed + static const uint32_t JERR_NOTADIR; ///< Not a directory + static const uint32_t JERR_BADFTYPE; ///< Bad file type + static const uint32_t JERR_DIRNOTEMPTY; ///< Directory not empty + + /** + * \brief Static method which converts an error or exception code into a c-string representation. + * + * \param err_no Error code for which a string representation is desired. + * \returns C-string representation of the error code \c err_no. + */ + static const char* s_errorMessage(const uint32_t err_no) throw (); + +protected: + uint32_t m_errorCode; ///< Error or failure code, taken from JournalErrors. + std::string m_additionalInfo; ///< Additional information pertaining to the error or failure. + std::string m_throwingClass; ///< Name of the class throwing the error. + std::string m_throwingFunction; ///< Name of the function throwing the error. + std::string m_what; ///< Standard error of failure message, taken from JournalErrors. + + /** + * \brief Protected function to format the error message. + */ + void formatWhatStr() throw (); + + /** + * \brief Return this class name to print in error messages. Derived classes will have different definitions + * for s_className, and thus this function will return the correct name in any child class. + */ + virtual const char* className(); + + typedef std::map<uint32_t, const char*> errorMap_t; ///< Type for map of error messages + typedef errorMap_t::const_iterator errorMapCitr_t; ///< Const iterator for map of error messages + + static errorMap_t s_errorMap; ///< Map of error messages + static errorMapCitr_t s_errorMapIterator; ///< Const iterator + +private: + static const char* s_className; ///< Name of this class, used in formatting error messages. + static bool s_initializedFlag; ///< Dummy flag, used to initialize map. + + static bool s_initialize(); ///< Static fn for initializing static data + +}; + +}}} // namespace qpid::asyncStore::jrnl2 + +#endif // qpid_asyncStore_jrnl2_JournalError_hpp_ diff --git a/cpp/src/qpid/asyncStore/jrnl2/JournalParameters.cpp b/cpp/src/qpid/asyncStore/jrnl2/JournalParameters.cpp new file mode 100644 index 0000000000..63417582bc --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/JournalParameters.cpp @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file JournalParameters.cpp + */ + +#include "JournalParameters.h" + +#include <sstream> + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +// static declarations +std::string JournalParameters::s_defaultJrnlDir = "/tmp/store"; +std::string JournalParameters::s_defaultJrnlBaseFileName = "JournalData"; +uint16_t JournalParameters::s_defaultNumJrnlFiles = 8; +uint32_t JournalParameters::s_defaultJrnlFileSize_sblks = 3072; +uint16_t JournalParameters::s_defaultWriteBuffNumPgs = 32; +uint32_t JournalParameters::s_defaultWriteBuffPgSize_sblks = 128; + +JournalParameters::JournalParameters() : + m_jrnlDir(s_defaultJrnlDir), + m_jrnlBaseFileName(s_defaultJrnlBaseFileName), + m_numJrnlFiles(s_defaultNumJrnlFiles), + m_jrnlFileSize_sblks(s_defaultJrnlFileSize_sblks), + m_writeBuffNumPgs(s_defaultWriteBuffNumPgs), + m_writeBuffPgSize_sblks(s_defaultWriteBuffPgSize_sblks) +{} + +JournalParameters::JournalParameters(const std::string& jrnlDir, + const std::string& jrnlBaseFileName, + const uint16_t numJrnlFiles, + const uint32_t jrnlFileSize_sblks, + const uint16_t writeBuffNumPgs, + const uint32_t writeBuffPgSize_sblks) : + m_jrnlDir(jrnlDir), + m_jrnlBaseFileName(jrnlBaseFileName), + m_numJrnlFiles(numJrnlFiles), + m_jrnlFileSize_sblks(jrnlFileSize_sblks), + m_writeBuffNumPgs(writeBuffNumPgs), + m_writeBuffPgSize_sblks(writeBuffPgSize_sblks) +{} + +JournalParameters::JournalParameters(const JournalParameters& sp) : + m_jrnlDir(sp.m_jrnlDir), + m_jrnlBaseFileName(sp.m_jrnlBaseFileName), + m_numJrnlFiles(sp.m_numJrnlFiles), + m_jrnlFileSize_sblks(sp.m_jrnlFileSize_sblks), + m_writeBuffNumPgs(sp.m_writeBuffNumPgs), + m_writeBuffPgSize_sblks(sp.m_writeBuffPgSize_sblks) +{} + +}}} // namespace qpid::asyncStore::jrnl2 diff --git a/cpp/src/qpid/asyncStore/jrnl2/JournalParameters.h b/cpp/src/qpid/asyncStore/jrnl2/JournalParameters.h new file mode 100644 index 0000000000..056c627d1c --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/JournalParameters.h @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file JournalParameters.h + */ + +#ifndef qpid_asyncStore_jrnl2_JournalParameters_h_ +#define qpid_asyncStore_jrnl2_JournalParameters_h_ + +#include <string> +#include <stdint.h> // uint16_t, uint32_t + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +/** + * \brief Class which encapsulates journal settings and options for one AsyncJournal instance. + * + * This class is a convenience encapsulation of a multitude of per-journal settings. An instance of this class + * cannot be shared between journal instances as there must be either a different journal directory or a different + * base file name (or both). + */ +class JournalParameters +{ +public: + // static default store params + static std::string s_defaultJrnlDir; ///< Default journal directory + static std::string s_defaultJrnlBaseFileName; ///< Default base file name for all journal files + static uint16_t s_defaultNumJrnlFiles; ///< Default number of journal files + static uint32_t s_defaultJrnlFileSize_sblks; ///< Default journal file size (in sblks) + static uint16_t s_defaultWriteBuffNumPgs; ///< Default number of write buffer pages + static uint32_t s_defaultWriteBuffPgSize_sblks; ///< Default write buffer page size (in sblks) + + std::string m_jrnlDir; ///< Journal directory + std::string m_jrnlBaseFileName; ///< Base file name for all journal files + uint16_t m_numJrnlFiles; ///< Number of journal files + uint32_t m_jrnlFileSize_sblks; ///< Journal file size (in sblks) + uint16_t m_writeBuffNumPgs; ///< Number of write buffer pages + uint32_t m_writeBuffPgSize_sblks; ///< Write buffer page size (in dblks) + + /** + * \brief Default constructor. This will set all members to their default settings. + */ + JournalParameters(); + + /** + * \brief Explicit constructor, in which all settings must be supplied. + * + * \param jrnlDir Journal directory + * \param jrnlBaseFileName Base file name for all journal files + * \param numJrnlFiles Number of journal files + * \param jrnlFileSize_sblks Journal file size (in sblks) + * \param writeBuffNumPgs Number of write buffer pages + * \param writeBuffPgSize_sblks Write buffer page size (in dblks) + */ + JournalParameters(const std::string& jrnlDir, + const std::string& jrnlBaseFileName, + const uint16_t numJrnlFiles, + const uint32_t jrnlFileSize_sblks, + const uint16_t writeBuffNumPgs, + const uint32_t writeBuffPgSize_sblks); + + /** + * \brief Copy constructor. + * + * \param sp Ref to JournalParameters instance to be copied. + */ + JournalParameters(const JournalParameters& sp); + +}; + +}}} // namespace qpid::asyncStore::jrnl2 + +#endif // qpid_asyncStore_jrnl2_JournalParameters_h_ diff --git a/cpp/src/qpid/asyncStore/jrnl2/JournalRunState.cpp b/cpp/src/qpid/asyncStore/jrnl2/JournalRunState.cpp new file mode 100644 index 0000000000..067ed12adf --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/JournalRunState.cpp @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file JournalRunState.cpp + */ + +#include "JournalRunState.h" + +#include "JournalError.h" + +#include <sstream> + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +JournalRunState::JournalRunState() : + State<journalState_t>() +{} + +JournalRunState::JournalRunState(const JournalRunState& s) : + State<journalState_t>(s) +{} + +JournalRunState::JournalRunState(const journalState_t s) : + State<journalState_t>(s) +{} + +JournalRunState::~JournalRunState() +{} + +void +JournalRunState::initialize() +{ + set(JS_INITIALIZING); +} + +void +JournalRunState::recoverPhase1() +{ + set(JS_RECOVERING_PHASE_1); +} + +void +JournalRunState::recoverPhase2() +{ + set(JS_RECOVERING_PHASE_2); +} + +void +JournalRunState::run() +{ + set(JS_RUNNING); +} + +void +JournalRunState::stop() +{ + set(JS_STOPPING); +} + +void +JournalRunState::stopped() +{ + set(JS_STOPPED); +} + +const char* +JournalRunState::getAsStr() const +{ + return s_toStr(m_state); +} + +// static +const char* +JournalRunState::s_toStr(const journalState_t s) +{ + switch (s) { + case JS_NONE: + return "JS_NONE"; + case JS_RECOVERING_PHASE_1: + return "JS_RECOVERING_PHASE_1"; + case JS_RECOVERING_PHASE_2: + return "JS_RECOVERING_PHASE_2"; + case JS_INITIALIZING: + return "JS_INITIALIZING"; + case JS_RUNNING: + return "JS_RUNNING"; + case JS_STOPPING: + return "JS_STOPPING"; + case JS_STOPPED: + return "JS_STOPPED"; + default: + std::ostringstream oss; + oss << "<unknown state (" << "s" << ")>"; + return oss.str().c_str(); + } +} + +// protected +void +JournalRunState::set(const journalState_t s) +{ + // State transition logic: set stateError to true if an invalid transition is attempted + bool stateTransitionError = false; + switch(m_state) { + case JS_NONE: + if (s != JS_RECOVERING_PHASE_1 && s != JS_INITIALIZING) stateTransitionError = true; + break; + case JS_RECOVERING_PHASE_1: + if (s != JS_RECOVERING_PHASE_2) stateTransitionError = true; + break; + case JS_RECOVERING_PHASE_2: + case JS_INITIALIZING: + if (s != JS_RUNNING) stateTransitionError = true; + break; + case JS_RUNNING: + if (s != JS_STOPPING) stateTransitionError = true; + break; + case JS_STOPPING: + if (s != JS_STOPPED) stateTransitionError = true; + break; + case JS_STOPPED: // Cannot move out of this state except with reset() + stateTransitionError = true; + break; + } + if (stateTransitionError) { + THROW_STATE_EXCEPTION(JournalError::JERR_JRNLRUNSTATE, s_toStr(s), s_toStr(m_state), + "JournalState", "set"); + } + m_state = s; +} + +}}} // namespace qpid::asyncStore::jrnl2 diff --git a/cpp/src/qpid/asyncStore/jrnl2/JournalRunState.h b/cpp/src/qpid/asyncStore/jrnl2/JournalRunState.h new file mode 100644 index 0000000000..85141d8e18 --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/JournalRunState.h @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file JournalRunState.h + */ + +#ifndef qpid_asyncStore_jrnl2_JournalRunState_h_ +#define qpid_asyncStore_jrnl2_JournalRunState_h_ + +#include "State.h" + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +/** + * \brief Enumeration of valid AsyncJournal states. + */ +typedef enum { + JS_NONE = 0, ///< Not initialized, not capable of running (This is the default/reset state). + JS_RECOVERING_PHASE_1, ///< Recovery phase 1 (analyzing and restoring AsyncJournal state from journal files). + JS_RECOVERING_PHASE_2, ///< Recovery phase 2 (recovering message content and restoring messages in the broker). + JS_INITIALIZING, ///< Initialization of a new journal, including preparing journal files. + JS_RUNNING, ///< Initialization or recovery complete, journal running and accepting content. + JS_STOPPING, ///< Journal stopping. No new requests for service accepted, waiting for internal state to become consistent. + JS_STOPPED ///< Journal stopped. No new requests for service accepted, store in a consistent state. +} journalState_t; + +/** + * \brief State manager for the state of an AsyncJournal instance. + * + * The AsyncJournal has the following states and transitions: + * \dot + * digraph journalState_t { + * node [fontname=Helvetica, fontsize=8]; + * JS_NONE [URL="\ref JS_NONE"] + * JS_RECOVERING_PHASE_1 [URL="\ref JS_RECOVERING_PHASE_1"] + * JS_RECOVERING_PHASE_2 [URL="\ref JS_RECOVERING_PHASE_2"] + * JS_INITIALIZING [URL="\ref JS_INITIALIZING"] + * JS_RUNNING [URL="\ref JS_RUNNING"] + * JS_STOPPING [URL="\ref JS_STOPPING"] + * JS_STOPPED [URL="\ref JS_STOPPED"] + * + * edge [fontname=Helvetica, fontsize=8] + * JS_NONE->JS_INITIALIZING [label="initialize()" URL="\ref qpid::jrnl2::JournalRunState::initialize()"] + * JS_INITIALIZING->JS_RUNNING [label="run()" URL="\ref qpid::jrnl2::JournalRunState::run()"] + * JS_NONE->JS_RECOVERING_PHASE_1 [label="recoverPhase1()" URL="\ref qpid::jrnl2::JournalRunState::recoverPhase1()"] + * JS_RECOVERING_PHASE_1->JS_RECOVERING_PHASE_2 [label="recoverPhase2()" URL="\ref qpid::jrnl2::JournalRunState::recoverPhase2()"] + * JS_RECOVERING_PHASE_2->JS_RUNNING [label="run()" URL="\ref qpid::jrnl2::JournalRunState::run()"] + * JS_RUNNING->JS_STOPPING [label="stop()" URL="\ref qpid::jrnl2::JournalRunState::stop()"] + * JS_STOPPING->JS_STOPPED [label="stopped()" URL="\ref qpid::jrnl2::JournalRunState::stopped()"] + * } + * \enddot + */ +class JournalRunState : public State<journalState_t> +{ +public: + /** + * \brief Default constructor, setting internal state to JS_NONE. + */ + JournalRunState(); + + /** + * \brief Constructor allowing the internal state to be preset to any valid value of journalState_t. + * + * \param s State value to which the internal state of this new instance should be initialized. + */ + JournalRunState(const journalState_t s); + + /** + * \brief Copy constructor. + * \param s Instance from which this new instance state should be copied. + */ + JournalRunState(const JournalRunState& s); + + /** + * \brief Virtual destructor + */ + virtual ~JournalRunState(); + + // State change functions + + /** + * \brief Changes the journal state from JS_NONE to JS_INITIALIZING. + * + * \throws JournalException with JERR_ILLEGALJRNLSTATECHANGE if current state makes it illegal to change + * to JS_INITIALIZING according to the state machine semantics. + */ + void initialize(); + + /** + * \brief Changes the journal state from JS_NONE to JS_RECOVERING_PHASE_1. + * \throws JournalException with JERR_ILLEGALJRNLSTATECHANGE if current state makes it illegal to change + * to JS_RECOVERING_PHASE_1 according to the state machine semantics. + */ + void recoverPhase1(); + + /** + * \brief Changes the journal state from JS_RECOVERING_PHASE_1 to JS_RECOVERING_PHASE_2. + * + * \throws JournalException with JERR_ILLEGALJRNLSTATECHANGE if current state makes it illegal to change + * to JS_RECOVERING_PHASE_2 according to the state machine semantics. + */ + void recoverPhase2(); + + /** + * \brief Changes the journal state from JS_INITIALIZING or JS_RECOVERING_PHASE_2 to JS_RUNNING. + * + * \throws JournalException with JERR_ILLEGALJRNLSTATECHANGE if current state makes it illegal to change + * to JS_RUNNING according to the state machine semantics. + */ + void run(); + + /** + * \brief Changes the journal state from JS_RUNNING to JS_STOPPING. + * + * \throws JournalException with JERR_ILLEGALJRNLSTATECHANGE if current state makes it illegal to change + * to JS_STOPPING according to the state machine semantics. + */ + void stop(); + + /** + * \brief Changes the journal state from JS_STOPPING to JS_STOPPED. + * + * \throws JournalException with JERR_ILLEGALJRNLSTATECHANGE if current state makes it illegal to change + * to JS_STOPPED according to the state machine semantics. + */ + void stopped(); + + /** + * \brief Get the current state in a string form. + * + * \returns String representation of internal state m_state as returned by static function s_toStr(). + */ + const char* getAsStr() const; + + /** + * \brief Static helper function to convert a numeric journalState_t type to a string representation. + */ + static const char* s_toStr(const journalState_t s); + +protected: + /** + * \brief Set (or change) the value of m_state. This function implements the state machine checks for + * legal state change transitions, and throw an exception if an illegal state transition is requested. + * + * \param s State to which this machine should be changed. + * \throws JournalException with JERR_ILLEGALJRNLSTATECHANGE if current state makes it illegal to change + * to the requested state \a s. + */ + void set(const journalState_t s); + +}; + +}}} // namespace qpid::asyncStore::jrnl2 + +#endif // qpid_asyncStore_jrnl2_JournalRunState_h_ diff --git a/cpp/src/qpid/asyncStore/jrnl2/RecordHeader.cpp b/cpp/src/qpid/asyncStore/jrnl2/RecordHeader.cpp new file mode 100644 index 0000000000..3317eb8438 --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/RecordHeader.cpp @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file RecordHeader.cpp + */ + +#include "RecordHeader.h" + +#include "Configuration.h" + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +RecordHeader::RecordHeader() : + m_magic(0), + m_version(0), + m_bigEndianFlag(0), + m_flags(0), + m_recordId(0) +{} + +RecordHeader::RecordHeader(const uint32_t magic, + const uint8_t version, + const uint64_t recordId, + const bool overwriteIndicator) : + m_magic(magic), + m_version(version), + m_bigEndianFlag(Configuration::s_endianValue), + m_flags(overwriteIndicator ? HDR_OVERWRITE_INDICATOR_MASK : 0), + m_recordId(recordId) +{} + +RecordHeader::RecordHeader(const RecordHeader& rh) : + m_magic(rh.m_magic), + m_version(rh.m_version), + m_bigEndianFlag(rh.m_bigEndianFlag), + m_flags(rh.m_flags), + m_recordId(rh.m_recordId) +{} + +RecordHeader::~RecordHeader() +{} + +void +RecordHeader::copy(const RecordHeader& rh) +{ + m_magic = rh.m_magic; + m_version = rh.m_version; + m_bigEndianFlag = rh.m_bigEndianFlag; + m_flags = rh.m_flags; + m_recordId = rh.m_recordId; +} + +void +RecordHeader::reset() +{ + m_magic = 0; + m_version = 0; + m_bigEndianFlag = 0; + m_flags = 0; + m_recordId = 0; +} + +bool +RecordHeader::getOverwriteIndicator() const +{ + return m_flags & HDR_OVERWRITE_INDICATOR_MASK; +} + +void +RecordHeader::setOverwriteIndicator(const bool owi) +{ + m_flags = owi ? + m_flags | HDR_OVERWRITE_INDICATOR_MASK : + m_flags & (~HDR_OVERWRITE_INDICATOR_MASK); +} + +//static +uint64_t +RecordHeader::getHeaderSize() +{ + return static_cast<uint64_t>(sizeof(RecordHeader)); +} + +uint32_t +RecordHeader::getCheckSum(uint32_t initialValue) const +{ + uint32_t cs = initialValue; + for (unsigned char* p = (unsigned char*)this; + p < (unsigned char*)this + getHeaderSize() + getBodySize(); + p++) { + cs ^= (uint32_t)(*p); + bool carry = cs & uint32_t(0x80000000); + cs <<= 1; + if (carry) cs++; + } + return cs; +} + +}}} // namespace qpid::asyncStore::jrnl2 diff --git a/cpp/src/qpid/asyncStore/jrnl2/RecordHeader.h b/cpp/src/qpid/asyncStore/jrnl2/RecordHeader.h new file mode 100644 index 0000000000..9a350b5b84 --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/RecordHeader.h @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file RecordHeader.h + * + * List of journal record structs: + * struct DequeueHeader + * struct EnqueueHeader + * struct EventHeader + * struct FileHeader + * struct RecordHeader <-- This file + * struct RecordTail + * struct TransactionHeader + * + * Overview of journal record structs: + * + * <pre> + * +------------+ +--------------+ + * | RecordTail | | RecordHeader | + * +------------+ | (abstract) | + * +--------------+ + * ^ + * | + * +----------------+---------+-------+-------------------+ + * | | | | + * +------------+ +-------------+ +---------------+ +-------------------+ + * | FileHeader | | EventHeader | | DequeueHeader | | TransactionHeader | + * +------------+ +-------------+ +---------------+ +-------------------+ + * ^ + * | + * +---------------+ + * | EnqueueHeader | + * +---------------+ + * </pre> + */ + +#ifndef qpid_asyncStore_jrnl2_RecordHeader_h_ +#define qpid_asyncStore_jrnl2_RecordHeader_h_ + +#include <stdint.h> // uint8_t, uint16_t, uint32_t, uint64_t + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +#pragma pack(1) + +/** + * \brief Struct for data common to the head of all journal files and records. + * + * This block of data includes identification for the file type, the encoding + * version, an endian indicator and a record ID. + * + * File layout in binary format (16 bytes): + * <pre> + * 0x0 0x7 + * +-----+-----+-----+-----+-----+-----+-----+-----+ + * 0x00 | m_magic | v | e | m_flags | + * +-----+-----+-----+-----+-----+-----+-----+-----+ + * 0x08 | m_recordId | + * +-----+-----+-----+-----+-----+-----+-----+-----+ + * </pre> + * <table> + * <tr> + * <td>v</td> + * <td>file version [ <code>_version</code> ] (If the format or encoding of + * this file changes, then this number should be incremented)</td> + * </tr> + * <tr> + * <td>e</td> + * <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for + * little endian, <b>true</b> (0x01) for big endian</td> + * </tr> + * </table> + */ +class RecordHeader +{ +public: + uint32_t m_magic; ///< File type identifier (magic number) + uint8_t m_version; ///< File encoding version + uint8_t m_bigEndianFlag; ///< Flag for determining endianness + uint16_t m_flags; ///< User and system flags + uint64_t m_recordId; ///< Record ID (rotating 64-bit counter) + + /** + * \brief Mask for the record header flags field m_flags which is used to indicate that this record is part + * of a cycle opposite to the previous cycle. + * + * In order to identify which files and records were written last, the journal uses an overwrite indicator + * (OWI) flag in the headers of all records. This flag changes between \b true and \b false on consecutive runs + * through a circular buffer composed of several files. Looking for the point in the journal where the value of + * the OWI changes from that in the file header of the first journal file marks the last write point in the + * journal. + * + * This mask is used to isolate the OWI in the m_flags field of the record and file headers. + */ + static const uint16_t HDR_OVERWRITE_INDICATOR_MASK = 0x1; + + /** + * \brief Default constructor, which sets all values to 0. + */ + RecordHeader(); + + /** + * \brief Convenience constructor which initializes values during construction. + * + * \param magic Magic for this record + * \param version Version of this record + * \param recordId Record identifier for this record + * \param overwriteIndicator Overwrite indicator for this record + */ + RecordHeader(const uint32_t magic, + const uint8_t version, + const uint64_t recordId, + const bool overwriteIndicator); + + /** + * \brief Copy constructor. + */ + RecordHeader(const RecordHeader& rh); + + /** + * \brief Virtual destructor + */ + virtual ~RecordHeader(); + + /** + * \brief Convenience copy method. + */ + void copy(const RecordHeader& rh); + + /** + * \brief Resets all fields to default values (mostly 0). + */ + virtual void reset(); + + /** + * \brief Return the value of the Overwrite Indicator for this record. + * + * \returns true if the Overwrite Indicator flag is set, false otherwise. + */ + bool getOverwriteIndicator() const; + + /** + * \brief Set the value of the Overwrite Indicator for this record + */ + void setOverwriteIndicator(const bool owi); + + /** + * \brief Return the header size of this record in bytes. Must be implemented by + * subclasses. + * + * \returns Size of record header in bytes. + */ + static uint64_t getHeaderSize(); + + /** + * \brief Return the body (data) size of this record in bytes. Must be implemented + * by subclasses. + * + * \returns Size of record body in bytes. + */ + virtual uint64_t getBodySize() const = 0; + + /** + * \brief Return total size of this record in bytes, being the sum of the header, + * xid (if present), data (if present) and tail (if present). Must be implemented + * by subclasses. + * + * \returns The size of the entire record, including header, body (xid and data, + * if present) and record tail (if persent) in bytes. + */ + virtual uint64_t getRecordSize() const = 0; + + /// \todo TODO - Is this the right place for encode/decode fns? + ///** + // * \brief Encode (write) this record instance into the buffer pointed to by the buffer + // * pointer. Must be implemented by subclasses. + // */ + //virtual uint64_t encode(char* bufferPtr, + // const uint64_t bufferSize, + // const uint64_t encodeOffset = 0) = 0; + + /** + * \brief Return a uint32_t checksum for the header and body content of this record. + * + * \param initialValue The initial (or seed) value of the checksum. + * + * \returns Checksum for header and body of record. Tail (if any) is excluded. + */ + uint32_t getCheckSum(uint32_t initialValue = 0) const; + +}; + +#pragma pack() + +}}} // namespace qpid::asyncStore::jrnl2 + +#endif // qpid_asyncStore_jrnl2_RecordHeader_h_ diff --git a/cpp/src/qpid/asyncStore/jrnl2/RecordIdCounter.h b/cpp/src/qpid/asyncStore/jrnl2/RecordIdCounter.h new file mode 100644 index 0000000000..5fe2c01429 --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/RecordIdCounter.h @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file RecordIdCounter.h + */ + +#ifndef qpid_asyncStore_jrnl2_RecordCounter_h_ +#define qpid_asyncStore_jrnl2_RecordCounter_h_ + +#include "AtomicCounter.h" + +#include <stdint.h> // uint64_t + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +typedef uint64_t recordId_t; ///< Integral type used to represent the Record Id (RID). +typedef AtomicCounter<recordId_t> RecordIdCounter_t; ///< Counter to increment record ids + +}}} // namespace qpid::asyncStore::jrnl2 + +#endif // qpid_asyncStore_jrnl2_RecordCounter_h_ diff --git a/cpp/src/qpid/asyncStore/jrnl2/RecordTail.cpp b/cpp/src/qpid/asyncStore/jrnl2/RecordTail.cpp new file mode 100644 index 0000000000..fe33d47b13 --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/RecordTail.cpp @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file RecordTail.cpp + */ + + +#include "RecordTail.h" + +#include "RecordHeader.h" + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +RecordTail::RecordTail() : + m_xMagic(0xffffffff), + m_checkSum(0), + m_recordId(0) +{} + +RecordTail::RecordTail(const uint32_t xMagic, + const uint32_t checkSum, + const uint64_t recordId) : + m_xMagic(xMagic), + m_checkSum(checkSum), + m_recordId(recordId) +{} + +RecordTail::RecordTail(const RecordHeader& rh) : + m_xMagic(~rh.m_magic), + m_checkSum(rh.getCheckSum()), + m_recordId(rh.m_recordId) +{} + +RecordTail::RecordTail(const RecordTail& rt) : + m_xMagic(rt.m_xMagic), + m_checkSum(rt.m_checkSum), + m_recordId(rt.m_recordId) +{} + +void +RecordTail::copy(const RecordTail& rt) +{ + m_xMagic = rt.m_xMagic; + m_checkSum = rt.m_checkSum; + m_recordId = rt.m_recordId; +} + +void +RecordTail::reset() +{ + m_xMagic = 0xffffffff; + m_checkSum = 0; + m_recordId = 0; +} + +//static +uint64_t +RecordTail::getSize() +{ + return sizeof(RecordTail); +} + +}}} // namespace qpid::asyncStore::jrnl2 diff --git a/cpp/src/qpid/asyncStore/jrnl2/RecordTail.h b/cpp/src/qpid/asyncStore/jrnl2/RecordTail.h new file mode 100644 index 0000000000..a5b8afc576 --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/RecordTail.h @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file RecordTail.h + * + * List of journal record structs: + * struct DequeueHeader + * struct EnqueueHeader + * struct EventHeader + * struct FileHeader + * struct RecordHeader + * struct RecordTail <-- This file + * struct TransactionHeader + * + * Overview of journal record structs: + * + * <pre> + * +------------+ +--------------+ + * | RecordTail | | RecordHeader | + * +------------+ | (abstract) | + * +--------------+ + * ^ + * | + * +----------------+---------+-------+-------------------+ + * | | | | + * +------------+ +-------------+ +---------------+ +-------------------+ + * | FileHeader | | EventHeader | | DequeueHeader | | TransactionHeader | + * +------------+ +-------------+ +---------------+ +-------------------+ + * ^ + * | + * +---------------+ + * | EnqueueHeader | + * +---------------+ + * </pre> + */ + +#ifndef qpid_asyncStore_jrnl2_RecordTail_h_ +#define qpid_asyncStore_jrnl2_RecordTail_h_ + +#include <stdint.h> // uint32_t, uint64_t + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +class RecordHeader; + +#pragma pack(1) + +/** + * \brief Struct for the tail of all records which contain data and which must follow + * the data portion of that record. + * + * The magic number used here is the binary inverse (1's complement) of the magic + * used in the record header; this minimizes possible confusion with other headers + * that may be present during recovery. The tail is used with all records that have + * either XIDs or data - i.e. any size-variable content. Currently the only records + * that do NOT use the tail are non-transactional dequeues and filler records. + * + * Record layout in binary format (16 bytes): + * <pre> + * 0x0 0x7 + * +-----+-----+-----+-----+-----+-----+-----+-----+ + * 0x00 | m_xMagic | m_checkSum | + * +-----+-----+-----+-----+-----+-----+-----+-----+ + * 0x08 | m_recordId | + * +-----+-----+-----+-----+-----+-----+-----+-----+ + * </pre> + */ +class RecordTail +{ +public: + uint32_t m_xMagic; ///< Binary inverse (1's complement) of hdr magic number + uint32_t m_checkSum; ///< Checksum for header and body of record + uint64_t m_recordId; ///< Record identifier matching that of the header for this record + + /** + * \brief Default constructor, which sets most values to 0. + */ + RecordTail(); + + /** + * \brief Convenience constructor which initializes values during construction. + * + * \param xMagic The inverse of the record header magic (ie ~rh._magic). + * \param checkSum The checksum for this record header and body. + * \param recordId The record identifier matching the record header. + */ + RecordTail(const uint32_t xMagic, + const uint32_t checkSum, + const uint64_t recordId); + + /** + * \brief Convenience constructor which initializes values during construction from existing RecordHeader + * instance. + * + * \param rh Header instance for which the RecordTail is to be created. + */ + RecordTail(const RecordHeader& rh); + + /** + * \brief Copy constructor. + * + * \param rt Instance to be copied. + */ + RecordTail(const RecordTail& rt); + + /** + * \brief Convenience copy method. + * + * \param rt Instance to be copied. + */ + void copy(const RecordTail& rt); + + /** + * \brief Resets all fields to default values (mostly 0). + */ + void reset(); + + /** + * \brief Returns the size of the header in bytes. + */ + static uint64_t getSize(); + +}; + +#pragma pack() + +}}} // namespace qpid::asyncStore::jrnl2 + +#endif // qpid_asyncStore_jrnl2_RecordTail_h_ diff --git a/cpp/src/qpid/asyncStore/jrnl2/ScopedLock.cpp b/cpp/src/qpid/asyncStore/jrnl2/ScopedLock.cpp new file mode 100644 index 0000000000..4acba534ef --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/ScopedLock.cpp @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file ScopedLock.cpp + */ + +#include "ScopedLock.h" + +#include "JournalError.h" + +#include <cerrno> // EBUSY +#include <cstring> // std::strerror +#include <sstream> // std::ostringstream + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +// --- ScopedMutex --- + +ScopedMutex::ScopedMutex() +{ + PTHREAD_CHK(::pthread_mutex_init(&m_mutex, 0), "::pthread_mutex_init", "ScopedMutex", "ScopedMutex"); +} + +ScopedMutex::~ScopedMutex() +{ + PTHREAD_CHK(::pthread_mutex_destroy(&m_mutex), "::pthread_mutex_destroy", "ScopedMutex", "~ScopedMutex"); +} + +::pthread_mutex_t* +ScopedMutex::get() const +{ + return &m_mutex; +} + + +// --- ScopedMutexContainer --- + +ScopedMutexContainer::ScopedMutexContainer(const ScopedMutex& sm) : + m_scopedMutexRef(sm) +{} + +::pthread_mutex_t* ScopedMutexContainer::get() const +{ + return m_scopedMutexRef.get(); +} + + +// --- ScopedLock --- + +ScopedLock::ScopedLock(const ScopedMutex& sm) : + ScopedMutexContainer(sm) +{ + PTHREAD_CHK(::pthread_mutex_lock(m_scopedMutexRef.get()), "::pthread_mutex_lock", "ScopedLock", "ScopedLock"); +} + +ScopedLock::~ScopedLock() +{ + PTHREAD_CHK(::pthread_mutex_unlock(m_scopedMutexRef.get()), "::pthread_mutex_unlock", "ScopedLock", "~ScopedLock"); +} + + +// --- ScopedTryLock --- + +ScopedTryLock::ScopedTryLock(const ScopedMutex& sm) : + ScopedMutexContainer(sm), + m_lockedFlag(false) +{ + int ret = ::pthread_mutex_trylock(m_scopedMutexRef.get()); + m_lockedFlag = (ret == 0); // check if lock obtained + if (!m_lockedFlag && ret != EBUSY) { + PTHREAD_CHK(ret, "::pthread_mutex_trylock", "ScopedTryLock", "ScopedTryLock"); + } +} + +ScopedTryLock::~ScopedTryLock() +{ + if (m_lockedFlag) + PTHREAD_CHK(::pthread_mutex_unlock(m_scopedMutexRef.get()), "::pthread_mutex_unlock", "ScopedTryLock", + "~ScopedTryLock"); +} + +bool +ScopedTryLock::isLocked() const +{ + return m_lockedFlag; +} + + +// --- ScopedConditionVariable --- +/* + +ScopedConditionVariable::ScopedConditionVariable() +{ + PTHREAD_CHK(::pthread_cond_init(&m_cv, 0), "pthread_cond_init", "ScopedConditionVariable", + "ScopedConditionVariable"); +} + +ScopedConditionVariable::~ScopedConditionVariable() +{ + PTHREAD_CHK(::pthread_cond_destroy(&m_cv), "pthread_cond_destroy", "ScopedConditionVariable", + "~ScopedConditionVariable"); +} + +void +ScopedConditionVariable::wait(ScopedLock& sl) +{ + PTHREAD_CHK(::pthread_cond_wait(&m_cv, sl.get()), "pthread_cond_wait", "ScopedConditionVariable", "wait"); +} + +void +ScopedConditionVariable::notify_one() +{ + PTHREAD_CHK(::pthread_cond_signal(&m_cv), "pthread_cond_signal", "ScopedConditionVariable", "notify_one"); +} + +void +ScopedConditionVariable::notify_all() +{ + PTHREAD_CHK(::pthread_cond_broadcast(&m_cv), "pthread_cond_broadcast", "ScopedConditionVariable", + "notify_all"); +} +*/ + +}}} // namespace qpid::asyncStore::jrnl2 diff --git a/cpp/src/qpid/asyncStore/jrnl2/ScopedLock.h b/cpp/src/qpid/asyncStore/jrnl2/ScopedLock.h new file mode 100644 index 0000000000..a420850c0b --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/ScopedLock.h @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file ScopedLock.h + */ + +#ifndef qpid_asyncStore_jrnl2_ScopedLock_h_ +#define qpid_asyncStore_jrnl2_ScopedLock_h_ + +#include <pthread.h> + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +class ScopedMutex +{ +public: + ScopedMutex(); + virtual ~ScopedMutex(); + ::pthread_mutex_t* get() const; +protected: + mutable ::pthread_mutex_t m_mutex; +}; + + + +class ScopedMutexContainer +{ +public: + ScopedMutexContainer(const ScopedMutex& sm); + ::pthread_mutex_t* get() const; +protected: + const ScopedMutex& m_scopedMutexRef; ///< Reference to ScopedMutex instance being locked. +}; + + + +class ScopedLock : public ScopedMutexContainer +{ +public: + ScopedLock(const ScopedMutex& sm); + virtual ~ScopedLock(); +}; + + + +class ScopedTryLock : public ScopedMutexContainer +{ +public: + ScopedTryLock(const ScopedMutex& sm); + virtual ~ScopedTryLock(); + bool isLocked() const; +protected: + bool m_lockedFlag; +}; + + + +/* +class ScopedConditionVariable +{ +public: + ScopedConditionVariable(); + ~ScopedConditionVariable(); + void wait(ScopedLock& sl); + void notify_one(); + void notify_all(); +protected: + mutable ::pthread_cond_t m_cv; +}; +*/ + +}}} // namespace qpid::asyncStore::jrnl2 + +#endif // qpid_asyncStore_jrnl2_ScopedLock_h_ diff --git a/cpp/src/qpid/asyncStore/jrnl2/State.h b/cpp/src/qpid/asyncStore/jrnl2/State.h new file mode 100644 index 0000000000..3a4354760a --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/State.h @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file State.h + */ + +#ifndef qpid_asyncStore_jrnl2_State_h_ +#define qpid_asyncStore_jrnl2_State_h_ + +#include "Streamable.h" + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +/** + * \brief Utility class to manage a simple state machine defined by an enumeration \a E. + * + * It is necessary that enumeration E has its default / initial value coded as value 0. This enumeration would + * be typically coded as follows: + * \code + * typedef enum {STATE1=0, STATE2, STATE3} myStates_t; + * class myStateMachine : State<myStates_t> { + * protected: + * void set(myStates_t s) { ... } // Implement state machine logic and transition checking here + * public: + * // State transition verbs + * void action1() { set(STATE2); } + * void action2() { set(STATE3); } + * }; + * \endcode + * + * For child classes, the state machine transition logic is implemented in set(const E), and the state change + * verbs are implemented by calling set() with values from E. A call to reset() will return the state to the + * state numerically equal to 0, irrespective of the current state or state transition logic. + * + * Function getAsStr() returns the current state as a string, and is usually implemented by calling a local static + * function which translates the values of E into c-string literals. + */ +template <class E> class State : public Streamable { +public: + /** + * \brief Default constructor, which sets the internal state m_state to the state in E which is numerically + * equal to 0. + */ + State<E>() : + Streamable(), + m_state(E(0)) + {} + + /** + * \brief Constructor which sets the internal state m_state to the value \a s. + */ + State<E>(const E s) : + Streamable(), + m_state(s) + {} + + /** + * \brief Copy constructor, which sets the internal state m_state to the same value as that of the copied + * instance \a s. + */ + State<E>(const State<E>& s) : + Streamable(), + m_state(s.m_state) + {} + + /** + * \brief Virtual destructor + */ + virtual ~State() + {} + + /** + * \brief Get the internal state m_state as an enumeration of type E. + * + * \returns The internal state m_state. + */ + virtual E get() const + { + return m_state; + } + + /** + * \brief Get the internal state m_state as a c-string. + * + * \returns Internal state m_state as a c-string. + */ + virtual const char* getAsStr() const = 0; + + /** + * \brief Reset the internal state m_state to the value of \a s or, if not supplied, to the state in E which is + * numerically equal to 0. + * + * \param s The state to which this instance should be reset. The default value is the member of E with a + * numerical value of 0. + * + * \note This call ignores the state transition logic in set() and forces the state to 0. This is not intended + * as a normal part of the state machine operation, but rather for reusing a state machine instance or in + * conditions where you need to force a change over the state machine logic. For cases where the normal state + * machine logic returns the state to its initial point, a verb should be created which uses set() and is + * subjected to the transition logic and checking of set(). + */ + virtual void reset(const E s = E(0)) + { + m_state = s; + } + + /** + * \brief Stream the string representation of the internal state m_state. + * + * \param os Stream to which the string representation of the internal state m_state will be sent. + */ + virtual void toStream(std::ostream& os = std::cout) const + { + os << getAsStr(); + } + +protected: + E m_state; ///< Local state of this state machine instance. + + /** + * \brief Set (or change) the value of m_state. This function must implement the state machine checks for + * legal state change transitions, and throw an exception if an illegal state transition is requested. + * + * \param s State to which this machine should be changed. + */ + virtual void set(const E s) = 0; + + /** + * \brief Compare the state \a s with the current state. + * + * \returns \b true if \a s is the same as internal state m_state, \b false otherwise. + */ + virtual bool is(const E s) const + { + return m_state == s; + } + +}; + +}}} // namespace qpid::asyncStore::jrnl2 + +#endif // qpid_asyncStore_jrnl2_State_h_ diff --git a/cpp/src/qpid/asyncStore/jrnl2/Streamable.cpp b/cpp/src/qpid/asyncStore/jrnl2/Streamable.cpp new file mode 100644 index 0000000000..0ee86223b1 --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/Streamable.cpp @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Streamable.cpp + */ + +#include "Streamable.h" + +#include <sstream> + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +Streamable::~Streamable() +{} + +std::string +Streamable::toString() const +{ + std::ostringstream oss; + toStream(oss); + return oss.str(); +} + +std::ostream& +operator<<(std::ostream& os, const Streamable& s) +{ + s.toStream(os); + return os; +} + +std::ostream& +operator<<(std::ostream& os, const Streamable* sPtr) +{ + sPtr->toStream(os); + return os; +} + +}}} // namespace qpid::asyncStore::jrnl2 diff --git a/cpp/src/qpid/asyncStore/jrnl2/Streamable.h b/cpp/src/qpid/asyncStore/jrnl2/Streamable.h new file mode 100644 index 0000000000..45a15efda5 --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/Streamable.h @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Streamable.h + */ + +#ifndef qpid_asyncStore_jrnl2_Streamable_h_ +#define qpid_asyncStore_jrnl2_Streamable_h_ + +#include <iostream> +#include <string> + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +/** + * \brief Abstract class which provides the mechanisms to stream + * + * An abstract class which provides stream functions. The toStream() function must be implemented by subclasses, + * and is used by the remaining functions. For convenience, toString() returns a std::string object. + */ +class Streamable +{ +public: + /** + * \brief Virtual destructor + */ + virtual ~Streamable(); + + /** + * \brief Stream some representation of the object to an output stream. + * + * \param os Output stream to which the class data is to be streamed. + */ + virtual void toStream(std::ostream& os = std::cout) const = 0; + + /** + * \brief Creates a string representation of the test parameters. + * + * Convenience feature which creates and returns a std::string object containing the content of toStream(). + * + * \returns Content of toStream() + */ + std::string toString() const; + + /** + * \brief Stream the object to an output stream + * + * \param os Stream to which output is to be sent. + * \param s Object instance to be streamed. + * \returns Original stream passed into parameter \a os. + */ + friend std::ostream& operator<<(std::ostream& os, const Streamable& s); + + /** + * \brief Stream the object to an output stream through an object pointer + * + * \param os Stream to which output is to be sent. + * \param sPtr Pointer to object instance to be streamed. + * \returns Original stream passed into parameter \a os. + */ + friend std::ostream& operator<<(std::ostream& os, const Streamable* sPtr); + +}; + +}}} // namespace qpid::asyncStore::jrnl2 + +#endif // qpid_asyncStore_jrnl2_Streamable_h_ diff --git a/cpp/src/qpid/asyncStore/jrnl2/TransactionHeader.cpp b/cpp/src/qpid/asyncStore/jrnl2/TransactionHeader.cpp new file mode 100644 index 0000000000..9de9f2656c --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/TransactionHeader.cpp @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TransactionHeader.cpp + */ + +#include "RecordTail.h" +#include "TransactionHeader.h" + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +TransactionHeader::TransactionHeader() : + RecordHeader(), + m_xidSize(0) +{} + +TransactionHeader::TransactionHeader(const uint32_t magic, + const uint8_t version, + const uint64_t recordId, + const uint64_t xidSize, + const bool overwriteIndicator) : + RecordHeader(magic, version, recordId, overwriteIndicator), + m_xidSize(xidSize) +{} + +TransactionHeader::TransactionHeader(const TransactionHeader& th) : + RecordHeader(th), + m_xidSize(th.m_xidSize) +{} + +TransactionHeader::~TransactionHeader() +{} + +void +TransactionHeader::copy(const TransactionHeader& th) +{ + RecordHeader::copy(th); + m_xidSize = th.m_xidSize; +} + +void +TransactionHeader::reset() +{ + RecordHeader::reset(); + m_xidSize = 0; +} + +//static +uint64_t +TransactionHeader::getHeaderSize() +{ + return sizeof(TransactionHeader); +} + +uint64_t +TransactionHeader::getBodySize() const +{ + return m_xidSize; +} + +uint64_t +TransactionHeader::getRecordSize() const +{ + // By definition, TransactionRecords must always have an xid, hence a record + // tail as well. No check on body size required in this case. + return getHeaderSize() + getBodySize() + RecordTail::getSize(); +} + +}}} // namespace qpid::asyncStore::jrnl2 diff --git a/cpp/src/qpid/asyncStore/jrnl2/TransactionHeader.h b/cpp/src/qpid/asyncStore/jrnl2/TransactionHeader.h new file mode 100644 index 0000000000..9bc0ee9e9b --- /dev/null +++ b/cpp/src/qpid/asyncStore/jrnl2/TransactionHeader.h @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TransactionHeader.h + * + * List of journal record structs: + * struct DequeueHeader + * struct EnqueueHeader + * struct EventHeader + * struct FileHeader + * struct RecordHeader + * struct RecordTail + * struct TransactionHeader <-- This file + * + * Overview of journal record structs: + * + * <pre> + * +------------+ +--------------+ + * | RecordTail | | RecordHeader | + * +------------+ | (abstract) | + * +--------------+ + * ^ + * | + * +----------------+---------+-------+-------------------+ + * | | | | + * +------------+ +-------------+ +---------------+ +-------------------+ + * | FileHeader | | EventHeader | | DequeueHeader | | TransactionHeader | + * +------------+ +-------------+ +---------------+ +-------------------+ + * ^ + * | + * +---------------+ + * | EnqueueHeader | + * +---------------+ + * </pre> + */ + +#ifndef qpid_asyncStore_jrnl2_TransactionHeader_h_ +#define qpid_asyncStore_jrnl2_TransactionHeader_h_ + +#include "RecordHeader.h" + +namespace qpid { +namespace asyncStore { +namespace jrnl2 { + +#pragma pack(1) + +/** + * \brief Struct for transaction commit and abort records. + * + * Struct for DTX commit and abort records. Only the magic distinguishes between them. Since + * this record must be used in the context of a valid XID, the xidsize field must not be zero. + * Immediately following this record is the XID itself which is xidsize bytes long, followed by + * a rec_tail. + * + * Note that this record had its own rid distinct from the rids of the record(s) making up the + * transaction it is committing or aborting. + * + * Record layout in binary format (24 bytes): + * <pre> + * 0x0 0x7 + * +-----+-----+-----+-----+-----+-----+-----+-----+ -+ + * 0x00 | _magic | v | e | _flags | | + * +-----+-----+-----+-----+-----+-----+-----+-----+ | struct RecordHeader + * 0x08 | _recordId | | + * +-----+-----+-----+-----+-----+-----+-----+-----+ -+ + * 0x10 | _xidSize | + * +-----+-----+-----+-----+-----+-----+-----+-----+ + * </pre> + * <table> + * <tr> + * <td>v</td> + * <td>file version [ <code>_version</code> ] (If the format or encoding of + * this file changes, then this number should be incremented)</td> + * </tr> + * <tr> + * <td>e</td> + * <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for + * little endian, <b>true</b> (0x01) for big endian</td> + * </tr> + * </table> + */ +class TransactionHeader : public RecordHeader +{ +public: + uint64_t m_xidSize; ///< XID size + + /** + * \brief Default constructor, which sets all values to 0. + */ + TransactionHeader(); + + /** + * \brief Convenience constructor which initializes values during construction. + * + * \param magic The magic for this record + * \param version Version of this record + * \param recordId Record identifier for this record + * \param xidSize Size of the transaction (or distributed transaction) ID for this record + * \param overwriteIndicator Flag indicating the present value of the overwrite indicator when writing this + * record + */ + TransactionHeader(const uint32_t magic, + const uint8_t version, + const uint64_t recordId, + const uint64_t xidSize, + const bool overwriteIndicator); + + /** + * \brief Copy constructor + * + * \param th Instance to be copied + */ + TransactionHeader(const TransactionHeader& th); + + /** + * \brief Virtual destructor + */ + virtual ~TransactionHeader(); + + /** + * \brief Convenience copy method. + */ + inline void copy(const TransactionHeader& th); + + /** + * \brief Reset this record to default values (mostly 0) + */ + inline void reset(); + + /** + * \brief Return the header size of this record in bytes. + * + * \returns Size of record header in bytes. + */ + static uint64_t getHeaderSize(); + + /** + * \brief Return the body (data) size of this record in bytes. + * + * \returns Size of record body in bytes. + */ + uint64_t getBodySize() const; + + /** + * \brief Return total size of this record in bytes, being in the case of the dequeue record the size of the + * header, the size of the body (xid only) and the size of the tail. + */ + inline uint64_t getRecordSize() const; + +}; + +#pragma pack() + +}}} // namespace qpid::asyncStore::jrnl2 + +#endif // qpid_asyncStore_jrnl2_TransactionHeader_h_ |