summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/store/MessageStorePlugin.cpp')
-rw-r--r--qpid/cpp/src/qpid/store/MessageStorePlugin.cpp469
1 files changed, 469 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp b/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
new file mode 100644
index 0000000000..2a8d971987
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
@@ -0,0 +1,469 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "MessageStorePlugin.h"
+#include "StorageProvider.h"
+#include "StoreException.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include "qpid/DataDir.h"
+#include "qpid/log/Statement.h"
+
+/*
+ * The MessageStore pointer given to the Broker points to static storage.
+ * Thus, it cannot be deleted, especially by the broker. To prevent deletion,
+ * this no-op deleter is used with the boost::shared_ptr. When the last
+ * shared_ptr is destroyed, the deleter is called rather than delete().
+ */
+namespace {
+ class NoopDeleter {
+ public:
+ NoopDeleter() {}
+ void operator()(qpid::broker::MessageStore * /*p*/) {}
+ };
+}
+
+namespace qpid {
+namespace store {
+
+static MessageStorePlugin static_instance_registers_plugin;
+
+
+MessageStorePlugin::StoreOptions::StoreOptions(const std::string& name) :
+ qpid::Options(name)
+{
+ addOptions()
+ ("storage-provider", qpid::optValue(providerName, "PROVIDER"),
+ "Name of the storage provider to use.")
+ ;
+}
+
+
+void
+MessageStorePlugin::earlyInitialize (qpid::Plugin::Target& target)
+{
+ qpid::broker::Broker* b =
+ dynamic_cast<qpid::broker::Broker*>(&target);
+ if (0 == b)
+ return; // Only listen to Broker targets
+
+ broker = b;
+
+ // See if there are any storage provider plugins ready. If not, we can't
+ // do a message store.
+ qpid::Plugin::earlyInitAll(*this);
+
+ if (providers.empty()) {
+ QPID_LOG(warning,
+ "Message store plugin: No storage providers available.");
+ provider = providers.end();
+ return;
+ }
+ if (!options.providerName.empty()) {
+ // If specific one was chosen, locate it in loaded set of providers.
+ provider = providers.find(options.providerName);
+ if (provider == providers.end())
+ throw Exception("Message store plugin: storage provider '" +
+ options.providerName +
+ "' does not exist.");
+ }
+ else {
+ // No specific provider chosen; if there's only one, use it. Else
+ // report the need to pick one.
+ if (providers.size() > 1) {
+ provider = providers.end();
+ throw Exception("Message store plugin: multiple provider plugins "
+ "loaded; must either load only one or select one "
+ "using --storage-provider");
+ }
+ provider = providers.begin();
+ }
+
+ provider->second->activate(*this);
+ NoopDeleter d;
+ boost::shared_ptr<qpid::broker::MessageStore> sp(this, d);
+ broker->setStore(sp);
+ target.addFinalizer(boost::bind(&MessageStorePlugin::finalizeMe, this));
+}
+
+void
+MessageStorePlugin::initialize(qpid::Plugin::Target& target)
+{
+ qpid::broker::Broker* broker =
+ dynamic_cast<qpid::broker::Broker*>(&target);
+ if (0 == broker)
+ return; // Only listen to Broker targets
+
+ // Pass along the initialize step to the provider that's activated.
+ if (provider != providers.end()) {
+ provider->second->initialize(*this);
+ }
+ // qpid::Plugin::initializeAll(*this);
+}
+
+void
+MessageStorePlugin::finalizeMe()
+{
+ finalize(); // Call finalizers on any Provider plugins
+}
+
+void
+MessageStorePlugin::providerAvailable(const std::string name,
+ StorageProvider *be)
+{
+ ProviderMap::value_type newSp(name, be);
+ std::pair<ProviderMap::iterator, bool> inserted = providers.insert(newSp);
+ if (inserted.second == false)
+ QPID_LOG(warning, "Storage provider " << name << " duplicate; ignored.");
+}
+
+void
+MessageStorePlugin::truncateInit(const bool /*saveStoreContent*/)
+{
+ QPID_LOG(info, "Store: truncateInit");
+}
+
+
+/**
+ * Record the existence of a durable queue
+ */
+void
+MessageStorePlugin::create(broker::PersistableQueue& queue,
+ const framing::FieldTable& args)
+{
+ if (queue.getName().size() == 0)
+ {
+ QPID_LOG(error,
+ "Cannot create store for empty (null) queue name - "
+ "ignoring and attempting to continue.");
+ return;
+ }
+ if (queue.getPersistenceId()) {
+ THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
+ }
+ provider->second->create(queue, args);
+}
+
+/**
+ * Destroy a durable queue
+ */
+void
+MessageStorePlugin::destroy(broker::PersistableQueue& queue)
+{
+ provider->second->destroy(queue);
+}
+
+/**
+ * Record the existence of a durable exchange
+ */
+void
+MessageStorePlugin::create(const broker::PersistableExchange& exchange,
+ const framing::FieldTable& args)
+{
+ if (exchange.getPersistenceId()) {
+ THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName());
+ }
+ provider->second->create(exchange, args);
+}
+
+/**
+ * Destroy a durable exchange
+ */
+void
+MessageStorePlugin::destroy(const broker::PersistableExchange& exchange)
+{
+ provider->second->destroy(exchange);
+}
+
+/**
+ * Record a binding
+ */
+void
+MessageStorePlugin::bind(const broker::PersistableExchange& exchange,
+ const broker::PersistableQueue& queue,
+ const std::string& key,
+ const framing::FieldTable& args)
+{
+ provider->second->bind(exchange, queue, key, args);
+}
+
+/**
+ * Forget a binding
+ */
+void
+MessageStorePlugin::unbind(const broker::PersistableExchange& exchange,
+ const broker::PersistableQueue& queue,
+ const std::string& key,
+ const framing::FieldTable& args)
+{
+ provider->second->unbind(exchange, queue, key, args);
+}
+
+/**
+ * Record generic durable configuration
+ */
+void
+MessageStorePlugin::create(const broker::PersistableConfig& config)
+{
+ if (config.getPersistenceId()) {
+ THROW_STORE_EXCEPTION("Config item already created: " +
+ config.getName());
+ }
+ provider->second->create(config);
+}
+
+/**
+ * Destroy generic durable configuration
+ */
+void
+MessageStorePlugin::destroy(const broker::PersistableConfig& config)
+{
+ provider->second->destroy(config);
+}
+
+/**
+ * Stores a message before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point).
+ */
+void
+MessageStorePlugin::stage(const boost::intrusive_ptr<broker::PersistableMessage>& msg)
+{
+ if (msg->getPersistenceId() == 0 && !msg->isContentReleased()) {
+ provider->second->stage(msg);
+ }
+}
+
+/**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+void
+MessageStorePlugin::destroy(broker::PersistableMessage& msg)
+{
+ if (msg.getPersistenceId())
+ provider->second->destroy(msg);
+}
+
+/**
+ * Appends content to a previously staged message
+ */
+void
+MessageStorePlugin::appendContent
+ (const boost::intrusive_ptr<const broker::PersistableMessage>& msg,
+ const std::string& data)
+{
+ if (msg->getPersistenceId())
+ provider->second->appendContent(msg, data);
+ else
+ THROW_STORE_EXCEPTION("Cannot append content. Message not known to store!");
+}
+
+/**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+void
+MessageStorePlugin::loadContent(const broker::PersistableQueue& queue,
+ const boost::intrusive_ptr<const broker::PersistableMessage>& msg,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length)
+{
+ if (msg->getPersistenceId())
+ provider->second->loadContent(queue, msg, data, offset, length);
+ else
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+}
+
+/**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * Note: The operation is asynchronous so the return of this function does
+ * not mean the operation is complete.
+ */
+void
+MessageStorePlugin::enqueue(broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<broker::PersistableMessage>& msg,
+ const broker::PersistableQueue& queue)
+{
+ if (queue.getPersistenceId() == 0) {
+ THROW_STORE_EXCEPTION("Queue not created: " + queue.getName());
+ }
+ provider->second->enqueue(ctxt, msg, queue);
+}
+
+/**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * Note: The operation is asynchronous so the return of this function does
+ * not mean the operation is complete.
+ */
+void
+MessageStorePlugin::dequeue(broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<broker::PersistableMessage>& msg,
+ const broker::PersistableQueue& queue)
+{
+ provider->second->dequeue(ctxt, msg, queue);
+}
+
+/**
+ * Flushes all async messages to disk for the specified queue
+ *
+ * Note: The operation is asynchronous so the return of this function does
+ * not mean the operation is complete.
+ */
+void
+MessageStorePlugin::flush(const broker::PersistableQueue& queue)
+{
+ provider->second->flush(queue);
+}
+
+/**
+ * Returns the number of outstanding AIO's for a given queue
+ *
+ * If 0, than all the enqueue / dequeues have been stored
+ * to disk.
+ */
+uint32_t
+MessageStorePlugin::outstandingQueueAIO(const broker::PersistableQueue& queue)
+{
+ return provider->second->outstandingQueueAIO(queue);
+}
+
+std::auto_ptr<broker::TransactionContext>
+MessageStorePlugin::begin()
+{
+ return provider->second->begin();
+}
+
+std::auto_ptr<broker::TPCTransactionContext>
+MessageStorePlugin::begin(const std::string& xid)
+{
+ return provider->second->begin(xid);
+}
+
+void
+MessageStorePlugin::prepare(broker::TPCTransactionContext& ctxt)
+{
+ provider->second->prepare(ctxt);
+}
+
+void
+MessageStorePlugin::commit(broker::TransactionContext& ctxt)
+{
+ provider->second->commit(ctxt);
+}
+
+void
+MessageStorePlugin::abort(broker::TransactionContext& ctxt)
+{
+ provider->second->abort(ctxt);
+}
+
+void
+MessageStorePlugin::collectPreparedXids(std::set<std::string>& xids)
+{
+ provider->second->collectPreparedXids(xids);
+}
+
+/**
+ * Request recovery of queue and message state; inherited from Recoverable
+ */
+void
+MessageStorePlugin::recover(broker::RecoveryManager& recoverer)
+{
+ ExchangeMap exchanges;
+ QueueMap queues;
+ MessageMap messages;
+ MessageQueueMap messageQueueMap;
+ std::vector<std::string> xids;
+ PreparedTransactionMap dtxMap;
+
+ provider->second->recoverConfigs(recoverer);
+ provider->second->recoverExchanges(recoverer, exchanges);
+ provider->second->recoverQueues(recoverer, queues);
+ provider->second->recoverBindings(recoverer, exchanges, queues);
+ // Important to recover messages before transactions in the SQL-CLFS
+ // case. If this becomes a problem, it may be possible to resolve it.
+ // If in doubt please raise a jira and notify Steve Huston
+ // <shuston@riverace.com>.
+ provider->second->recoverMessages(recoverer, messages, messageQueueMap);
+ provider->second->recoverTransactions(recoverer, dtxMap);
+ // Enqueue msgs where needed.
+ for (MessageQueueMap::const_iterator i = messageQueueMap.begin();
+ i != messageQueueMap.end();
+ ++i) {
+ // Locate the message corresponding to the current message Id
+ MessageMap::const_iterator iMsg = messages.find(i->first);
+ if (iMsg == messages.end()) {
+ std::ostringstream oss;
+ oss << "No matching message trying to re-enqueue message "
+ << i->first;
+ THROW_STORE_EXCEPTION(oss.str());
+ }
+ broker::RecoverableMessage::shared_ptr msg = iMsg->second;
+ // Now for each queue referenced in the queue map, locate it
+ // and re-enqueue the message.
+ for (std::vector<QueueEntry>::const_iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ // Locate the queue corresponding to the current queue Id
+ QueueMap::const_iterator iQ = queues.find(j->queueId);
+ if (iQ == queues.end()) {
+ std::ostringstream oss;
+ oss << "No matching queue trying to re-enqueue message "
+ << " on queue Id " << j->queueId;
+ THROW_STORE_EXCEPTION(oss.str());
+ }
+ // Messages involved in prepared transactions have their status
+ // updated accordingly. First, though, restore a message that
+ // is expected to be on a queue, including non-transacted
+ // messages and those pending dequeue in a dtx.
+ if (j->tplStatus != QueueEntry::ADDING)
+ iQ->second->recover(msg);
+ switch(j->tplStatus) {
+ case QueueEntry::ADDING:
+ dtxMap[j->xid]->enqueue(iQ->second, msg);
+ break;
+ case QueueEntry::REMOVING:
+ dtxMap[j->xid]->dequeue(iQ->second, msg);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+}
+
+}} // namespace qpid::store