#ifndef QPID_STORE_STORAGEPROVIDER_H #define QPID_STORE_STORAGEPROVIDER_H /* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include #include #include #include "qpid/Exception.h" #include "qpid/Plugin.h" #include "qpid/Options.h" #include "qpid/broker/MessageStore.h" using qpid::broker::PersistableConfig; using qpid::broker::PersistableExchange; using qpid::broker::PersistableMessage; using qpid::broker::PersistableQueue; namespace qpid { namespace store { typedef std::map ExchangeMap; typedef std::map QueueMap; typedef std::map MessageMap; // Msg Id -> vector of queue entries where message is queued struct QueueEntry { enum TplStatus { NONE = 0, ADDING = 1, REMOVING = 2 }; uint64_t queueId; TplStatus tplStatus; std::string xid; QueueEntry(uint64_t id, TplStatus tpl = NONE, const std::string& x = "") : queueId(id), tplStatus(tpl), xid(x) {} bool operator==(const QueueEntry& rhs) { if (queueId != rhs.queueId) return false; if (tplStatus == NONE && rhs.tplStatus == NONE) return true; return xid == rhs.xid; } }; typedef std::map > MessageQueueMap; typedef std::map PreparedTransactionMap; class MessageStorePlugin; /** * @class StorageProvider * * StorageProvider defines the interface for the storage provider plugin to the * Qpid broker persistence store plugin. * * @TODO Should StorageProvider also inherit from MessageStore? If so, then * maybe remove Recoverable from MessageStore's inheritance and move it * to MessageStorePlugin? In any event, somehow the discardInit() feature * needs to get added here. */ class StorageProvider : public qpid::Plugin, public qpid::broker::MessageStore { public: class Exception : public qpid::Exception { public: virtual ~Exception() throw() {} virtual const char *what() const throw() = 0; }; /** * @name Methods inherited from qpid::Plugin */ //@{ /** * Return a pointer to the provider's options. The options will be * updated during option parsing by the host program; therefore, the * referenced Options object must remain valid past this function's return. * * @return An options group or 0 for no options. Default returns 0. * Plugin retains ownership of return value. */ virtual qpid::Options* getOptions() = 0; /** * Initialize Plugin functionality on a Target, called before * initializing the target. * * StorageProviders should respond only to Targets of class * qpid::store::MessageStorePlugin and ignore all others. * * When called, the provider should invoke the method * qpid::store::MessageStorePlugin::providerAvailable() to alert the * message store of StorageProvider's availability. * * Called before the target itself is initialized. */ virtual void earlyInitialize (Plugin::Target& target) = 0; /** * Initialize StorageProvider functionality. Called after initializing * the target. * * StorageProviders should respond only to Targets of class * qpid::store::MessageStorePlugin and ignore all others. * * Called after the target is fully initialized. */ virtual void initialize(Plugin::Target& target) = 0; //@} /** * Receive notification that this provider is the one that will actively * handle storage for the target. If the provider is to be used, this * method will be called after earlyInitialize() and before any * recovery operations (recovery, in turn, precedes call to initialize()). * Thus, it is wise to not actually do any database ops from within * earlyInitialize() - they can wait until activate() is called because * at that point it is certain the database will be needed. */ virtual void activate(MessageStorePlugin &store) = 0; /** * @name Methods inherited from qpid::broker::MessageStore */ //@{ /** * If called after init() but before recovery, will discard the database * and reinitialize using an empty store dir. If @a pushDownStoreFiles * is true, the content of the store dir will be moved to a backup dir * inside the store dir. This is used when cluster nodes recover and must * get thier content from a cluster sync rather than directly fromt the * store. * * @param pushDownStoreFiles If true, will move content of the store dir * into a subdir, leaving the store dir * otherwise empty. */ virtual void truncateInit(const bool pushDownStoreFiles = false) = 0; /** * Record the existence of a durable queue */ virtual void create(PersistableQueue& queue, const qpid::framing::FieldTable& args) = 0; /** * Destroy a durable queue */ virtual void destroy(PersistableQueue& queue) = 0; /** * Record the existence of a durable exchange */ virtual void create(const PersistableExchange& exchange, const qpid::framing::FieldTable& args) = 0; /** * Destroy a durable exchange */ virtual void destroy(const PersistableExchange& exchange) = 0; /** * Record a binding */ virtual void bind(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& key, const qpid::framing::FieldTable& args) = 0; /** * Forget a binding */ virtual void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& key, const qpid::framing::FieldTable& args) = 0; /** * Record generic durable configuration */ virtual void create(const PersistableConfig& config) = 0; /** * Destroy generic durable configuration */ virtual void destroy(const PersistableConfig& config) = 0; /** * Stores a messages before it has been enqueued * (enqueueing automatically stores the message so this is * only required if storage is required prior to that * point). If the message has not yet been stored it will * store the headers as well as any content passed in. A * persistence id will be set on the message which can be * used to load the content or to append to it. */ virtual void stage(const boost::intrusive_ptr& msg) = 0; /** * Destroys a previously staged message. This only needs * to be called if the message is never enqueued. (Once * enqueued, deletion will be automatic when the message * is dequeued from all queues it was enqueued onto). */ virtual void destroy(PersistableMessage& msg) = 0; /** * Appends content to a previously staged message */ virtual void appendContent(const boost::intrusive_ptr& msg, const std::string& data) = 0; /** * Loads (a section) of content data for the specified * message (previously stored through a call to stage or * enqueue) into data. The offset refers to the content * only (i.e. an offset of 0 implies that the start of the * content should be loaded, not the headers or related * meta-data). */ virtual void loadContent(const PersistableQueue& queue, const boost::intrusive_ptr& msg, std::string& data, uint64_t offset, uint32_t length) = 0; /** * Enqueues a message, storing the message if it has not * been previously stored and recording that the given * message is on the given queue. * * Note: that this is async so the return of the function does * not mean the opperation is complete. * * @param msg the message to enqueue * @param queue the name of the queue onto which it is to be enqueued * @param xid (a pointer to) an identifier of the * distributed transaction in which the operation takes * place or null for 'local' transactions */ virtual void enqueue(qpid::broker::TransactionContext* ctxt, const boost::intrusive_ptr& msg, const PersistableQueue& queue) = 0; /** * Dequeues a message, recording that the given message is * no longer on the given queue and deleting the message * if it is no longer on any other queue. * * Note: that this is async so the return of the function does * not mean the opperation is complete. * * @param msg the message to dequeue * @param queue the name of the queue from which it is to be dequeued * @param xid (a pointer to) an identifier of the * distributed transaction in which the operation takes * place or null for 'local' transactions */ virtual void dequeue(qpid::broker::TransactionContext* ctxt, const boost::intrusive_ptr& msg, const PersistableQueue& queue) = 0; /** * Flushes all async messages to disk for the specified queue * * Note: that this is async so the return of the function does * not mean the opperation is complete. * * @param queue the name of the queue from which it is to be dequeued */ virtual void flush(const qpid::broker::PersistableQueue& queue) = 0; /** * Returns the number of outstanding AIO's for a given queue * * If 0, than all the enqueue / dequeues have been stored * to disk * * @param queue the name of the queue to check for outstanding AIO */ virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue) = 0; //@} /** * @TODO This should probably not be here - it's only here because * MessageStore inherits from Recoverable... maybe move that derivation. * * As it is now, we don't use this. Separate recover methods are * declared below for individual types, which also set up maps of * messages, queues, transactions for the main store plugin to handle * properly. * * Request recovery of queue and message state. */ virtual void recover(qpid::broker::RecoveryManager& /*recoverer*/) {} /** * @name Methods that do the recovery of the various objects that * were saved. */ //@{ /** * Recover bindings. */ virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer) = 0; virtual void recoverExchanges(qpid::broker::RecoveryManager& recoverer, ExchangeMap& exchangeMap) = 0; virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer, QueueMap& queueMap) = 0; virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer, const ExchangeMap& exchangeMap, const QueueMap& queueMap) = 0; virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer, MessageMap& messageMap, MessageQueueMap& messageQueueMap) = 0; virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer, PreparedTransactionMap& dtxMap) = 0; //@} }; }} // namespace qpid::store #endif /* QPID_STORE_STORAGEPROVIDER_H */