From c1195b3f8677102d6d15a40044c66185f52bf327 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Mon, 20 Feb 2012 14:40:24 +0000 Subject: QPID-3858: Initial proposal for async store interface git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1291285 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/CMakeLists.txt | 1 + cpp/src/Makefile.am | 3 + cpp/src/qpid/broker/AsyncStore.cpp | 158 ++++++++++++++++++++++ cpp/src/qpid/broker/AsyncStore.h | 239 +++++++++++++++++++++++++++++++++ cpp/src/qpid/broker/AsyncStoreErrors.h | 59 ++++++++ cpp/src/qpid/broker/AsyncStoreToken.h | 101 ++++++++++++++ 6 files changed, 561 insertions(+) create mode 100644 cpp/src/qpid/broker/AsyncStore.cpp create mode 100644 cpp/src/qpid/broker/AsyncStore.h create mode 100644 cpp/src/qpid/broker/AsyncStoreErrors.h create mode 100644 cpp/src/qpid/broker/AsyncStoreToken.h diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index e8e543b672..72c6b5c4f1 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -1041,6 +1041,7 @@ set (qpidbroker_SOURCES ${qpidbroker_platform_SOURCES} qpid/amqp_0_10/Connection.h qpid/amqp_0_10/Connection.cpp + qpid/broker/AsyncStore.cpp qpid/broker/Broker.cpp qpid/broker/Credit.cpp qpid/broker/Exchange.cpp diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 4f733e985b..911621295b 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -526,6 +526,8 @@ libqpidbroker_la_SOURCES = \ qpid/amqp_0_10/Connection.cpp \ qpid/amqp_0_10/Connection.h \ qpid/broker/AclModule.h \ + qpid/broker/AsyncStore.cpp \ + qpid/broker/AsyncStore.h \ qpid/broker/Bridge.cpp \ qpid/broker/Bridge.h \ qpid/broker/Broker.cpp \ @@ -670,6 +672,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/SignalHandler.cpp \ qpid/broker/SignalHandler.h \ qpid/broker/StatefulQueueObserver.h \ + qpid/broker/StoreToken.h \ qpid/broker/System.cpp \ qpid/broker/System.h \ qpid/broker/ThresholdAlerts.cpp \ diff --git a/cpp/src/qpid/broker/AsyncStore.cpp b/cpp/src/qpid/broker/AsyncStore.cpp new file mode 100644 index 0000000000..aba75669fd --- /dev/null +++ b/cpp/src/qpid/broker/AsyncStore.cpp @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "AsyncStore.h" + +namespace qpid { +namespace broker { + + AsyncStorePlus::storeConfigDataReturn_t + AsyncStorePlus::storeConfigData(const boost::intrusive_ptr configData, + const successCbFn_t successCb, + const failCbFn_t failCb, + const void* cbCtxt) + { + boost::intrusive_ptr configTok(nextConfigToken()); + AsyncStoreOp op(AsyncStoreOp::OP_CONFIG_CREATE, configData, configTok, 0, 0, successCb, failCb, cbCtxt); + return AsyncStorePlus::storeConfigDataReturn_t(submit(op), configTok); + } + + int + AsyncStorePlus::destroyConfigData(const boost::intrusive_ptr configTok, + const successCbFn_t successCb, + const failCbFn_t failCb, + const void* cbCtxt) + { + AsyncStoreOp op(AsyncStoreOp::OP_CONFIG_DESTROY, 0, configTok, 0, 0, successCb, failCb, cbCtxt); + return submit(op); + } + + AsyncStorePlus::createQueueReturn_t + AsyncStorePlus::createQueue(const std::string& name, + const qpid::types::Variant::Map& options, + const boost::intrusive_ptr queueData, + const successCbFn_t successCb, + const failCbFn_t failCb, + const void* cbCtxt) + { + boost::intrusive_ptr queueTok(nextQueueToken(name, options)); + AsyncStoreOp op(AsyncStoreOp::OP_CONFIG_CREATE, queueData, queueTok, 0, 0, successCb, failCb, cbCtxt); + return AsyncStorePlus::createQueueReturn_t(submit(op), queueTok); + } + + int + AsyncStorePlus::destroyQueue(const boost::intrusive_ptr queueTok, + const successCbFn_t successCb, + const failCbFn_t failCb, + const void* cbCtxt) + { + AsyncStoreOp op(AsyncStoreOp::OP_CONFIG_DESTROY, 0, queueTok, 0, 0, successCb, failCb, cbCtxt); + return submit(op); + } + + AsyncStorePlus::txnReturn_t + AsyncStorePlus::beginTxn(const std::string xid, + const successCbFn_t successCb, + const failCbFn_t failCb, + const void* cbCtxt) + { + boost::intrusive_ptr txnTok(nextTxnToken(xid)); + AsyncStoreOp op(AsyncStoreOp::OP_TXN_BEGIN, 0, 0, 0, txnTok, successCb, failCb, cbCtxt); + return AsyncStorePlus::txnReturn_t(submit(op), txnTok); + } + + int + AsyncStorePlus::prepareTxn(const boost::intrusive_ptr txnTok, + const successCbFn_t successCb, + const failCbFn_t failCb, + const void* cbCtxt) + { + AsyncStoreOp op(AsyncStoreOp::OP_TXN_PREPARE, 0, 0, 0, txnTok, successCb, failCb, cbCtxt); + return submit(op); + } + + int + AsyncStorePlus::commitTxn(const boost::intrusive_ptr txnTok, + const successCbFn_t successCb, + const failCbFn_t failCb, + const void* cbCtxt) + { + AsyncStoreOp op(AsyncStoreOp::OP_TXN_COMMIT, 0, 0, 0, txnTok, successCb, failCb, cbCtxt); + return submit(op); + } + + int + AsyncStorePlus::abortTxn(const boost::intrusive_ptr txnTok, + const successCbFn_t successCb, + const failCbFn_t failCb, + const void* cbCtxt) + { + AsyncStoreOp op(AsyncStoreOp::OP_TXN_ABORT, 0, 0, 0, txnTok, successCb, failCb, cbCtxt); + return submit(op); + } + + AsyncStorePlus::enqReturn_t + AsyncStorePlus::enqueueMsg(const boost::intrusive_ptr msgData, + const boost::intrusive_ptr queueTok, + const boost::intrusive_ptr txnTok, + const successCbFn_t successCb, + const failCbFn_t failCb, + const void* cbCtxt) + { + boost::intrusive_ptr msgTok(nextMessageToken()); + AsyncStoreOp op(AsyncStoreOp::OP_ENQUEUE, msgData, msgTok, queueTok, txnTok, successCb, failCb, cbCtxt); + return AsyncStorePlus::enqReturn_t(submit(op), msgTok); + } + + int + AsyncStorePlus::dequeueMsg(const boost::intrusive_ptr msgTok, + const boost::intrusive_ptr queueTok, + const boost::intrusive_ptr txnTok, + const successCbFn_t successCb, + const failCbFn_t failCb, + const void* cbCtxt) + { + AsyncStoreOp op(AsyncStoreOp::OP_DEQUEUE, 0, msgTok, queueTok, txnTok, successCb, failCb, cbCtxt); + return submit(op); + } + + AsyncStorePlus::storeEventReturn_t + AsyncStorePlus::storeQueueEvent(const boost::intrusive_ptr eventData, + const boost::intrusive_ptr queueTok, + const boost::intrusive_ptr txnTok, + const successCbFn_t successCb, + const failCbFn_t failCb, + const void* cbCtxt) + { + boost::intrusive_ptr eventTok(nextEventToken()); + AsyncStoreOp op(AsyncStoreOp::OP_EVENT_STORE, eventData, eventTok, queueTok, txnTok, successCb, failCb, cbCtxt); + return AsyncStorePlus::storeEventReturn_t(submit(op), eventTok); + } + + int + AsyncStorePlus::flushQueue(const boost::intrusive_ptr queueTok, + const successCbFn_t successCb, + const failCbFn_t failCb, + const void* cbCtxt) + { + AsyncStoreOp op(AsyncStoreOp::OP_FLUSH, 0, 0, queueTok, 0, successCb, failCb, cbCtxt); + return submit(op); + } + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h new file mode 100644 index 0000000000..69434fc6a6 --- /dev/null +++ b/cpp/src/qpid/broker/AsyncStore.h @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef qpid_broker_AsyncStore_h_ +#define qpid_broker_AsyncStore_h_ + +#include "AsyncStoreErrors.h" +#include "AsyncStoreToken.h" +#include "qpid/broker/RecoveryManager.h" +#include "qpid/RefCounted.h" +#include "qpid/types/Variant.h" +#include + +#include + +namespace qpid { +namespace broker { + + // C functor-style definitions for async completion and failure callbacks + /** + * \brief Callback function used for successful completion of operation. + * + * \param ctxPtr Callback context pointer + */ + typedef void (*successCbFn_t)(const void * ctxtPtr); + + /** + * \brief Callback function used when an operation fails. + * + * \param errCode Erroro code which indicates the error or failure + * \param errInfo Additional information about this specific error or failure in text format + * \param ctxtPtr Callback context pointer + */ + typedef void (*failCbFn_t)(const asyncStoreError_t errCode, const std::string& errInfo, const void* ctxtPtr); + + /** + * \brief Struct used to pass operation code and parameters to AsyncStore::submit(). + */ + struct AsyncStoreOp // Ops require the following tokens: (o=optional): + { // data id queue txn successCb failCb cbCtxt + typedef enum { // ----------------------------------------------- + OP_NONE = 0, // + OP_CONFIG_CREATE, // Y Y N N o o o + OP_CONFIG_DESTROY, // N Y N N o o o + OP_FLUSH, // N N Y N o o o + OP_TXN_BEGIN, // N N N Y o o o + OP_TXN_PREPARE, // N N N Y o o o + OP_TXN_ABORT, // N N N Y o o o + OP_TXN_COMMIT, // N N N Y o o o + OP_EVENT_STORE, // Y Y Y o o o o + OP_ENQUEUE, // Y Y Y o o o o + OP_DEQUEUE // N Y Y o o o o + } opCode_t; + + opCode_t op; ///< Operation code + boost::intrusive_ptr data; ///< Data source for ops where data is persisted + boost::intrusive_ptr id; ///< Token for target of persistence operation + boost::intrusive_ptr queue; ///< Token for queue where a queue context is required + boost::intrusive_ptr txn; ///< Token for transaction context if required + successCbFn_t successCb; ///< Successful completion callback function + failCbFn_t failCb; ///< Failure callback function + const void* cbCtxt; ///< Callback context passed back in callback functions + + // --- Convenience constructors --- + + AsyncStoreOp() : op(OP_NONE), data(0), id(0), queue(0), txn(0), successCb(0), failCb(0), cbCtxt(0) {} + + AsyncStoreOp(const opCode_t op, + const boost::intrusive_ptr data, + const boost::intrusive_ptr id, + const boost::intrusive_ptr queue, + const boost::intrusive_ptr txn, + const successCbFn_t successCb = 0, + const failCbFn_t failCb = 0, + const void* cbCtxt = 0) : + op(op), data(data), id(id), queue(queue), txn(txn), successCb(successCb), failCb(failCb), cbCtxt(cbCtxt) + {} + + }; // struct StoreOp + + + /** + * \brief This is the core async store interface class. + */ + class AsyncStore: public Recoverable + { + public: + // Submit async operation here + virtual int submit(const AsyncStoreOp& op) = 0; + + // Functions to provide tokens for an operation + virtual boost::intrusive_ptr nextConfigToken() = 0; + virtual boost::intrusive_ptr nextEventToken() = 0; + virtual boost::intrusive_ptr nextMessageToken() = 0; + virtual boost::intrusive_ptr nextQueueToken(const std::string& name, const qpid::types::Variant::Map& options) = 0; + virtual boost::intrusive_ptr nextTxnToken(const std::string& xid=std::string()) = 0; + + // Legacy - Restore FTD message, NOT async! + virtual int loadContent(boost::intrusive_ptr msgTok, + const boost::intrusive_ptr queueTok, + char* data, + uint64_t offset, + uint64_t length) = 0; + }; // class AsyncStore + + + /** + * \brief This class contains helper functions that set up StoreOp and pass it to the submit() function. + * + * In some cases, a token instance is returned along with the return code of the submit; + * a std::pair pattern is used for these calls. + * + * I don't want to call this a xxxImpl class, as it is still abstract and needs to be subclassed. Other ideas for + * a class name here welcome! + * + * Usage pattern example: + * ---------------------- + * queueTok <- createQueue(queueName, storeOpts, queueOpts); // Queue is persisted, store space initialized + * + * dtxTok -< beginTxn(xid); // Xid is supplied: distributed txn + * msgTok <- enqueueMsg(msgData, queueTok, dtxTok); // Transactional enqueue of msgData on queue queueTok + * prepareTxn(dtxTok); // dtx prepare + * commitTxn(dtxTok); // end of distributed txn + * + * txnTok <- beginTxn(); // local txn + * dequeue(msgTok, queueTok, txnTok); // Transactional dequeue of message msgTok on queue queueTok + * commitTxn(txnTok); // end of local txn + * + * destroyQueue(queueTok); + */ + class AsyncStorePlus : public AsyncStore + { + // --- Config data --- + + typedef std::pair > storeConfigDataReturn_t; + storeConfigDataReturn_t storeConfigData(const boost::intrusive_ptr configData, + const successCbFn_t complCb = 0, + const failCbFn_t failCb = 0, + const void* cbCtxt = 0); + + int destroyConfigData(const boost::intrusive_ptr configTok, + const successCbFn_t complCb = 0, + const failCbFn_t failCb = 0, + const void* cbCtxt = 0); + + + // --- Queues --- + + typedef std::pair > createQueueReturn_t; + createQueueReturn_t createQueue(const std::string& name, + const qpid::types::Variant::Map& options, + const boost::intrusive_ptr queueData, + const successCbFn_t complCb = 0, + const failCbFn_t failCb = 0, + const void* cbCtxt = 0); + + int flushQueue(const boost::intrusive_ptr queueTok, + const successCbFn_t complCb = 0, + const failCbFn_t failCb = 0, + const void* cbCtxt = 0); + + int destroyQueue(const boost::intrusive_ptr queueTok, + const successCbFn_t complCb = 0, + const failCbFn_t failCb = 0, + const void* cbCtxt = 0); + + + // --- Transactions --- + + typedef std::pair > txnReturn_t; + txnReturn_t beginTxn(const std::string xid = std::string(), + const successCbFn_t complCb = 0, + const failCbFn_t failCb = 0, + const void* cbCtxt = 0); + + int prepareTxn(const boost::intrusive_ptr txnTok, + const successCbFn_t complCb = 0, + const failCbFn_t failCb = 0, + const void* cbCtxt = 0); + + int commitTxn(const boost::intrusive_ptr txnTok, + const successCbFn_t complCb = 0, + const failCbFn_t failCb = 0, + const void* cbCtxt = 0); + + int abortTxn(const boost::intrusive_ptr txnTok, + const successCbFn_t complCb = 0, + const failCbFn_t failCb = 0, + const void* cbCtxt = 0); + + + // --- Event storage --- + + typedef std::pair > storeEventReturn_t; + storeEventReturn_t storeQueueEvent(const boost::intrusive_ptr eventData, + const boost::intrusive_ptr queueTok, + const boost::intrusive_ptr txnTok = 0, + const successCbFn_t complCb = 0, + const failCbFn_t failCb = 0, + const void* cbCtxt = 0); + + + // --- Messages --- + + typedef std::pair > enqReturn_t; + enqReturn_t enqueueMsg(const boost::intrusive_ptr msgData, + const boost::intrusive_ptr queueTok, + const boost::intrusive_ptr txnTok = 0, + const successCbFn_t complCb = 0, + const failCbFn_t failCb = 0, + const void* cbCtxt = 0); + + int dequeueMsg(const boost::intrusive_ptr msgTok, + const boost::intrusive_ptr queueTok, + const boost::intrusive_ptr txnTok = 0, + const successCbFn_t complCb = 0, + const failCbFn_t failCb = 0, + const void* cbCtxt = 0); + }; // class AsyncStorePlus + +}} // namespace qpid::broker + +#endif // qpid_broker_AsyncStore_h_ diff --git a/cpp/src/qpid/broker/AsyncStoreErrors.h b/cpp/src/qpid/broker/AsyncStoreErrors.h new file mode 100644 index 0000000000..748049305c --- /dev/null +++ b/cpp/src/qpid/broker/AsyncStoreErrors.h @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef qpid_broker_AsyncStoreErrors_h_ +#define qpid_broker_AsyncStoreErrors_h_ + +#include // uint16_t, uint32_t + +namespace qpid { +namespace broker { + + // Error codes returned by AsyncStore::submit() + static const int ERR_OK = 0; ///< Successful submission + static const int ERR_PARAMS = -1; ///< Incorrect or bad parameters in struct AsyncStoreOp + static const int ERR_FULL = -2; ///< Event queue full + + /** + * This system allows 32-bit error codes to be expressed as either a struct of two + * uint16_t value or a single uint32_t value. + * + * The idea of having two values is that the interface can possibly specify error classes + * or types (possibly indicating severity too), while allowing the implementation to specify + * an additional code or "sub-code" specific to that implementation. + * + * \todo: Finalize structure (another option: uint8_t class; uint8_t op; uint16_t errCode;) + * and the interface-level error codes. + */ +#pragma pack(1) + typedef struct { + uint16_t errClass; + uint16_t errorCode; + } asyncStoreErrorStruct_t; +#pragma pack() + + typedef union { + asyncStoreErrorStruct_t s; + uint32_t i; + } asyncStoreError_t; + +}} // namespace qpid::broker + + +#endif // qpid_broker_AsyncStoreErrors_h_ diff --git a/cpp/src/qpid/broker/AsyncStoreToken.h b/cpp/src/qpid/broker/AsyncStoreToken.h new file mode 100644 index 0000000000..14659c56e6 --- /dev/null +++ b/cpp/src/qpid/broker/AsyncStoreToken.h @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef qpid_broker_AsyncStoreToken_h_ +#define qpid_broker_AsyncStoreToken_h_ + +#include "qpid/RefCounted.h" +#include // uint64_t + +namespace qpid { +namespace broker { + +/** + * Pointer to data to be persisted. + */ +class StoredData : public qpid::RefCounted +{ +public: + virtual ~StoredData() {} + virtual uint64_t getSize() = 0; + virtual void write(char*) = 0; +}; + +/** + * Token classes, to be implemented by store: + *
+ *
+ *                 +---------------+
+ *                 | IdentityToken |
+ *                 +---------------+
+ *                         ^
+ *                         |
+ *         +---------------+--------------+
+ *         |               |              |
+ *  +------------+  +-------------+  +----------+
+ *  | EventToken |  | ConfigToken |  | TxnToken |
+ *  +------------+  +-------------+  +----------+
+ *         ^               ^
+ *         |               |
+ * +--------------+  +------------+
+ * | MessageToken |  | QueueToken |
+ * +--------------+  +------------+
+ *
+ * 
+ */ + +class IdentityToken : public qpid::RefCounted +{ +public : + virtual ~IdentityToken() {} +}; + +class EventToken : public IdentityToken +{ +public : + virtual ~EventToken() {} +}; + +class MessageToken : public EventToken +{ +public : + virtual ~MessageToken() {} +}; + +class ConfigToken : public IdentityToken +{ +public : + virtual ~ConfigToken() {} +}; + +class QueueToken : public ConfigToken +{ +public : + virtual ~QueueToken() {} +}; + +class TxnToken : public IdentityToken +{ +public : + virtual ~TxnToken() {} +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_AsyncStoreToken_h_ -- cgit v1.2.1