summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2009-10-21 02:11:04 +0000
committerStephen D. Huston <shuston@apache.org>2009-10-21 02:11:04 +0000
commitf3565f9fa6edde79a3d40006abf44b568f7279f0 (patch)
treea09dc2471320677bc6dc4cb9d7f27a4cc06459bd /qpid/cpp
parent70800c4ee0975aa890b8ce9a87eda24a5b870f24 (diff)
downloadqpid-python-f3565f9fa6edde79a3d40006abf44b568f7279f0.tar.gz
Initial checkin of portable message store plugin and MS SQL-specific storage provider. Goes with QPID-2017
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@827870 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/MessageStore.h10
-rw-r--r--qpid/cpp/src/qpid/broker/PersistableMessage.h4
-rw-r--r--qpid/cpp/src/qpid/store/CMakeLists.txt74
-rw-r--r--qpid/cpp/src/qpid/store/MessageStorePlugin.cpp444
-rw-r--r--qpid/cpp/src/qpid/store/MessageStorePlugin.h284
-rw-r--r--qpid/cpp/src/qpid/store/StorageProvider.h322
-rw-r--r--qpid/cpp/src/qpid/store/StoreException.h49
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp89
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h84
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp150
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h84
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.cpp64
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.h62
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.cpp133
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.h61
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp85
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h53
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp70
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h62
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/Exception.h56
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp989
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp115
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h69
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp182
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h85
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp117
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/Recordset.h101
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/State.cpp45
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/State.h52
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp59
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/VariantHelper.h61
31 files changed, 4104 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageStore.h b/qpid/cpp/src/qpid/broker/MessageStore.h
index f5c55a50f8..143e860ec7 100644
--- a/qpid/cpp/src/qpid/broker/MessageStore.h
+++ b/qpid/cpp/src/qpid/broker/MessageStore.h
@@ -46,15 +46,7 @@ class MessageStore : public TransactionalStore, public Recoverable {
public:
/**
- * init the store, call before any other call. If not called, store
- * is free to pick any defaults
- *
- * @param options Options object provided by concrete store plug in.
- */
- virtual bool init(const Options* options) = 0;
-
- /**
- * If called after init() but before recovery, will discard the database
+ * If called after initialization but before recovery, will discard the database
* and reinitialize using an empty store dir. If the parameter pushDownStoreFiles
* is true, the content of the store dir will be moved to a backup dir inside the
* store dir. This is used when cluster nodes recover and must get thier content
diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h
index 2576e266d2..7d49491dfd 100644
--- a/qpid/cpp/src/qpid/broker/PersistableMessage.h
+++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h
@@ -104,9 +104,9 @@ class PersistableMessage : public Persistable
void flush();
- bool QPID_BROKER_EXTERN isContentReleased() const;
+ QPID_BROKER_EXTERN bool isContentReleased() const;
- void QPID_BROKER_EXTERN setStore(MessageStore*);
+ QPID_BROKER_EXTERN void setStore(MessageStore*);
void requestContentRelease();
void blockContentRelease();
bool checkContentReleasable();
diff --git a/qpid/cpp/src/qpid/store/CMakeLists.txt b/qpid/cpp/src/qpid/store/CMakeLists.txt
new file mode 100644
index 0000000000..452d4b7cf0
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/CMakeLists.txt
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+project(qpidc_store)
+
+#set (CMAKE_VERBOSE_MAKEFILE ON) # for debugging
+
+include_directories( ${Boost_INCLUDE_DIR} )
+
+include_directories( ${CMAKE_CURRENT_SOURCE_DIR} )
+include_directories( ${CMAKE_HOME_DIRECTORY}/include )
+
+link_directories( ${Boost_LIBRARY_DIRS} )
+
+set (store_SOURCES
+ MessageStorePlugin.cpp
+ )
+add_library (store MODULE ${store_SOURCES})
+target_link_libraries (store qpidbroker ${Boost_PROGRAM_OPTIONS_LIBRARY})
+if (CMAKE_COMPILER_IS_GNUCXX)
+ set_target_properties (store PROPERTIES
+ PREFIX ""
+ LINK_FLAGS -Wl,--no-undefined)
+endif (CMAKE_COMPILER_IS_GNUCXX)
+
+if (CMAKE_SYSTEM_NAME STREQUAL Windows)
+ if (MSVC)
+ add_definitions(
+ /D "NOMINMAX"
+ /D "WIN32_LEAN_AND_MEAN"
+ )
+ endif (MSVC)
+endif (CMAKE_SYSTEM_NAME STREQUAL Windows)
+
+set_target_properties (store PROPERTIES VERSION ${qpidc_version})
+
+# Build the MS SQL Storage Provider plugin
+set (mssql_default ON)
+if (NOT CMAKE_SYSTEM_NAME STREQUAL Windows)
+ set(mssql_default OFF)
+endif (NOT CMAKE_SYSTEM_NAME STREQUAL Windows)
+option(BUILD_MSSQL "Build MS SQL Store provider plugin" ${mssql_default})
+if (BUILD_MSSQL)
+ add_library (mssql_store MODULE
+ ms-sql/MsSqlProvider.cpp
+ ms-sql/AmqpTransaction.cpp
+ ms-sql/BindingRecordset.cpp
+ ms-sql/BlobAdapter.cpp
+ ms-sql/BlobEncoder.cpp
+ ms-sql/BlobRecordset.cpp
+ ms-sql/DatabaseConnection.cpp
+ ms-sql/MessageMapRecordset.cpp
+ ms-sql/MessageRecordset.cpp
+ ms-sql/Recordset.cpp
+ ms-sql/State.cpp
+ ms-sql/VariantHelper.cpp)
+ target_link_libraries (mssql_store qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY})
+endif (BUILD_MSSQL)
diff --git a/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp b/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
new file mode 100644
index 0000000000..385045344f
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
@@ -0,0 +1,444 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "MessageStorePlugin.h"
+#include "StorageProvider.h"
+#include "StoreException.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include "qpid/DataDir.h"
+#include "qpid/log/Statement.h"
+
+/*
+ * The MessageStore pointer given to the Broker points to static storage.
+ * Thus, it cannot be deleted, especially by the broker. To prevent deletion,
+ * this no-op deleter is used with the boost::shared_ptr. When the last
+ * shared_ptr is destroyed, the deleter is called rather than delete().
+ */
+namespace {
+ class NoopDeleter {
+ public:
+ NoopDeleter() {}
+ void operator()(qpid::broker::MessageStore *p) {}
+ };
+}
+
+namespace qpid {
+namespace store {
+
+static MessageStorePlugin static_instance_registers_plugin;
+
+
+MessageStorePlugin::StoreOptions::StoreOptions(const std::string& name) :
+ qpid::Options(name)
+{
+ addOptions()
+ ("storage-provider", qpid::optValue(providerName, "PROVIDER"),
+ "Name of the storage provider to use.")
+ ;
+}
+
+
+void
+MessageStorePlugin::earlyInitialize (qpid::Plugin::Target& target)
+{
+ qpid::broker::Broker* broker =
+ dynamic_cast<qpid::broker::Broker*>(&target);
+ if (0 == broker)
+ return; // Only listen to Broker targets
+
+ // See if there are any storage provider plugins ready. If not, we can't
+ // do a message store.
+ qpid::Plugin::earlyInitAll(*this);
+
+ if (providers.empty()) {
+ QPID_LOG(warning,
+ "Message store plugin: No storage providers available.");
+ return;
+ }
+ if (!options.providerName.empty()) {
+ // If specific one was chosen, locate it in loaded set of providers.
+ provider = providers.find(options.providerName);
+ if (provider == providers.end())
+ throw Exception("Message store plugin: storage provider '" +
+ options.providerName +
+ "' does not exist.");
+ }
+ else {
+ // No specific provider chosen; if there's only one, use it. Else
+ // report the need to pick one.
+ if (providers.size() > 1)
+ throw Exception("Message store plugin: multiple provider plugins "
+ "loaded; must either load only one or select one "
+ "using --storage-provider");
+ provider = providers.begin();
+ }
+
+ provider->second->activate(*this);
+ NoopDeleter d;
+ boost::shared_ptr<qpid::broker::MessageStore> sp(this, d);
+ broker->setStore(sp);
+ target.addFinalizer(boost::bind(&MessageStorePlugin::finalizeMe, this));
+}
+
+void
+MessageStorePlugin::initialize(qpid::Plugin::Target& target)
+{
+ qpid::broker::Broker* broker =
+ dynamic_cast<qpid::broker::Broker*>(&target);
+ if (0 == broker)
+ return; // Only listen to Broker targets
+
+ // Pass along the initialize step to the provider that's activated.
+ if (provider != providers.end()) {
+ provider->second->initialize(*this);
+ }
+ // qpid::Plugin::initializeAll(*this);
+}
+
+void
+MessageStorePlugin::finalizeMe()
+{
+ finalize(); // Call finalizers on any Provider plugins
+}
+
+void
+MessageStorePlugin::providerAvailable(const std::string name,
+ StorageProvider *be)
+{
+ ProviderMap::value_type newSp(name, be);
+ std::pair<ProviderMap::iterator, bool> inserted = providers.insert(newSp);
+ if (inserted.second == false)
+ QPID_LOG(warning, "Storage provider " << name << " duplicate; ignored.");
+}
+
+void
+MessageStorePlugin::truncateInit(const bool /*saveStoreContent*/)
+{
+ QPID_LOG(info, "Store: truncateInit");
+}
+
+
+/**
+ * Record the existence of a durable queue
+ */
+void
+MessageStorePlugin::create(broker::PersistableQueue& queue,
+ const framing::FieldTable& args)
+{
+ if (queue.getName().size() == 0)
+ {
+ QPID_LOG(error,
+ "Cannot create store for empty (null) queue name - "
+ "ignoring and attempting to continue.");
+ return;
+ }
+ if (queue.getPersistenceId()) {
+ THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
+ }
+ provider->second->create(queue, args);
+}
+
+/**
+ * Destroy a durable queue
+ */
+void
+MessageStorePlugin::destroy(broker::PersistableQueue& queue)
+{
+ provider->second->destroy(queue);
+}
+
+/**
+ * Record the existence of a durable exchange
+ */
+void
+MessageStorePlugin::create(const broker::PersistableExchange& exchange,
+ const framing::FieldTable& args)
+{
+ if (exchange.getPersistenceId()) {
+ THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName());
+ }
+ provider->second->create(exchange, args);
+}
+
+/**
+ * Destroy a durable exchange
+ */
+void
+MessageStorePlugin::destroy(const broker::PersistableExchange& exchange)
+{
+ provider->second->destroy(exchange);
+}
+
+/**
+ * Record a binding
+ */
+void
+MessageStorePlugin::bind(const broker::PersistableExchange& exchange,
+ const broker::PersistableQueue& queue,
+ const std::string& key,
+ const framing::FieldTable& args)
+{
+ provider->second->bind(exchange, queue, key, args);
+}
+
+/**
+ * Forget a binding
+ */
+void
+MessageStorePlugin::unbind(const broker::PersistableExchange& exchange,
+ const broker::PersistableQueue& queue,
+ const std::string& key,
+ const framing::FieldTable& args)
+{
+ provider->second->unbind(exchange, queue, key, args);
+}
+
+/**
+ * Record generic durable configuration
+ */
+void
+MessageStorePlugin::create(const broker::PersistableConfig& config)
+{
+ if (config.getPersistenceId()) {
+ THROW_STORE_EXCEPTION("Config item already created: " +
+ config.getName());
+ }
+ provider->second->create(config);
+}
+
+/**
+ * Destroy generic durable configuration
+ */
+void
+MessageStorePlugin::destroy(const broker::PersistableConfig& config)
+{
+ provider->second->destroy(config);
+}
+
+/**
+ * Stores a message before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point).
+ */
+void
+MessageStorePlugin::stage(const boost::intrusive_ptr<broker::PersistableMessage>& msg)
+{
+ if (msg->getPersistenceId() == 0 && !msg->isContentReleased()) {
+ provider->second->stage(msg);
+ }
+}
+
+/**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+void
+MessageStorePlugin::destroy(broker::PersistableMessage& msg)
+{
+ if (msg.getPersistenceId())
+ provider->second->destroy(msg);
+}
+
+/**
+ * Appends content to a previously staged message
+ */
+void
+MessageStorePlugin::appendContent
+ (const boost::intrusive_ptr<const broker::PersistableMessage>& msg,
+ const std::string& data)
+{
+ if (msg->getPersistenceId())
+ provider->second->appendContent(msg, data);
+ else
+ THROW_STORE_EXCEPTION("Cannot append content. Message not known to store!");
+}
+
+/**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+void
+MessageStorePlugin::loadContent(const broker::PersistableQueue& queue,
+ const boost::intrusive_ptr<const broker::PersistableMessage>& msg,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length)
+{
+ if (msg->getPersistenceId())
+ provider->second->loadContent(queue, msg, data, offset, length);
+ else
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+}
+
+/**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * Note: The operation is asynchronous so the return of this function does
+ * not mean the operation is complete.
+ */
+void
+MessageStorePlugin::enqueue(broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<broker::PersistableMessage>& msg,
+ const broker::PersistableQueue& queue)
+{
+ if (queue.getPersistenceId() == 0) {
+ THROW_STORE_EXCEPTION("Queue not created: " + queue.getName());
+ }
+ provider->second->enqueue(ctxt, msg, queue);
+}
+
+/**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * Note: The operation is asynchronous so the return of this function does
+ * not mean the operation is complete.
+ */
+void
+MessageStorePlugin::dequeue(broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<broker::PersistableMessage>& msg,
+ const broker::PersistableQueue& queue)
+{
+ provider->second->dequeue(ctxt, msg, queue);
+}
+
+/**
+ * Flushes all async messages to disk for the specified queue
+ *
+ * Note: The operation is asynchronous so the return of this function does
+ * not mean the operation is complete.
+ */
+void
+MessageStorePlugin::flush(const broker::PersistableQueue& queue)
+{
+ provider->second->flush(queue);
+}
+
+/**
+ * Returns the number of outstanding AIO's for a given queue
+ *
+ * If 0, than all the enqueue / dequeues have been stored
+ * to disk.
+ */
+uint32_t
+MessageStorePlugin::outstandingQueueAIO(const broker::PersistableQueue& queue)
+{
+ return provider->second->outstandingQueueAIO(queue);
+}
+
+std::auto_ptr<broker::TransactionContext>
+MessageStorePlugin::begin()
+{
+ return provider->second->begin();
+}
+
+std::auto_ptr<broker::TPCTransactionContext>
+MessageStorePlugin::begin(const std::string& xid)
+{
+ return provider->second->begin(xid);
+}
+
+void
+MessageStorePlugin::prepare(broker::TPCTransactionContext& ctxt)
+{
+ provider->second->prepare(ctxt);
+}
+
+void
+MessageStorePlugin::commit(broker::TransactionContext& ctxt)
+{
+ provider->second->commit(ctxt);
+}
+
+void
+MessageStorePlugin::abort(broker::TransactionContext& ctxt)
+{
+ provider->second->abort(ctxt);
+}
+
+void
+MessageStorePlugin::collectPreparedXids(std::set<std::string>& xids)
+{
+ provider->second->collectPreparedXids(xids);
+}
+
+/**
+ * Request recovery of queue and message state; inherited from Recoverable
+ */
+void
+MessageStorePlugin::recover(broker::RecoveryManager& recoverer)
+{
+ ExchangeMap exchanges;
+ QueueMap queues;
+ MessageMap messages;
+ MessageQueueMap messageQueueMap;
+
+ provider->second->recoverConfigs(recoverer);
+ provider->second->recoverExchanges(recoverer, exchanges);
+ provider->second->recoverQueues(recoverer, queues);
+ provider->second->recoverBindings(recoverer, exchanges);
+ provider->second->recoverMessages(recoverer, messages, messageQueueMap);
+ // Enqueue msgs where needed.
+ for (MessageQueueMap::const_iterator i = messageQueueMap.begin();
+ i != messageQueueMap.end();
+ ++i) {
+ // Locate the message corresponding to the current message Id
+ MessageMap::const_iterator iMsg = messages.find(i->first);
+ if (iMsg == messages.end()) {
+ std::ostringstream oss;
+ oss << "No matching message trying to re-enqueue message "
+ << i->first;
+ THROW_STORE_EXCEPTION(oss.str());
+ }
+ broker::RecoverableMessage::shared_ptr msg = iMsg->second;
+ // Now for each queue referenced in the queue map, locate it
+ // and re-enqueue the message.
+ for (std::vector<uint64_t>::const_iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ // Locate the queue corresponding to the current queue Id
+ QueueMap::const_iterator iQ = queues.find(*j);
+ if (iQ == queues.end()) {
+ std::ostringstream oss;
+ oss << "No matching queue trying to re-enqueue message "
+ << " on queue Id " << *j;
+ THROW_STORE_EXCEPTION(oss.str());
+ }
+ iQ->second->recover(msg);
+ }
+ }
+
+ // recoverTransactions() and apply correctly while re-enqueuing
+}
+
+}} // namespace qpid::store
diff --git a/qpid/cpp/src/qpid/store/MessageStorePlugin.h b/qpid/cpp/src/qpid/store/MessageStorePlugin.h
new file mode 100644
index 0000000000..529a59401e
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/MessageStorePlugin.h
@@ -0,0 +1,284 @@
+#ifndef QPID_STORE_MESSAGESTOREPLUGIN_H
+#define QPID_STORE_MESSAGESTOREPLUGIN_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/PersistableExchange.h"
+#include "qpid/broker/PersistableMessage.h"
+#include "qpid/broker/PersistableQueue.h"
+#include "qpid/management/Manageable.h"
+
+#include <string>
+
+using namespace qpid;
+
+namespace qpid {
+namespace store {
+
+class StorageProvider;
+
+/**
+ * @class MessageStorePlugin
+ *
+ * MessageStorePlugin is the front end of the persistent message store
+ * plugin. It is responsible for coordinating recovery, initialization,
+ * transactions (both local and distributed), flow-to-disk loading and
+ * unloading and persisting broker state (queues, bindings etc.).
+ * Actual storage operations are carried out by a message store storage
+ * provider that implements the qpid::store::StorageProvider interface.
+ */
+class MessageStorePlugin :
+ public qpid::Plugin,
+ public qpid::broker::MessageStore, // Frontend classes
+ public qpid::Plugin::Target // Provider target
+ // @TODO Need a mgmt story for this. Maybe allow r/o access to provider store info? public qpid::management::Manageable
+{
+ public:
+ MessageStorePlugin() {}
+
+ /**
+ * @name Methods inherited from qpid::Plugin
+ */
+ //@{
+ virtual Options* getOptions() { return &options; }
+ virtual void earlyInitialize (Plugin::Target& target);
+ virtual void initialize(Plugin::Target& target);
+ //@}
+
+ /// Finalizer; calls Target::finalize() to run finalizers on
+ /// StorageProviders.
+ void finalizeMe();
+
+ /**
+ * Called by StorageProvider instances during the earlyInitialize sequence.
+ * Each StorageProvider must supply a unique name by which it is known and a
+ * pointer to itself.
+ */
+ virtual void providerAvailable(const std::string name, StorageProvider *be);
+
+ /**
+ * @name Methods inherited from qpid::broker::MessageStore
+ */
+ //@{
+ /**
+ * If called before recovery, will discard the database and reinitialize
+ * using an empty store. This is used when cluster nodes recover and
+ * must get their content from a cluster sync rather than directly from
+ * the store.
+ *
+ * @param saveStoreContent If true, the store's contents should be
+ * saved to a backup location before
+ * reinitializing the store content.
+ */
+ virtual void truncateInit(const bool saveStoreContent = false);
+
+ /**
+ * Record the existence of a durable queue
+ */
+ virtual void create(broker::PersistableQueue& queue,
+ const framing::FieldTable& args);
+ /**
+ * Destroy a durable queue
+ */
+ virtual void destroy(broker::PersistableQueue& queue);
+
+ /**
+ * Record the existence of a durable exchange
+ */
+ virtual void create(const broker::PersistableExchange& exchange,
+ const framing::FieldTable& args);
+ /**
+ * Destroy a durable exchange
+ */
+ virtual void destroy(const broker::PersistableExchange& exchange);
+
+ /**
+ * Record a binding
+ */
+ virtual void bind(const broker::PersistableExchange& exchange,
+ const broker::PersistableQueue& queue,
+ const std::string& key,
+ const framing::FieldTable& args);
+
+ /**
+ * Forget a binding
+ */
+ virtual void unbind(const broker::PersistableExchange& exchange,
+ const broker::PersistableQueue& queue,
+ const std::string& key,
+ const framing::FieldTable& args);
+
+ /**
+ * Record generic durable configuration
+ */
+ virtual void create(const broker::PersistableConfig& config);
+
+ /**
+ * Destroy generic durable configuration
+ */
+ virtual void destroy(const broker::PersistableConfig& config);
+
+ /**
+ * Stores a message before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point). If the message has not yet been stored it will
+ * store the headers as well as any content passed in. A
+ * persistence id will be set on the message which can be
+ * used to load the content or to append to it.
+ */
+ virtual void stage(const boost::intrusive_ptr<broker::PersistableMessage>& msg);
+
+ /**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+ virtual void destroy(broker::PersistableMessage& msg);
+
+ /**
+ * Appends content to a previously staged message
+ */
+ virtual void appendContent(const boost::intrusive_ptr<const broker::PersistableMessage>& msg,
+ const std::string& data);
+
+ /**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+ virtual void loadContent(const broker::PersistableQueue& queue,
+ const boost::intrusive_ptr<const broker::PersistableMessage>& msg,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length);
+
+ /**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * Note: The operation is asynchronous so the return of this function does
+ * not mean the operation is complete.
+ *
+ * @param msg the message to enqueue
+ * @param queue the name of the queue onto which it is to be enqueued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void enqueue(broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<broker::PersistableMessage>& msg,
+ const broker::PersistableQueue& queue);
+
+ /**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ *
+ * Note: The operation is asynchronous so the return of this function does
+ * not mean the operation is complete.
+ *
+ * @param msg the message to dequeue
+ * @param queue the name of the queue from which it is to be dequeued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void dequeue(broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<broker::PersistableMessage>& msg,
+ const broker::PersistableQueue& queue);
+
+ /**
+ * Flushes all async messages to disk for the specified queue
+ *
+ *
+ * Note: The operation is asynchronous so the return of this function does
+ * not mean the operation is complete.
+ *
+ * @param queue the name of the queue from which it is to be dequeued
+ */
+ virtual void flush(const broker::PersistableQueue& queue);
+
+ /**
+ * Returns the number of outstanding AIO's for a given queue
+ *
+ * If 0, than all the enqueue / dequeues have been stored
+ * to disk
+ *
+ * @param queue the name of the queue to check for outstanding AIO
+ */
+ virtual uint32_t outstandingQueueAIO(const broker::PersistableQueue& queue);
+ //@}
+
+ /**
+ * @name Methods inherited from qpid::broker::TransactionalStore
+ */
+ //@{
+ std::auto_ptr<broker::TransactionContext> begin();
+
+ std::auto_ptr<broker::TPCTransactionContext> begin(const std::string& xid);
+
+ void prepare(broker::TPCTransactionContext& ctxt);
+
+ void commit(broker::TransactionContext& ctxt);
+
+ void abort(broker::TransactionContext& ctxt);
+
+ void collectPreparedXids(std::set<std::string>& xids);
+ //@}
+
+ /**
+ * Request recovery of queue and message state; inherited from Recoverable
+ */
+ virtual void recover(broker::RecoveryManager& recoverer);
+
+ // inline management::Manageable::status_t ManagementMethod (uint32_t, management::Args&, std::string&)
+ // { return management::Manageable::STATUS_OK; }
+
+ protected:
+
+ struct StoreOptions : public qpid::Options {
+ StoreOptions(const std::string& name="Store Options");
+ std::string providerName;
+ };
+ StoreOptions options;
+
+ typedef std::map<const std::string, StorageProvider*> ProviderMap;
+ ProviderMap providers;
+ ProviderMap::const_iterator provider;
+
+}; // class MessageStoreImpl
+
+} // namespace msgstore
+} // namespace mrg
+
+#endif /* QPID_SERIALIZER_H */
diff --git a/qpid/cpp/src/qpid/store/StorageProvider.h b/qpid/cpp/src/qpid/store/StorageProvider.h
new file mode 100644
index 0000000000..b0db13c41f
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/StorageProvider.h
@@ -0,0 +1,322 @@
+#ifndef QPID_STORE_STORAGEPROVIDER_H
+#define QPID_STORE_STORAGEPROVIDER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <map>
+#include <stdexcept>
+#include <vector>
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include "qpid/broker/MessageStore.h"
+
+using qpid::broker::PersistableConfig;
+using qpid::broker::PersistableExchange;
+using qpid::broker::PersistableMessage;
+using qpid::broker::PersistableQueue;
+
+namespace qpid {
+namespace store {
+
+typedef std::map<uint64_t, qpid::broker::RecoverableExchange::shared_ptr>
+ ExchangeMap;
+typedef std::map<uint64_t, qpid::broker::RecoverableQueue::shared_ptr>
+ QueueMap;
+typedef std::map<uint64_t, qpid::broker::RecoverableMessage::shared_ptr>
+ MessageMap;
+// Msg Id -> vector of queue Ids where message is queued
+typedef std::map<uint64_t, std::vector<uint64_t> > MessageQueueMap;
+
+class MessageStorePlugin;
+
+/**
+ * @class StorageProvider
+ *
+ * StorageProvider defines the interface for the storage provider plugin to the
+ * Qpid broker persistence store plugin.
+ *
+ * @TODO Should StorageProvider also inherit from MessageStore? If so, then
+ * maybe remove Recoverable from MessageStore's inheritance and move it
+ * to MessageStorePlugin? In any event, somehow the discardInit() feature
+ * needs to get added here.
+ */
+class StorageProvider : public qpid::Plugin, public qpid::broker::MessageStore
+{
+public:
+
+ class Exception : public std::exception
+ {
+ public:
+ virtual ~Exception() throw() {}
+ virtual const char *what() const throw() = 0;
+ };
+
+ /**
+ * @name Methods inherited from qpid::Plugin
+ */
+ //@{
+ /**
+ * Return a pointer to the provider's options. The options will be
+ * updated during option parsing by the host program; therefore, the
+ * referenced Options object must remain valid past this function's return.
+ *
+ * @return An options group or 0 for no options. Default returns 0.
+ * Plugin retains ownership of return value.
+ */
+ virtual qpid::Options* getOptions() = 0;
+
+ /**
+ * Initialize Plugin functionality on a Target, called before
+ * initializing the target.
+ *
+ * StorageProviders should respond only to Targets of class
+ * qpid::store::MessageStorePlugin and ignore all others.
+ *
+ * When called, the provider should invoke the method
+ * qpid::store::MessageStorePlugin::providerAvailable() to alert the
+ * message store of StorageProvider's availability.
+ *
+ * Called before the target itself is initialized.
+ */
+ virtual void earlyInitialize (Plugin::Target& target) = 0;
+
+ /**
+ * Initialize StorageProvider functionality. Called after initializing
+ * the target.
+ *
+ * StorageProviders should respond only to Targets of class
+ * qpid::store::MessageStorePlugin and ignore all others.
+ *
+ * Called after the target is fully initialized.
+ */
+ virtual void initialize(Plugin::Target& target) = 0;
+ //@}
+
+ /**
+ * Receive notification that this provider is the one that will actively
+ * handle storage for the target. If the provider is to be used, this
+ * method will be called after earlyInitialize() and before any
+ * recovery operations (recovery, in turn, precedes call to initialize()).
+ * Thus, it is wise to not actually do any database ops from within
+ * earlyInitialize() - they can wait until activate() is called because
+ * at that point it is certain the database will be needed.
+ */
+ virtual void activate(MessageStorePlugin &store) = 0;
+
+ /**
+ * @name Methods inherited from qpid::broker::MessageStore
+ */
+ //@{
+ /**
+ * If called after init() but before recovery, will discard the database
+ * and reinitialize using an empty store dir. If @a pushDownStoreFiles
+ * is true, the content of the store dir will be moved to a backup dir
+ * inside the store dir. This is used when cluster nodes recover and must
+ * get thier content from a cluster sync rather than directly fromt the
+ * store.
+ *
+ * @param pushDownStoreFiles If true, will move content of the store dir
+ * into a subdir, leaving the store dir
+ * otherwise empty.
+ */
+ virtual void truncateInit(const bool pushDownStoreFiles = false) = 0;
+
+ /**
+ * Record the existence of a durable queue
+ */
+ virtual void create(PersistableQueue& queue,
+ const qpid::framing::FieldTable& args) = 0;
+ /**
+ * Destroy a durable queue
+ */
+ virtual void destroy(PersistableQueue& queue) = 0;
+
+ /**
+ * Record the existence of a durable exchange
+ */
+ virtual void create(const PersistableExchange& exchange,
+ const qpid::framing::FieldTable& args) = 0;
+ /**
+ * Destroy a durable exchange
+ */
+ virtual void destroy(const PersistableExchange& exchange) = 0;
+
+ /**
+ * Record a binding
+ */
+ virtual void bind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args) = 0;
+
+ /**
+ * Forget a binding
+ */
+ virtual void unbind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args) = 0;
+
+ /**
+ * Record generic durable configuration
+ */
+ virtual void create(const PersistableConfig& config) = 0;
+
+ /**
+ * Destroy generic durable configuration
+ */
+ virtual void destroy(const PersistableConfig& config) = 0;
+
+ /**
+ * Stores a messages before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point). If the message has not yet been stored it will
+ * store the headers as well as any content passed in. A
+ * persistence id will be set on the message which can be
+ * used to load the content or to append to it.
+ */
+ virtual void stage(const boost::intrusive_ptr<PersistableMessage>& msg) = 0;
+
+ /**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+ virtual void destroy(PersistableMessage& msg) = 0;
+
+ /**
+ * Appends content to a previously staged message
+ */
+ virtual void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg,
+ const std::string& data) = 0;
+
+ /**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+ virtual void loadContent(const PersistableQueue& queue,
+ const boost::intrusive_ptr<const PersistableMessage>& msg,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length) = 0;
+
+ /**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * Note: that this is async so the return of the function does
+ * not mean the opperation is complete.
+ *
+ * @param msg the message to enqueue
+ * @param queue the name of the queue onto which it is to be enqueued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void enqueue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue) = 0;
+
+ /**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * Note: that this is async so the return of the function does
+ * not mean the opperation is complete.
+ *
+ * @param msg the message to dequeue
+ * @param queue the name of the queue from which it is to be dequeued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void dequeue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue) = 0;
+
+ /**
+ * Flushes all async messages to disk for the specified queue
+ *
+ * Note: that this is async so the return of the function does
+ * not mean the opperation is complete.
+ *
+ * @param queue the name of the queue from which it is to be dequeued
+ */
+ virtual void flush(const qpid::broker::PersistableQueue& queue) = 0;
+
+ /**
+ * Returns the number of outstanding AIO's for a given queue
+ *
+ * If 0, than all the enqueue / dequeues have been stored
+ * to disk
+ *
+ * @param queue the name of the queue to check for outstanding AIO
+ */
+ virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue) = 0;
+ //@}
+
+ /**
+ * @TODO This should probably not be here - it's only here because
+ * MessageStore inherits from Recoverable... maybe move that derivation.
+ *
+ * As it is now, we don't use this. Separate recover methods are
+ * declared below for individual types, which also set up maps of
+ * messages, queues, transactions for the main store plugin to handle
+ * properly.
+ *
+ * Request recovery of queue and message state.
+ */
+ virtual void recover(qpid::broker::RecoveryManager& recoverer) {}
+
+ /**
+ * @name Methods that do the recovery of the various objects that
+ * were saved.
+ */
+ //@{
+
+ /**
+ * Recover bindings.
+ */
+ virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer) = 0;
+ virtual void recoverExchanges(qpid::broker::RecoveryManager& recoverer,
+ ExchangeMap& exchangeMap) = 0;
+ virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer,
+ QueueMap& queueMap) = 0;
+ virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer,
+ const ExchangeMap& exchangeMap) = 0;
+ virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer,
+ MessageMap& messageMap,
+ MessageQueueMap& messageQueueMap) = 0;
+ //@}
+};
+
+}} // namespace qpid::store
+
+#endif /* QPID_STORE_STORAGEPROVIDER_H */
diff --git a/qpid/cpp/src/qpid/store/StoreException.h b/qpid/cpp/src/qpid/store/StoreException.h
new file mode 100644
index 0000000000..1dc7f670ec
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/StoreException.h
@@ -0,0 +1,49 @@
+#ifndef QPID_STORE_STOREEXCEPTION_H
+#define QPID_STORE_STOREEXCEPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <exception>
+#include <boost/format.hpp>
+#include "StorageProvider.h"
+
+namespace qpid {
+namespace store {
+
+class StoreException : public std::exception
+{
+ std::string text;
+public:
+ StoreException(const std::string& _text) : text(_text) {}
+ StoreException(const std::string& _text,
+ const StorageProvider::Exception& cause)
+ : text(_text + ": " + cause.what()) {}
+ virtual ~StoreException() throw() {}
+ virtual const char* what() const throw() { return text.c_str(); }
+};
+
+#define THROW_STORE_EXCEPTION(MESSAGE) throw qpid::store::StoreException(boost::str(boost::format("%s (%s:%d)") % (MESSAGE) % __FILE__ % __LINE__))
+#define THROW_STORE_EXCEPTION_2(MESSAGE, EXCEPTION) throw qpid::store::StoreException(boost::str(boost::format("%s (%s:%d)") % (MESSAGE) % __FILE__ % __LINE__), EXCEPTION)
+
+}} // namespace qpid::store
+
+#endif /* QPID_STORE_STOREEXCEPTION_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp b/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp
new file mode 100644
index 0000000000..0ecfacfb4b
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AmqpTransaction.h"
+#include "DatabaseConnection.h"
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+AmqpTransaction::AmqpTransaction(std::auto_ptr<DatabaseConnection>& _db)
+ : db(_db), transDepth(0)
+{
+}
+
+AmqpTransaction::~AmqpTransaction()
+{
+ if (transDepth > 0)
+ this->abort();
+}
+
+void
+AmqpTransaction::begin()
+{
+ _bstr_t beginCmd("BEGIN TRANSACTION");
+ _ConnectionPtr c = *db;
+ c->Execute(beginCmd, NULL, adExecuteNoRecords);
+ ++transDepth;
+}
+
+void
+AmqpTransaction::commit()
+{
+ if (transDepth > 0) {
+ _bstr_t commitCmd("COMMIT TRANSACTION");
+ _ConnectionPtr c = *db;
+ c->Execute(commitCmd, NULL, adExecuteNoRecords);
+ --transDepth;
+ }
+}
+
+void
+AmqpTransaction::abort()
+{
+ if (transDepth > 0) {
+ _bstr_t rollbackCmd("ROLLBACK TRANSACTION");
+ _ConnectionPtr c = *db;
+ c->Execute(rollbackCmd, NULL, adExecuteNoRecords);
+ transDepth = 0;
+ }
+}
+
+AmqpTPCTransaction::AmqpTPCTransaction(std::auto_ptr<DatabaseConnection>& _db,
+ const std::string& _xid)
+ : AmqpTransaction(_db), xid(_xid)
+{
+}
+
+AmqpTPCTransaction::~AmqpTPCTransaction()
+{
+}
+
+void
+AmqpTPCTransaction::prepare()
+{
+ // Intermediate transactions should have already assured integrity of
+ // the content in the database; just waiting to pull the trigger on the
+ // outermost transaction.
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h b/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h
new file mode 100644
index 0000000000..9b87d0ae15
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h
@@ -0,0 +1,84 @@
+#ifndef QPID_STORE_MSSQL_AMQPTRANSACTION_H
+#define QPID_STORE_MSSQL_AMQPTRANSACTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/broker/TransactionalStore.h>
+#include <string>
+#include <memory>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+class DatabaseConnection;
+
+/**
+ * @class AmqpTransaction
+ *
+ * Class representing an AMQP transaction. This is used around a set of
+ * enqueue and dequeue operations that occur when the broker is acting
+ * on a transaction commit/abort from the client.
+ */
+class AmqpTransaction : public qpid::broker::TransactionContext {
+
+ std::auto_ptr<DatabaseConnection> db;
+
+ // Since ADO w/ SQLOLEDB can't do nested transaction via its BeginTrans(),
+ // et al, nested transactions are carried out with direct SQL commands.
+ // To ensure the state of this is known, keep track of how deeply the
+ // transactions are nested.
+ unsigned int transDepth;
+
+public:
+ AmqpTransaction(std::auto_ptr<DatabaseConnection>& _db);
+ virtual ~AmqpTransaction();
+
+ DatabaseConnection *dbConn() { return db.get(); }
+
+ void begin();
+ void commit();
+ void abort();
+};
+
+/**
+ * @class AmqpTPCTransaction
+ *
+ * Class representing a Two-Phase-Commit (TPC) AMQP transaction. This is
+ * used around a set of enqueue and dequeue operations that occur when the
+ * broker is acting on a transaction prepare/commit/abort from the client.
+ */
+class AmqpTPCTransaction : public AmqpTransaction,
+ public qpid::broker::TPCTransactionContext {
+ std::string xid;
+
+public:
+ AmqpTPCTransaction(std::auto_ptr<DatabaseConnection>& _db,
+ const std::string& _xid);
+ virtual ~AmqpTPCTransaction();
+
+ void prepare();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_AMQPTRANSACTION_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp b/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp
new file mode 100644
index 0000000000..737d93b142
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/Exception.h>
+#include <qpid/log/Statement.h>
+
+#include "BindingRecordset.h"
+#include "BlobAdapter.h"
+#include "BlobEncoder.h"
+#include "VariantHelper.h"
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+void
+BindingRecordset::add(uint64_t exchangeId,
+ const std::string& queueName,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable& args)
+{
+ VariantHelper<std::string> queueNameStr(queueName);
+ VariantHelper<std::string> routingKeyStr(routingKey);
+ BlobEncoder blob (args); // Marshall field table to a blob
+ rs->AddNew();
+ rs->Fields->GetItem("exchangeId")->Value = exchangeId;
+ rs->Fields->GetItem("queueName")->Value = queueNameStr;
+ rs->Fields->GetItem("routingKey")->Value = routingKeyStr;
+ rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
+ rs->Update();
+}
+
+void
+BindingRecordset::remove(uint64_t exchangeId,
+ const std::string& queueName,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable& /*args*/)
+{
+ // Look up the affected binding.
+ std::ostringstream filter;
+ filter << "exchangeId = " << exchangeId
+ << " AND queueName = '" << queueName << "'"
+ << " AND routingKey = '" << routingKey << "'" << std::ends;
+ rs->PutFilter (VariantHelper<std::string>(filter.str()));
+ if (rs->RecordCount != 0) {
+ // Delete the records
+ rs->Delete(adAffectGroup);
+ rs->Update();
+ }
+ requery();
+}
+
+void
+BindingRecordset::remove(uint64_t exchangeId)
+{
+ // Look up the affected bindings by the exchange ID
+ std::ostringstream filter;
+ filter << "exchangeId = " << exchangeId << std::ends;
+ rs->PutFilter (VariantHelper<std::string>(filter.str()));
+ if (rs->RecordCount != 0) {
+ // Delete the records
+ rs->Delete(adAffectGroup);
+ rs->Update();
+ }
+ requery();
+}
+
+void
+BindingRecordset::remove(const std::string& queueName)
+{
+ // Look up the affected bindings by the exchange ID
+ std::ostringstream filter;
+ filter << "queueName = '" << queueName << "'" << std::ends;
+ rs->PutFilter (VariantHelper<std::string>(filter.str()));
+ if (rs->RecordCount != 0) {
+ // Delete the records
+ rs->Delete(adAffectGroup);
+ rs->Update();
+ }
+ requery();
+}
+
+void
+BindingRecordset::recover(qpid::broker::RecoveryManager& recoverer,
+ std::map<uint64_t, broker::RecoverableExchange::shared_ptr> exchMap)
+{
+ if (rs->BOF && rs->EndOfFile)
+ return; // Nothing to do
+ rs->MoveFirst();
+ Binding b;
+ IADORecordBinding *piAdoRecordBinding;
+ rs->QueryInterface(__uuidof(IADORecordBinding),
+ (LPVOID *)&piAdoRecordBinding);
+ piAdoRecordBinding->BindToRecordset(&b);
+ while (!rs->EndOfFile) {
+ long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize;
+ BlobAdapter blob(blobSize);
+ blob = rs->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+ broker::RecoverableExchange::shared_ptr exch = exchMap[b.exchangeId];
+ std::string q(b.queueName), k(b.routingKey);
+ exch->bind(q, k, blob);
+ rs->MoveNext();
+ }
+
+ piAdoRecordBinding->Release();
+}
+
+void
+BindingRecordset::dump()
+{
+ Recordset::dump();
+ if (rs->EndOfFile && rs->BOF) // No records
+ return;
+ rs->MoveFirst();
+
+ Binding b;
+ IADORecordBinding *piAdoRecordBinding;
+ rs->QueryInterface(__uuidof(IADORecordBinding),
+ (LPVOID *)&piAdoRecordBinding);
+ piAdoRecordBinding->BindToRecordset(&b);
+
+ while (VARIANT_FALSE == rs->EndOfFile) {
+ QPID_LOG(notice, "exch " << b.exchangeId
+ << ", q: " << b.queueName
+ << ", k: " << b.routingKey);
+ rs->MoveNext();
+ }
+
+ piAdoRecordBinding->Release();
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h b/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h
new file mode 100644
index 0000000000..5c51636f4f
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h
@@ -0,0 +1,84 @@
+#ifndef QPID_STORE_MSSQL_BINDINGRECORDSET_H
+#define QPID_STORE_MSSQL_BINDINGRECORDSET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <icrsint.h>
+#include "Recordset.h"
+#include <qpid/broker/RecoveryManager.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class BindingRecordset
+ *
+ * Class for the binding records.
+ */
+class BindingRecordset : public Recordset {
+
+ class Binding : public CADORecordBinding {
+ BEGIN_ADO_BINDING(Binding)
+ ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, exchangeId, FALSE)
+ ADO_VARIABLE_LENGTH_ENTRY4(2, adVarChar, queueName,
+ sizeof(queueName), FALSE)
+ ADO_VARIABLE_LENGTH_ENTRY4(3, adVarChar, routingKey,
+ sizeof(routingKey), FALSE)
+ END_ADO_BINDING()
+
+ public:
+ uint64_t exchangeId;
+ char queueName[256];
+ char routingKey[256];
+ };
+
+public:
+ // Add a new binding
+ void add(uint64_t exchangeId,
+ const std::string& queueName,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable& args);
+
+ // Remove a specific binding
+ void remove(uint64_t exchangeId,
+ const std::string& queueName,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable& args);
+
+ // Remove all bindings for the specified exchange
+ void remove(uint64_t exchangeId);
+
+ // Remove all bindings for the specified queue
+ void remove(const std::string& queueName);
+
+ // Recover bindings set using exchMap to get from Id to RecoverableExchange.
+ void recover(qpid::broker::RecoveryManager& recoverer,
+ std::map<uint64_t, broker::RecoverableExchange::shared_ptr> exchMap);
+
+ // Dump table contents; useful for debugging.
+ void dump();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_BINDINGRECORDSET_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.cpp b/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.cpp
new file mode 100644
index 0000000000..1889f34e41
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.cpp
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BlobAdapter.h"
+#include <qpid/Exception.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+void
+BlobAdapter::extractBuff()
+{
+ // To give a valid Buffer back, lock the safearray, obtaining a pointer to
+ // the actual data. Record the pointer in the Buffer so the destructor
+ // knows to unlock the safearray.
+ if (buff.getPointer() == 0) {
+ char *blob;
+ SafeArrayAccessData(this->parray, (void **)&blob);
+ qpid::framing::Buffer lockedBuff(blob, buff.getSize());
+ buff = lockedBuff;
+ }
+}
+
+
+BlobAdapter::~BlobAdapter()
+{
+ // If buff's pointer is set, the safearray is locked, so unlock it
+ if (buff.getPointer() != 0)
+ SafeArrayUnaccessData(this->parray);
+}
+
+BlobAdapter::operator qpid::framing::Buffer& ()
+{
+ extractBuff();
+ return buff;
+}
+
+BlobAdapter::operator qpid::framing::FieldTable& ()
+{
+ extractBuff();
+ fields.decode(buff);
+ return fields;
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.h b/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.h
new file mode 100644
index 0000000000..1c666392bc
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.h
@@ -0,0 +1,62 @@
+#ifndef QPID_STORE_MSSQL_BLOBADAPTER_H
+#define QPID_STORE_MSSQL_BLOBADAPTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <comutil.h>
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/FieldTable.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class BlobAdapter
+ *
+ * Adapter for accessing a blob (varbinary SQL field) as a qpid::framing::Buffer
+ * in an exception-safe way.
+ */
+class BlobAdapter : public _variant_t {
+private:
+ // This Buffer's pointer indicates whether or not a safearray has
+ // been locked; if it's 0, no locking was done.
+ qpid::framing::Buffer buff;
+ qpid::framing::FieldTable fields;
+
+ void extractBuff();
+
+public:
+ // Initialize with the known length of the data that will come.
+ // Assigning a _variant_t to this object will set up the array to be
+ // accessed with the operator Buffer&()
+ BlobAdapter(long blobSize) : _variant_t(), buff(0, blobSize) {}
+ ~BlobAdapter();
+ BlobAdapter& operator=(_variant_t& var_t_Src)
+ { _variant_t::operator=(var_t_Src); return *this; }
+ operator qpid::framing::Buffer& ();
+ operator qpid::framing::FieldTable& ();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_BLOBADAPTER_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.cpp b/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.cpp
new file mode 100644
index 0000000000..75d3dc2d86
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.cpp
@@ -0,0 +1,133 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BlobEncoder.h"
+#include <qpid/Exception.h>
+#include <qpid/broker/Persistable.h>
+#include <qpid/broker/PersistableMessage.h>
+#include <boost/intrusive_ptr.hpp>
+#include <memory.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+template <class ITEM> void
+BlobEncoder::encode(const ITEM &item)
+{
+ SAFEARRAYBOUND bound[1] = {0, 0};
+ bound[0].cElements = item.encodedSize();
+ blob = SafeArrayCreate(VT_UI1, 1, bound);
+ if (S_OK != SafeArrayLock(blob)) {
+ SafeArrayDestroy(blob);
+ blob = 0;
+ throw qpid::Exception("Error locking blob area for persistable item");
+ }
+ try {
+ qpid::framing::Buffer buff((char *)blob->pvData, bound[0].cElements);
+ item.encode(buff);
+ }
+ catch(...) {
+ SafeArrayUnlock(blob);
+ SafeArrayDestroy(blob);
+ blob = 0;
+ throw;
+ }
+ this->vt = VT_ARRAY | VT_UI1;
+ this->parray = blob;
+ SafeArrayUnlock(blob);
+}
+
+template <> void
+BlobEncoder::encode(const boost::intrusive_ptr<qpid::broker::PersistableMessage> &item)
+{
+ // NOTE! If this code changes, verify the recovery code in MessageRecordset
+ SAFEARRAYBOUND bound[1] = {0, 0};
+ bound[0].cElements = item->encodedSize() + sizeof(uint32_t);
+ blob = SafeArrayCreate(VT_UI1, 1, bound);
+ if (S_OK != SafeArrayLock(blob)) {
+ SafeArrayDestroy(blob);
+ blob = 0;
+ throw qpid::Exception("Error locking blob area for message");
+ }
+ try {
+ uint32_t headerSize = item->encodedHeaderSize();
+ qpid::framing::Buffer buff((char *)blob->pvData, bound[0].cElements);
+ buff.putLong(headerSize);
+ item->encode(buff);
+ }
+ catch(...) {
+ SafeArrayUnlock(blob);
+ SafeArrayDestroy(blob);
+ blob = 0;
+ throw;
+ }
+ this->vt = VT_ARRAY | VT_UI1;
+ this->parray = blob;
+ SafeArrayUnlock(blob);
+}
+
+template <> void
+BlobEncoder::encode(const std::string &item)
+{
+ SAFEARRAYBOUND bound[1] = {0, 0};
+ bound[0].cElements = item.size();
+ blob = SafeArrayCreate(VT_UI1, 1, bound);
+ if (S_OK != SafeArrayLock(blob)) {
+ SafeArrayDestroy(blob);
+ blob = 0;
+ throw qpid::Exception("Error locking blob area for string");
+ }
+ memcpy_s(blob->pvData, item.size(), item.data(), item.size());
+ this->vt = VT_ARRAY | VT_UI1;
+ this->parray = blob;
+ SafeArrayUnlock(blob);
+}
+
+BlobEncoder::BlobEncoder(const qpid::broker::Persistable &item) : blob(0)
+{
+ encode(item);
+}
+
+BlobEncoder::BlobEncoder(const boost::intrusive_ptr<qpid::broker::PersistableMessage> &msg) : blob(0)
+{
+ encode(msg);
+}
+
+BlobEncoder::BlobEncoder(const qpid::framing::FieldTable &fields) : blob(0)
+{
+ encode(fields);
+}
+
+BlobEncoder::BlobEncoder(const std::string &data) : blob(0)
+{
+ encode(data);
+}
+
+BlobEncoder::~BlobEncoder()
+{
+ if (blob)
+ SafeArrayDestroy(blob);
+ blob = 0;
+ this->parray = 0;
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.h b/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.h
new file mode 100644
index 0000000000..d2b56223c1
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.h
@@ -0,0 +1,61 @@
+#ifndef QPID_STORE_MSSQL_BLOBENCODER_H
+#define QPID_STORE_MSSQL_BLOBENCODER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <comutil.h>
+#include <string>
+#include <boost/intrusive_ptr.hpp>
+#include <qpid/broker/Persistable.h>
+#include <qpid/broker/PersistableMessage.h>
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/FieldTable.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class BlobEncoder
+ *
+ * Encodes a blob (varbinary) field from a qpid::broker::Persistable or a
+ * qpid::framing::FieldTable (both of which can be encoded to
+ * qpid::framing::Buffer) so it can be passed to ADO methods for writing
+ * to the database.
+ */
+class BlobEncoder : public _variant_t {
+private:
+ SAFEARRAY *blob;
+
+ template <class ITEM> void encode(const ITEM &item);
+
+public:
+ BlobEncoder(const qpid::broker::Persistable &item);
+ BlobEncoder(const boost::intrusive_ptr<qpid::broker::PersistableMessage> &msg);
+ BlobEncoder(const qpid::framing::FieldTable &fields);
+ BlobEncoder(const std::string& data);
+ ~BlobEncoder();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_BLOBENCODER_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp b/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp
new file mode 100644
index 0000000000..977eb6d5d2
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/Exception.h>
+#include <qpid/log/Statement.h>
+
+#include "BlobRecordset.h"
+#include "BlobEncoder.h"
+#include "VariantHelper.h"
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+void
+BlobRecordset::remove(uint64_t id)
+{
+ // Look up the item by its persistenceId
+ std::ostringstream filter;
+ filter << "persistenceId = " << id << std::ends;
+ rs->PutFilter (VariantHelper<std::string>(filter.str()));
+ if (!rs->EndOfFile) {
+ // Delete the record
+ rs->Delete(adAffectCurrent);
+ rs->Update();
+ }
+}
+
+void
+BlobRecordset::add(const qpid::broker::Persistable& item)
+{
+ BlobEncoder blob (item); // Marshall item info to a blob
+ rs->AddNew();
+ item.setPersistenceId(rs->Fields->Item["persistenceId"]->Value);
+ rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
+ rs->Update();
+}
+
+void
+BlobRecordset::remove(const qpid::broker::Persistable& item)
+{
+ remove(item.getPersistenceId());
+}
+
+void
+BlobRecordset::dump()
+{
+ Recordset::dump();
+#if 1
+ if (rs->EndOfFile && rs->BOF) // No records
+ return;
+
+ rs->MoveFirst();
+ while (!rs->EndOfFile) {
+ uint64_t id = rs->Fields->Item["persistenceId"]->Value;
+ QPID_LOG(notice, " -> " << id);
+ rs->MoveNext();
+ }
+#else
+ for (Iterator iter = begin(); iter != end(); ++iter) {
+ uint64_t id = *iter.first;
+ QPID_LOG(notice, " -> " << id);
+ }
+#endif
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h b/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h
new file mode 100644
index 0000000000..4e89254b48
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h
@@ -0,0 +1,53 @@
+#ifndef QPID_STORE_MSSQL_BLOBRECORDSET_H
+#define QPID_STORE_MSSQL_BLOBRECORDSET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Recordset.h"
+#include <qpid/broker/Persistable.h>
+#include <string>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class BlobRecordset
+ *
+ * Class for the "blob" records that record an id, varbinary(max) pair.
+ */
+class BlobRecordset : public Recordset {
+protected:
+ // Remove a record given its Id.
+ void remove(uint64_t id);
+
+public:
+ void add(const qpid::broker::Persistable& item);
+ void remove(const qpid::broker::Persistable& item);
+
+ // Dump table contents; useful for debugging.
+ void dump();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_BLOBRECORDSET_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp b/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp
new file mode 100644
index 0000000000..34edec8acd
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "DatabaseConnection.h"
+#include "Exception.h"
+#include <comdef.h>
+namespace {
+inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};
+}
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+DatabaseConnection::DatabaseConnection() : conn(0)
+{
+}
+
+DatabaseConnection::~DatabaseConnection()
+{
+ close();
+}
+
+void
+DatabaseConnection::open(const std::string& connectString,
+ const std::string& dbName)
+{
+ if (conn && conn->State == adStateOpen)
+ return;
+ std::string adoConnect = "Provider=SQLOLEDB;" + connectString;
+ try {
+ TESTHR(conn.CreateInstance(__uuidof(Connection)));
+ conn->ConnectionString = adoConnect.c_str();
+ conn->Open("", "", "", adConnectUnspecified);
+ if (dbName.length() > 0)
+ conn->DefaultDatabase = dbName.c_str();
+ }
+ catch(_com_error &e) {
+ close();
+ throw ADOException("MSSQL can't open " + dbName + " at " + adoConnect, e);
+ }
+}
+
+void
+DatabaseConnection::close()
+{
+ if (conn && conn->State == adStateOpen)
+ conn->Close();
+ conn = 0;
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h b/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h
new file mode 100644
index 0000000000..2b8bbffa90
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h
@@ -0,0 +1,62 @@
+#ifndef QPID_STORE_MSSQL_DATABASECONNECTION_H
+#define QPID_STORE_MSSQL_DATABASECONNECTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// Bring in ADO 2.8 (yes, I know it says "15", but that's it...)
+#import "C:\Program Files\Common Files\System\ado\msado15.dll" \
+ no_namespace rename("EOF", "EndOfFile")
+
+#include <string>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class DatabaseConnection
+ *
+ * Represents a connection to the SQL database. This class wraps the
+ * needed _ConnectionPtr for ADO as well as the needed COM initialization
+ * and cleanup that each thread requires. It is expected that this class
+ * will be maintained in thread-specific storage so it has no locks.
+ */
+class DatabaseConnection {
+protected:
+ _ConnectionPtr conn;
+
+public:
+ DatabaseConnection();
+ ~DatabaseConnection();
+ void open(const std::string& connectString,
+ const std::string& dbName = "");
+ void close();
+ operator _ConnectionPtr () { return conn; }
+
+ void beginTransaction() { conn->BeginTrans(); }
+ void commitTransaction() {conn->CommitTrans(); }
+ void rollbackTransaction() { conn->RollbackTrans(); }
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_DATABASECONNECTION_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/Exception.h b/qpid/cpp/src/qpid/store/ms-sql/Exception.h
new file mode 100644
index 0000000000..34e401b068
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/Exception.h
@@ -0,0 +1,56 @@
+#ifndef QPID_STORE_MSSQL_EXCEPTION_H
+#define QPID_STORE_MSSQL_EXCEPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <comdef.h>
+#include <qpid/store/StorageProvider.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+class Exception : public qpid::store::StorageProvider::Exception
+{
+protected:
+ std::string text;
+public:
+ Exception(const std::string& _text) : text(_text) {}
+ virtual ~Exception() {}
+ virtual const char* what() const throw() { return text.c_str(); }
+};
+
+class ADOException : public Exception
+{
+public:
+ ADOException(const std::string& _text, _com_error &e)
+ : Exception(_text) {
+ text += ": ";
+ _bstr_t wmsg = e.Description();
+ text += (const char *)wmsg;
+ }
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_EXCEPTION_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
new file mode 100644
index 0000000000..5872dbc2e4
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
@@ -0,0 +1,989 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdlib.h>
+#include <string>
+#include <windows.h>
+#include <qpid/broker/RecoverableQueue.h>
+#include <qpid/log/Statement.h>
+#include <qpid/store/MessageStorePlugin.h>
+#include <qpid/store/StorageProvider.h>
+#include "AmqpTransaction.h"
+#include "BlobAdapter.h"
+#include "BlobRecordset.h"
+#include "BindingRecordset.h"
+#include "MessageMapRecordset.h"
+#include "MessageRecordset.h"
+#include "DatabaseConnection.h"
+#include "Exception.h"
+#include "State.h"
+#include "VariantHelper.h"
+
+// Bring in ADO 2.8 (yes, I know it says "15", but that's it...)
+#import "C:\Program Files\Common Files\System\ado\msado15.dll" \
+ no_namespace rename("EOF", "EndOfFile")
+#include <comdef.h>
+namespace {
+inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};
+
+// Table names
+const std::string TblBinding("tblBinding");
+const std::string TblConfig("tblConfig");
+const std::string TblExchange("tblExchange");
+const std::string TblMessage("tblMessage");
+const std::string TblMessageMap("tblMessageMap");
+const std::string TblQueue("tblQueue");
+}
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class MSSqlProvider
+ *
+ * Implements a qpid::store::StorageProvider that uses Microsoft SQL Server as
+ * the backend data store for Qpid.
+ */
+class MSSqlProvider : public qpid::store::StorageProvider
+{
+protected:
+ void finalizeMe();
+
+ void dump();
+
+public:
+ MSSqlProvider();
+ ~MSSqlProvider();
+
+ virtual qpid::Options* getOptions() { return &options; }
+
+ virtual void earlyInitialize (Plugin::Target& target);
+ virtual void initialize(Plugin::Target& target);
+
+ /**
+ * Receive notification that this provider is the one that will actively
+ * handle provider storage for the target. If the provider is to be used,
+ * this method will be called after earlyInitialize() and before any
+ * recovery operations (recovery, in turn, precedes call to initialize()).
+ */
+ virtual void activate(MessageStorePlugin &store);
+
+ /**
+ * @name Methods inherited from qpid::broker::MessageStore
+ */
+ //@{
+ /**
+ * If called after init() but before recovery, will discard the database
+ * and reinitialize using an empty store dir. If @a pushDownStoreFiles
+ * is true, the content of the store dir will be moved to a backup dir
+ * inside the store dir. This is used when cluster nodes recover and must
+ * get thier content from a cluster sync rather than directly fromt the
+ * store.
+ *
+ * @param pushDownStoreFiles If true, will move content of the store dir
+ * into a subdir, leaving the store dir
+ * otherwise empty.
+ */
+ virtual void truncateInit(const bool pushDownStoreFiles = false);
+
+ /**
+ * Record the existence of a durable queue
+ */
+ virtual void create(PersistableQueue& queue,
+ const qpid::framing::FieldTable& args);
+ /**
+ * Destroy a durable queue
+ */
+ virtual void destroy(PersistableQueue& queue);
+
+ /**
+ * Record the existence of a durable exchange
+ */
+ virtual void create(const PersistableExchange& exchange,
+ const qpid::framing::FieldTable& args);
+ /**
+ * Destroy a durable exchange
+ */
+ virtual void destroy(const PersistableExchange& exchange);
+
+ /**
+ * Record a binding
+ */
+ virtual void bind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args);
+
+ /**
+ * Forget a binding
+ */
+ virtual void unbind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args);
+
+ /**
+ * Record generic durable configuration
+ */
+ virtual void create(const PersistableConfig& config);
+
+ /**
+ * Destroy generic durable configuration
+ */
+ virtual void destroy(const PersistableConfig& config);
+
+ /**
+ * Stores a messages before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point). If the message has not yet been stored it will
+ * store the headers as well as any content passed in. A
+ * persistence id will be set on the message which can be
+ * used to load the content or to append to it.
+ */
+ virtual void stage(const boost::intrusive_ptr<PersistableMessage>& msg);
+
+ /**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+ virtual void destroy(PersistableMessage& msg);
+
+ /**
+ * Appends content to a previously staged message
+ */
+ virtual void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg,
+ const std::string& data);
+
+ /**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+ virtual void loadContent(const qpid::broker::PersistableQueue& queue,
+ const boost::intrusive_ptr<const PersistableMessage>& msg,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length);
+
+ /**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * Note: that this is async so the return of the function does
+ * not mean the opperation is complete.
+ *
+ * @param msg the message to enqueue
+ * @param queue the name of the queue onto which it is to be enqueued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void enqueue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue);
+
+ /**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * Note: that this is async so the return of the function does
+ * not mean the opperation is complete.
+ *
+ * @param msg the message to dequeue
+ * @param queue the name of the queue from which it is to be dequeued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void dequeue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue);
+
+ /**
+ * Flushes all async messages to disk for the specified queue
+ *
+ * Note: this is a no-op for this provider.
+ *
+ * @param queue the name of the queue from which it is to be dequeued
+ */
+ virtual void flush(const PersistableQueue& queue) {};
+
+ /**
+ * Returns the number of outstanding AIO's for a given queue
+ *
+ * If 0, than all the enqueue / dequeues have been stored
+ * to disk
+ *
+ * @param queue the name of the queue to check for outstanding AIO
+ */
+ virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue)
+ {return 0;}
+ //@}
+
+ /**
+ * @name Methods inherited from qpid::broker::TransactionalStore
+ */
+ //@{
+ virtual std::auto_ptr<qpid::broker::TransactionContext> begin();
+ virtual std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
+ virtual void prepare(qpid::broker::TPCTransactionContext& txn);
+ virtual void commit(qpid::broker::TransactionContext& txn);
+ virtual void abort(qpid::broker::TransactionContext& txn);
+
+ // @TODO This maybe should not be in TransactionalStore
+ virtual void collectPreparedXids(std::set<std::string>& xids) {}
+ //@}
+
+ virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer);
+ virtual void recoverExchanges(qpid::broker::RecoveryManager& recoverer,
+ ExchangeMap& exchangeMap);
+ virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer,
+ QueueMap& queueMap);
+ virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer,
+ const ExchangeMap& exchangeMap);
+ virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer,
+ MessageMap& messageMap,
+ MessageQueueMap& messageQueueMap);
+
+private:
+ struct ProviderOptions : public qpid::Options
+ {
+ std::string connectString;
+ std::string catalogName;
+
+ ProviderOptions(const std::string &name)
+ : qpid::Options(name),
+ catalogName("QpidStore")
+ {
+ const enum { NAMELEN = MAX_COMPUTERNAME_LENGTH + 1 };
+ TCHAR myName[NAMELEN];
+ DWORD myNameLen = NAMELEN;
+ GetComputerName(myName, &myNameLen);
+ connectString = "Data Source=";
+ connectString += myName;
+ connectString += "\\SQLEXPRESS;Integrated Security=SSPI";
+ addOptions()
+ ("connect",
+ qpid::optValue(connectString, "STRING"),
+ "Connection string for the database to use. Will prepend "
+ "Provider=SQLOLEDB;")
+ ("catalog",
+ qpid::optValue(catalogName, "DB NAME"),
+ "Catalog (database) name")
+ ;
+ }
+ };
+ ProviderOptions options;
+
+ // Each thread has a separate connection to the database and also needs
+ // to manage its COM initialize/finalize individually. This is done by
+ // keeping a thread-specific State.
+ boost::thread_specific_ptr<State> dbState;
+
+ State *initState();
+ DatabaseConnection *initConnection(void);
+ void createDb(_ConnectionPtr conn, const std::string &name);
+};
+
+static MSSqlProvider static_instance_registers_plugin;
+
+void
+MSSqlProvider::finalizeMe()
+{
+ dbState.reset();
+}
+
+MSSqlProvider::MSSqlProvider()
+ : options("MS SQL Provider options")
+{
+}
+
+MSSqlProvider::~MSSqlProvider()
+{
+}
+
+void
+MSSqlProvider::earlyInitialize(Plugin::Target &target)
+{
+ MessageStorePlugin *store = dynamic_cast<MessageStorePlugin *>(&target);
+ if (store) {
+ // If the database init fails, report it and don't register; give
+ // the rest of the broker a chance to run.
+ //
+ // Don't try to initConnection() since that will fail if the
+ // database doesn't exist. Instead, try to open a connection without
+ // a database name, then search for the database. There's still a
+ // chance this provider won't be selected for the store too, so be
+ // be sure to close the database connection before return to avoid
+ // leaving a connection up that will not be used.
+ try {
+ std::auto_ptr<DatabaseConnection> db(new DatabaseConnection());
+ db->open(options.connectString, "");
+ _ConnectionPtr conn(*db);
+ _RecordsetPtr pCatalogs = NULL;
+ VariantHelper<std::string> catalogName(options.catalogName);
+ pCatalogs = conn->OpenSchema(adSchemaCatalogs, catalogName);
+ if (pCatalogs->EndOfFile) {
+ // Database doesn't exist; create it
+ QPID_LOG(notice,
+ "MSSQL: Creating database " + options.catalogName);
+ createDb(conn, options.catalogName);
+ }
+ else {
+ QPID_LOG(notice,
+ "MSSQL: Database located: " + options.catalogName);
+ }
+ if (pCatalogs) {
+ if (pCatalogs->State == adStateOpen)
+ pCatalogs->Close();
+ pCatalogs = 0;
+ }
+ db->close();
+ store->providerAvailable("MSSQL", this);
+ }
+ catch (qpid::Exception &e) {
+ QPID_LOG(error, e.what());
+ return;
+ }
+ store->addFinalizer(boost::bind(&MSSqlProvider::finalizeMe, this));
+ }
+}
+
+void
+MSSqlProvider::initialize(Plugin::Target& target)
+{
+}
+
+void
+MSSqlProvider::activate(MessageStorePlugin &store)
+{
+ QPID_LOG(info, "MS SQL Provider is up");
+}
+
+void
+MSSqlProvider::truncateInit(const bool pushDownStoreFiles)
+{
+}
+
+void
+MSSqlProvider::create(PersistableQueue& queue,
+ const qpid::framing::FieldTable& /*args needed for jrnl*/)
+{
+ DatabaseConnection *db = initConnection();
+ try {
+ BlobRecordset rsQueues;
+ db->beginTransaction();
+ rsQueues.open(db, TblQueue);
+ rsQueues.add(queue);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ db->rollbackTransaction();
+ throw ADOException("Error creating queue " + queue.getName(), e);
+ }
+}
+
+/**
+ * Destroy a durable queue
+ */
+void
+MSSqlProvider::destroy(PersistableQueue& queue)
+{
+ DatabaseConnection *db = initConnection();
+ try {
+ BlobRecordset rsQueues;
+ BindingRecordset rsBindings;
+ db->beginTransaction();
+ rsQueues.open(db, TblQueue);
+ rsBindings.open(db, TblBinding);
+ rsQueues.remove(queue);
+ rsBindings.remove(queue.getName());
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ db->rollbackTransaction();
+ throw ADOException("Error deleting queue " + queue.getName(), e);
+ }
+}
+
+/**
+ * Record the existence of a durable exchange
+ */
+void
+MSSqlProvider::create(const PersistableExchange& exchange,
+ const qpid::framing::FieldTable& args)
+{
+ DatabaseConnection *db = initConnection();
+ try {
+ BlobRecordset rsExchanges;
+ db->beginTransaction();
+ rsExchanges.open(db, TblExchange);
+ rsExchanges.add(exchange);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ db->rollbackTransaction();
+ throw ADOException("Error creating exchange " + exchange.getName(), e);
+ }
+}
+
+/**
+ * Destroy a durable exchange
+ */
+void
+MSSqlProvider::destroy(const PersistableExchange& exchange)
+{
+ DatabaseConnection *db = initConnection();
+ try {
+ BlobRecordset rsExchanges;
+ BindingRecordset rsBindings;
+ db->beginTransaction();
+ rsExchanges.open(db, TblExchange);
+ rsBindings.open(db, TblBinding);
+ rsExchanges.remove(exchange);
+ rsBindings.remove(exchange.getPersistenceId());
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ db->rollbackTransaction();
+ throw ADOException("Error deleting exchange " + exchange.getName(), e);
+ }
+}
+
+/**
+ * Record a binding
+ */
+void
+MSSqlProvider::bind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args)
+{
+ DatabaseConnection *db = initConnection();
+ try {
+ BindingRecordset rsBindings;
+ db->beginTransaction();
+ rsBindings.open(db, TblBinding);
+ rsBindings.add(exchange.getPersistenceId(), queue.getName(), key, args);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ db->rollbackTransaction();
+ throw ADOException("Error binding exchange " + exchange.getName() +
+ " to queue " + queue.getName(), e);
+ }
+}
+
+/**
+ * Forget a binding
+ */
+void
+MSSqlProvider::unbind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args)
+{
+ DatabaseConnection *db = initConnection();
+ try {
+ BindingRecordset rsBindings;
+ db->beginTransaction();
+ rsBindings.open(db, TblBinding);
+ rsBindings.remove(exchange.getPersistenceId(),
+ queue.getName(),
+ key,
+ args);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ db->rollbackTransaction();
+ throw ADOException("Error unbinding exchange " + exchange.getName() +
+ " from queue " + queue.getName(), e);
+ }
+}
+
+/**
+ * Record generic durable configuration
+ */
+void
+MSSqlProvider::create(const PersistableConfig& config)
+{
+ DatabaseConnection *db = initConnection();
+ try {
+ BlobRecordset rsConfigs;
+ db->beginTransaction();
+ rsConfigs.open(db, TblConfig);
+ rsConfigs.add(config);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ db->rollbackTransaction();
+ throw ADOException("Error creating config " + config.getName(), e);
+ }
+}
+
+/**
+ * Destroy generic durable configuration
+ */
+void
+MSSqlProvider::destroy(const PersistableConfig& config)
+{
+ DatabaseConnection *db = initConnection();
+ try {
+ BlobRecordset rsConfigs;
+ db->beginTransaction();
+ rsConfigs.open(db, TblConfig);
+ rsConfigs.remove(config);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ db->rollbackTransaction();
+ throw ADOException("Error deleting config " + config.getName(), e);
+ }
+}
+
+/**
+ * Stores a messages before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point). If the message has not yet been stored it will
+ * store the headers as well as any content passed in. A
+ * persistence id will be set on the message which can be
+ * used to load the content or to append to it.
+ */
+void
+MSSqlProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg)
+{
+ DatabaseConnection *db = initConnection();
+ try {
+ MessageRecordset rsMessages;
+ db->beginTransaction();
+ rsMessages.open(db, TblMessage);
+ rsMessages.add(msg);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ db->rollbackTransaction();
+ throw ADOException("Error staging message", e);
+ }
+}
+
+/**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+void
+MSSqlProvider::destroy(PersistableMessage& msg)
+{
+ DatabaseConnection *db = initConnection();
+ try {
+ BlobRecordset rsMessages;
+ db->beginTransaction();
+ rsMessages.open(db, TblMessage);
+ rsMessages.remove(msg);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ db->rollbackTransaction();
+ throw ADOException("Error deleting message", e);
+ }
+}
+
+/**
+ * Appends content to a previously staged message
+ */
+void
+MSSqlProvider::appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg,
+ const std::string& data)
+{
+ DatabaseConnection *db = initConnection();
+ try {
+ MessageRecordset rsMessages;
+ db->beginTransaction();
+ rsMessages.open(db, TblMessage);
+ rsMessages.append(msg, data);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ db->rollbackTransaction();
+ throw ADOException("Error appending to message", e);
+ }
+}
+
+/**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+void
+MSSqlProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/,
+ const boost::intrusive_ptr<const PersistableMessage>& msg,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length)
+{
+ // SQL store keeps all messages in one table, so we don't need the
+ // queue reference.
+ DatabaseConnection *db = initConnection();
+ try {
+ MessageRecordset rsMessages;
+ rsMessages.open(db, TblMessage);
+ rsMessages.loadContent(msg, data, offset, length);
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error loading message content", e);
+ }
+}
+
+/**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * @param ctxt The transaction context under which this enqueue happens.
+ * @param msg The message to enqueue
+ * @param queue the name of the queue onto which it is to be enqueued
+ */
+void
+MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue)
+{
+ AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt);
+ if (atxn == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ (void)initState(); // Ensure this thread is initialized
+ try {
+ atxn->begin();
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error queuing message", e);
+ }
+
+ try {
+ if (msg->getPersistenceId() == 0) { // Message itself not yet saved
+ MessageRecordset rsMessages;
+ rsMessages.open(atxn->dbConn(), TblMessage);
+ rsMessages.add(msg);
+ }
+ MessageMapRecordset rsMap;
+ rsMap.open(atxn->dbConn(), TblMessageMap);
+ rsMap.add(msg->getPersistenceId(), queue.getPersistenceId());
+ atxn->commit();
+ }
+ catch(_com_error &e) {
+ atxn->abort();
+ throw ADOException("Error queuing message", e);
+ }
+}
+
+/**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * @param ctxt The transaction context under which this dequeue happens.
+ * @param msg The message to dequeue
+ * @param queue The queue from which it is to be dequeued
+ */
+void
+MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue)
+{
+ AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt);
+ if (atxn == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ (void)initState(); // Ensure this thread is initialized
+ try {
+ atxn->begin();
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error queuing message", e);
+ }
+ try {
+ MessageMapRecordset rsMap;
+ rsMap.open(atxn->dbConn(), TblMessageMap);
+ bool more = rsMap.remove(msg->getPersistenceId(),
+ queue.getPersistenceId());
+ if (!more) {
+ MessageRecordset rsMessages;
+ rsMessages.open(atxn->dbConn(), TblMessage);
+ rsMessages.remove(msg);
+ }
+ atxn->commit();
+ }
+ catch(_com_error &e) {
+ atxn->abort();
+ throw ADOException("Error dequeuing message", e);
+ }
+}
+
+std::auto_ptr<qpid::broker::TransactionContext>
+MSSqlProvider::begin()
+{
+ (void)initState(); // Ensure this thread is initialized
+
+ // Transactions are associated with the Connection, so this transaction
+ // context needs its own connection. At the time of writing, single-phase
+ // transactions are dealt with completely on one thread, so we really
+ // could just use the thread-specific DatabaseConnection for this.
+ // However, that would introduce an ugly, hidden coupling, so play
+ // it safe and handle this just like a TPC transaction, which actually
+ // can be prepared and committed/aborted from different threads,
+ // making it a bad idea to try using the thread-local DatabaseConnection.
+ std::auto_ptr<DatabaseConnection> db(new DatabaseConnection);
+ db->open(options.connectString, options.catalogName);
+ std::auto_ptr<AmqpTransaction> tx(new AmqpTransaction(db));
+ tx->begin();
+ std::auto_ptr<qpid::broker::TransactionContext> tc(tx);
+ return tc;
+}
+
+std::auto_ptr<qpid::broker::TPCTransactionContext>
+MSSqlProvider::begin(const std::string& xid)
+{
+ (void)initState(); // Ensure this thread is initialized
+ std::auto_ptr<DatabaseConnection> db(new DatabaseConnection);
+ db->open(options.connectString, options.catalogName);
+ std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(db, xid));
+ tx->begin();
+ std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx);
+ return tc;
+}
+
+void
+MSSqlProvider::prepare(qpid::broker::TPCTransactionContext& txn)
+{
+}
+
+void
+MSSqlProvider::commit(qpid::broker::TransactionContext& txn)
+{
+ (void)initState(); // Ensure this thread is initialized
+ AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn);
+ if (atxn == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ atxn->commit();
+}
+
+void
+MSSqlProvider::abort(qpid::broker::TransactionContext& txn)
+{
+ (void)initState(); // Ensure this thread is initialized
+ AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn);
+ if (atxn == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ atxn->abort();
+}
+
+// @TODO Much of this recovery code is way too similar... refactor to
+// a recover template method on BlobRecordset.
+
+void
+MSSqlProvider::recoverConfigs(qpid::broker::RecoveryManager& recoverer)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsConfigs;
+ rsConfigs.open(db, TblConfig);
+ _RecordsetPtr p = (_RecordsetPtr)rsConfigs;
+ if (p->BOF && p->EndOfFile)
+ return; // Nothing to do
+ p->MoveFirst();
+ while (!p->EndOfFile) {
+ uint64_t id = p->Fields->Item["persistenceId"]->Value;
+ long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize;
+ BlobAdapter blob(blobSize);
+ blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+ // Recreate the Config instance and reset its ID.
+ broker::RecoverableConfig::shared_ptr config =
+ recoverer.recoverConfig(blob);
+ config->setPersistenceId(id);
+ p->MoveNext();
+ }
+}
+
+void
+MSSqlProvider::recoverExchanges(qpid::broker::RecoveryManager& recoverer,
+ ExchangeMap& exchangeMap)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsExchanges;
+ rsExchanges.open(db, TblExchange);
+ _RecordsetPtr p = (_RecordsetPtr)rsExchanges;
+ if (p->BOF && p->EndOfFile)
+ return; // Nothing to do
+ p->MoveFirst();
+ while (!p->EndOfFile) {
+ uint64_t id = p->Fields->Item["persistenceId"]->Value;
+ long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize;
+ BlobAdapter blob(blobSize);
+ blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+ // Recreate the Exchange instance, reset its ID, and remember the
+ // ones restored for matching up when recovering bindings.
+ broker::RecoverableExchange::shared_ptr exchange =
+ recoverer.recoverExchange(blob);
+ exchange->setPersistenceId(id);
+ exchangeMap[id] = exchange;
+ p->MoveNext();
+ }
+}
+
+void
+MSSqlProvider::recoverQueues(qpid::broker::RecoveryManager& recoverer,
+ QueueMap& queueMap)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsQueues;
+ rsQueues.open(db, TblQueue);
+ _RecordsetPtr p = (_RecordsetPtr)rsQueues;
+ if (p->BOF && p->EndOfFile)
+ return; // Nothing to do
+ p->MoveFirst();
+ while (!p->EndOfFile) {
+ uint64_t id = p->Fields->Item["persistenceId"]->Value;
+ long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize;
+ BlobAdapter blob(blobSize);
+ blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+ // Recreate the Queue instance and reset its ID.
+ broker::RecoverableQueue::shared_ptr queue =
+ recoverer.recoverQueue(blob);
+ queue->setPersistenceId(id);
+ queueMap[id] = queue;
+ p->MoveNext();
+ }
+}
+
+void
+MSSqlProvider::recoverBindings(qpid::broker::RecoveryManager& recoverer,
+ const ExchangeMap& exchangeMap)
+{
+ DatabaseConnection *db = initConnection();
+ BindingRecordset rsBindings;
+ rsBindings.open(db, TblBinding);
+ rsBindings.recover(recoverer, exchangeMap);
+}
+
+void
+MSSqlProvider::recoverMessages(qpid::broker::RecoveryManager& recoverer,
+ MessageMap& messageMap,
+ MessageQueueMap& messageQueueMap)
+{
+ DatabaseConnection *db = initConnection();
+ MessageRecordset rsMessages;
+ rsMessages.open(db, TblMessage);
+ rsMessages.recover(recoverer, messageMap);
+
+ MessageMapRecordset rsMessageMaps;
+ rsMessageMaps.open(db, TblMessageMap);
+ rsMessageMaps.recover(messageQueueMap);
+}
+
+////////////// Internal Methods
+
+State *
+MSSqlProvider::initState()
+{
+ State *state = dbState.get(); // See if thread has initialized
+ if (!state) {
+ state = new State;
+ dbState.reset(state);
+ }
+ return state;
+}
+
+DatabaseConnection *
+MSSqlProvider::initConnection(void)
+{
+ State *state = initState();
+ if (state->dbConn != 0)
+ return state->dbConn; // And the DatabaseConnection is set up too
+ std::auto_ptr<DatabaseConnection> db(new DatabaseConnection);
+ db->open(options.connectString, options.catalogName);
+ state->dbConn = db.release();
+ return state->dbConn;
+}
+
+void
+MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name)
+{
+ const std::string dbCmd = "CREATE DATABASE " + name;
+ const std::string useCmd = "USE " + name;
+ const std::string tableCmd = "CREATE TABLE ";
+ const std::string colSpecs =
+ " (persistenceId bigint PRIMARY KEY NOT NULL IDENTITY(1,1),"
+ " fieldTableBlob varbinary(MAX) NOT NULL)";
+ const std::string bindingSpecs =
+ " (exchangeId bigint REFERENCES tblExchange(persistenceId) NOT NULL,"
+ " queueName varchar(255) NOT NULL,"
+ " routingKey varchar(255),"
+ " fieldTableBlob varbinary(MAX))";
+ const std::string messageMapSpecs =
+ " (messageId bigint REFERENCES tblMessage(persistenceId) NOT NULL,"
+ " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL)";
+ _variant_t unused;
+ _bstr_t dbStr = dbCmd.c_str();
+ conn->Execute(dbStr, &unused, adExecuteNoRecords);
+ _bstr_t useStr = useCmd.c_str();
+ conn->Execute(useStr, &unused, adExecuteNoRecords);
+ std::string makeTable = tableCmd + TblQueue + colSpecs;
+ _bstr_t makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblExchange + colSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblConfig + colSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblMessage + colSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblBinding + bindingSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblMessageMap + messageMapSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+}
+
+void
+MSSqlProvider::dump()
+{
+ // dump all db records to qpid_log
+ QPID_LOG(notice, "DB Dump: (not dumping anything)");
+ // rsQueues.dump();
+}
+
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp b/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp
new file mode 100644
index 0000000000..7072015864
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp
@@ -0,0 +1,115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/Exception.h>
+#include <qpid/log/Statement.h>
+#include <qpid/store/StorageProvider.h>
+
+#include "MessageMapRecordset.h"
+#include "VariantHelper.h"
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+void
+MessageMapRecordset::add(uint64_t messageId, uint64_t queueId)
+{
+ rs->AddNew();
+ rs->Fields->GetItem("messageId")->Value = messageId;
+ rs->Fields->GetItem("queueId")->Value = queueId;
+ rs->Update();
+}
+
+bool
+MessageMapRecordset::remove(uint64_t messageId, uint64_t queueId)
+{
+ // Look up all mappings for the specified message. Then scan
+ // for the specified queue and keep track of whether or not the
+ // message exists on any queue we are not looking for a well.
+ std::ostringstream filter;
+ filter << "messageId = " << messageId << std::ends;
+ rs->PutFilter (VariantHelper<std::string>(filter.str()));
+ MessageMap m;
+ IADORecordBinding *piAdoRecordBinding;
+ rs->QueryInterface(__uuidof(IADORecordBinding),
+ (LPVOID *)&piAdoRecordBinding);
+ piAdoRecordBinding->BindToRecordset(&m);
+ bool moreEntries = false, deleted = false;
+ // If the desired mapping gets deleted, and we already know there are
+ // other mappings for the message, don't bother finishing the scan.
+ while (!rs->EndOfFile && !(deleted && moreEntries)) {
+ if (m.queueId == queueId) {
+ rs->Delete(adAffectCurrent);
+ rs->Update();
+ deleted = true;
+ }
+ else {
+ moreEntries = true;
+ }
+ rs->MoveNext();
+ }
+ piAdoRecordBinding->Release();
+ return moreEntries;
+}
+
+void
+MessageMapRecordset::recover(MessageQueueMap& msgMap)
+{
+ if (rs->BOF && rs->EndOfFile)
+ return; // Nothing to do
+ rs->MoveFirst();
+ MessageMap b;
+ IADORecordBinding *piAdoRecordBinding;
+ rs->QueryInterface(__uuidof(IADORecordBinding),
+ (LPVOID *)&piAdoRecordBinding);
+ piAdoRecordBinding->BindToRecordset(&b);
+ while (!rs->EndOfFile) {
+ msgMap[b.messageId].push_back(b.queueId);
+ rs->MoveNext();
+ }
+
+ piAdoRecordBinding->Release();
+}
+
+void
+MessageMapRecordset::dump()
+{
+ Recordset::dump();
+ if (rs->EndOfFile && rs->BOF) // No records
+ return;
+ rs->MoveFirst();
+
+ MessageMap m;
+ IADORecordBinding *piAdoRecordBinding;
+ rs->QueryInterface(__uuidof(IADORecordBinding),
+ (LPVOID *)&piAdoRecordBinding);
+ piAdoRecordBinding->BindToRecordset(&m);
+
+ while (!rs->EndOfFile) {
+ QPID_LOG(notice, "msg " << m.messageId << " on queue " << m.queueId);
+ rs->MoveNext();
+ }
+
+ piAdoRecordBinding->Release();
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h b/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h
new file mode 100644
index 0000000000..a2e91abb96
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h
@@ -0,0 +1,69 @@
+#ifndef QPID_STORE_MSSQL_MESSAGEMAPRECORDSET_H
+#define QPID_STORE_MSSQL_MESSAGEMAPRECORDSET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <icrsint.h>
+#include "Recordset.h"
+#include <qpid/broker/RecoveryManager.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class MessageMapRecordset
+ *
+ * Class for the message map (message -> queue) records.
+ */
+class MessageMapRecordset : public Recordset {
+
+ class MessageMap : public CADORecordBinding {
+ BEGIN_ADO_BINDING(MessageMap)
+ ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, messageId, FALSE)
+ ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, queueId, FALSE)
+ END_ADO_BINDING()
+
+ public:
+ uint64_t messageId;
+ uint64_t queueId;
+ };
+
+public:
+ // Add a new mapping
+ void add(uint64_t messageId, uint64_t queueId);
+
+ // Remove a specific mapping. Returns true if the message is still
+ // enqueued on at least one other queue; false if the message no longer
+ // exists on any other queues.
+ bool remove(uint64_t messageId, uint64_t queueId);
+
+ // Recover the mappings of message ID -> vector<queue ID>.
+ void recover(MessageQueueMap& msgMap);
+
+ // Dump table contents; useful for debugging.
+ void dump();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_MESSAGEMAPRECORDSET_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp b/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp
new file mode 100644
index 0000000000..a29b97fa8a
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/Exception.h>
+#include <qpid/log/Statement.h>
+
+#include "MessageRecordset.h"
+#include "BlobAdapter.h"
+#include "BlobEncoder.h"
+#include "VariantHelper.h"
+
+#include <boost/intrusive_ptr.hpp>
+
+class qpid::broker::PersistableMessage;
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+void
+MessageRecordset::add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
+{
+ BlobEncoder blob (msg); // Marshall headers and content to a blob
+ rs->AddNew();
+ rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
+ rs->Update();
+}
+
+void
+MessageRecordset::append(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+ const std::string& data)
+{
+ // Look up the message by its Id
+ std::ostringstream filter;
+ filter << "persistenceId = " << msg->getPersistenceId() << std::ends;
+ rs->PutFilter (VariantHelper<std::string>(filter.str()));
+ if (rs->RecordCount == 0) {
+ throw Exception("Can't append to message not stored in database");
+ }
+ BlobEncoder blob (data); // Marshall string data to a blob
+ rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
+ rs->Update();
+}
+
+void
+MessageRecordset::remove(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg)
+{
+ BlobRecordset::remove(msg->getPersistenceId());
+}
+
+void
+MessageRecordset::loadContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length)
+{
+ // Look up the message by its Id
+ std::ostringstream filter;
+ filter << "persistenceId = " << msg->getPersistenceId() << std::ends;
+ rs->PutFilter (VariantHelper<std::string>(filter.str()));
+ if (rs->RecordCount == 0) {
+ throw Exception("Can't load message not stored in database");
+ }
+
+ // NOTE! If this code needs to change, please verify the encoding
+ // code in BlobEncoder.
+ long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize;
+ uint32_t headerSize;
+ const size_t headerFieldLength = sizeof(headerSize);
+ BlobAdapter blob(headerFieldLength);
+ blob =
+ rs->Fields->Item["fieldTableBlob"]->GetChunk((long)headerFieldLength);
+ headerSize = ((qpid::framing::Buffer&)blob).getLong();
+
+ // GetChunk always begins reading where the previous GetChunk left off,
+ // so we can't just tell it to ignore the header and read the data.
+ // So, read the header plus the offset, plus the desired data, then
+ // copy the desired data to the supplied string. If this ends up asking
+ // for more than is available in the field, reduce it to what's there.
+ long getSize = headerSize + offset + length;
+ if (getSize + (long)headerFieldLength > blobSize) {
+ size_t reduce = (getSize + headerFieldLength) - blobSize;
+ getSize -= reduce;
+ length -= reduce;
+ }
+ BlobAdapter header_plus(getSize);
+ header_plus = rs->Fields->Item["fieldTableBlob"]->GetChunk(getSize);
+ uint8_t *throw_away = new uint8_t[headerSize + offset];
+ ((qpid::framing::Buffer&)header_plus).getRawData(throw_away, headerSize + offset);
+ delete throw_away;
+ ((qpid::framing::Buffer&)header_plus).getRawData(data, length);
+}
+
+void
+MessageRecordset::recover(qpid::broker::RecoveryManager& recoverer,
+ std::map<uint64_t, broker::RecoverableMessage::shared_ptr> messageMap)
+{
+ if (rs->BOF && rs->EndOfFile)
+ return; // Nothing to do
+ rs->MoveFirst();
+ Binding b;
+ IADORecordBinding *piAdoRecordBinding;
+ rs->QueryInterface(__uuidof(IADORecordBinding),
+ (LPVOID *)&piAdoRecordBinding);
+ piAdoRecordBinding->BindToRecordset(&b);
+ while (!rs->EndOfFile) {
+ // The blob was written as normal, but with the header length
+ // prepended in a uint32_t. Due to message staging threshold
+ // limits, the header may be all that's read in; get it first,
+ // recover that message header, then see if the rest is needed.
+ //
+ // NOTE! If this code needs to change, please verify the encoding
+ // code in BlobEncoder.
+ long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize;
+ uint32_t headerSize;
+ const size_t headerFieldLength = sizeof(headerSize);
+ BlobAdapter blob(headerFieldLength);
+ blob =
+ rs->Fields->Item["fieldTableBlob"]->GetChunk((long)headerFieldLength);
+ headerSize = ((qpid::framing::Buffer&)blob).getLong();
+ BlobAdapter header(headerSize);
+ header = rs->Fields->Item["fieldTableBlob"]->GetChunk(headerSize);
+ broker::RecoverableMessage::shared_ptr msg;
+ msg = recoverer.recoverMessage(header);
+ msg->setPersistenceId(b.messageId);
+ messageMap[b.messageId] = msg;
+
+ // Now, do we need the rest of the content?
+ long contentLength = blobSize - headerFieldLength - headerSize;
+ if (msg->loadContent(contentLength)) {
+ BlobAdapter content(contentLength);
+ content =
+ rs->Fields->Item["fieldTableBlob"]->GetChunk(contentLength);
+ msg->decodeContent(content);
+ }
+ rs->MoveNext();
+ }
+
+ piAdoRecordBinding->Release();
+}
+
+void
+MessageRecordset::dump()
+{
+ Recordset::dump();
+ if (rs->EndOfFile && rs->BOF) // No records
+ return;
+ rs->MoveFirst();
+
+ Binding b;
+ IADORecordBinding *piAdoRecordBinding;
+ rs->QueryInterface(__uuidof(IADORecordBinding),
+ (LPVOID *)&piAdoRecordBinding);
+ piAdoRecordBinding->BindToRecordset(&b);
+
+ while (VARIANT_FALSE == rs->EndOfFile) {
+ QPID_LOG(notice, "Msg " << b.messageId);
+ rs->MoveNext();
+ }
+
+ piAdoRecordBinding->Release();
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h b/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h
new file mode 100644
index 0000000000..f1835f6f73
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h
@@ -0,0 +1,85 @@
+#ifndef QPID_STORE_MSSQL_MESSAGERECORDSET_H
+#define QPID_STORE_MSSQL_MESSAGERECORDSET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <icrsint.h>
+#include "BlobRecordset.h"
+#include <qpid/broker/PersistableMessage.h>
+#include <qpid/broker/RecoveryManager.h>
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class MessageRecordset
+ *
+ * Class for storing and recovering messages. Messages are primarily blobs
+ * and handled similarly. However, messages larger than the staging threshold
+ * are not contained completely in memory; they're left mostly in the store
+ * and the header is held in memory. So when the message "blob" is saved,
+ * an additional size-of-the-header field is prepended to the blob.
+ * On recovery, the size-of-the-header is used to get only what's needed
+ * until it's determined if the entire message is to be recovered to memory.
+ */
+class MessageRecordset : public BlobRecordset {
+ class Binding : public CADORecordBinding {
+ BEGIN_ADO_BINDING(Binding)
+ ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, messageId, FALSE)
+ END_ADO_BINDING()
+
+ public:
+ uint64_t messageId;
+ };
+
+public:
+ // Store a message. Store the header size (4 bytes) then the regular
+ // blob comprising the message.
+ void add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
+
+ // Append additional content to an existing message.
+ void append(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+ const std::string& data);
+
+ // Remove an existing message
+ void remove(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg);
+
+ // Load all or part of a stored message. This skips the header parts and
+ // loads content.
+ void loadContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length);
+
+ // Recover messages and save a map of those recovered.
+ void recover(qpid::broker::RecoveryManager& recoverer,
+ std::map<uint64_t, broker::RecoverableMessage::shared_ptr> messageMap);
+
+ // Dump table contents; useful for debugging.
+ void dump();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_MESSAGERECORDSET_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp b/qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp
new file mode 100644
index 0000000000..49232ababa
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/Exception.h>
+#include <qpid/log/Statement.h>
+
+#include "Recordset.h"
+#include "BlobEncoder.h"
+#include "DatabaseConnection.h"
+#include "VariantHelper.h"
+
+namespace {
+inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};
+}
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+#if 0
+Recordset::Iterator::Iterator(Recordset& _rs) : rs(_rs)
+{
+ rs->MoveFirst();
+ setCurrent();
+}
+
+std::pair<uint64_t, BlobAdapter>&
+Recordset::Iterator::dereference() const
+{
+ return const_cast<std::pair<uint64_t, BlobAdapter> >(current);
+}
+
+void
+Recordset::Iterator::increment()
+{
+ rs->MoveNext();
+ setCurrent();
+}
+
+bool
+Recordset::Iterator::equal(const Iterator& x) const
+{
+ return current.first == x.current.first;
+}
+
+void
+Recordset::Iterator::setCurrent()
+{
+ if (!rs->EndOfFile) {
+ uint64_t id = rs->Fields->Item["persistenceId"]->Value;
+ long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize;
+ BlobAdapter blob(blobSize);
+ blob = rs->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+ current = std::make_pair(id, blob);
+ }
+ else {
+ current.first = 0;
+ }
+}
+#endif
+
+void
+Recordset::open(DatabaseConnection* conn, const std::string& table)
+{
+ _ConnectionPtr p = *conn;
+ TESTHR(rs.CreateInstance(__uuidof(::Recordset)));
+ rs->Open(table.c_str(),
+ _variant_t((IDispatch *)p, true),
+ adOpenKeyset,
+ adLockOptimistic,
+ adCmdTable);
+ tableName = table;
+}
+
+void
+Recordset::close()
+{
+ if (rs && rs->State == adStateOpen)
+ rs->Close();
+ rs = 0;
+}
+
+void
+Recordset::requery()
+{
+ // Restore the recordset to reflect all current records.
+ rs->Filter = "";
+ rs->Requery(-1);
+}
+
+void
+Recordset::dump()
+{
+ long count = rs->RecordCount;
+ QPID_LOG(notice, "DB Dump: " + tableName <<
+ ": " << count << " records");
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/Recordset.h b/qpid/cpp/src/qpid/store/ms-sql/Recordset.h
new file mode 100644
index 0000000000..3631838aa8
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/Recordset.h
@@ -0,0 +1,101 @@
+#ifndef QPID_STORE_MSSQL_RECORDSET_H
+#define QPID_STORE_MSSQL_RECORDSET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+// Bring in ADO 2.8 (yes, I know it says "15", but that's it...)
+#import "C:\Program Files\Common Files\System\ado\msado15.dll" \
+ no_namespace rename("EOF", "EndOfFile")
+#include <comdef.h>
+#include <comutil.h>
+#include <string>
+#if 0
+#include <utility>
+#endif
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+class DatabaseConnection;
+
+/**
+ * @class Recordset
+ *
+ * Represents an ADO Recordset, abstracting out the common operations needed
+ * on the common tables used that have 2 fields, persistence ID and blob.
+ */
+class Recordset {
+protected:
+ _RecordsetPtr rs;
+ DatabaseConnection* dbConn;
+ std::string tableName;
+
+public:
+
+#if 0
+ /**
+ * Iterator support for walking through the recordset.
+ * If I need to try this again, I'd look at Recordset cloning.
+ */
+ class Iterator : public boost::iterator_facade<
+ Iterator, std::pair<uint64_t, BlobAdapter>, boost::random_access_traversal_tag>
+ {
+ public:
+ Iterator() : rs(0) { }
+ Iterator(Recordset& _rs);
+
+ private:
+ friend class boost::iterator_core_access;
+
+ std::pair<uint64_t, BlobAdapter>& dereference() const;
+ void increment();
+ bool equal(const Iterator& x) const;
+
+ _RecordsetPtr rs;
+ std::pair<uint64_t, BlobAdapter> current;
+
+ void setCurrent();
+ };
+
+ friend class Iterator;
+#endif
+
+ Recordset() : rs(0) {}
+ virtual ~Recordset() { close(); }
+ void open(DatabaseConnection* conn, const std::string& table);
+ void close();
+ void requery();
+ operator _RecordsetPtr () { return rs; }
+#if 0
+ Iterator begin() { Iterator iter(*this); return iter; }
+ Iterator end() { Iterator iter; return iter; }
+#endif
+
+ // Dump table contents; useful for debugging.
+ void dump();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_RECORDSET_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/State.cpp b/qpid/cpp/src/qpid/store/ms-sql/State.cpp
new file mode 100644
index 0000000000..720603dd11
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/State.cpp
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "State.h"
+#include "DatabaseConnection.h"
+#include "Exception.h"
+#include <comdef.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+State::State() : dbConn(0)
+{
+ HRESULT hr = ::CoInitializeEx(NULL, COINIT_MULTITHREADED);
+ if (hr != S_OK && hr != S_FALSE)
+ throw Exception("Error initializing COM");
+}
+
+State::~State()
+{
+ if (dbConn)
+ delete dbConn;
+ ::CoUninitialize();
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/State.h b/qpid/cpp/src/qpid/store/ms-sql/State.h
new file mode 100644
index 0000000000..6350bc5bd2
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/State.h
@@ -0,0 +1,52 @@
+#ifndef QPID_STORE_MSSQL_STATE_H
+#define QPID_STORE_MSSQL_STATE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+class DatabaseConnection;
+
+/**
+ * @struct State
+ *
+ * Represents a thread's state for accessing ADO and the database.
+ * Creating an instance of State initializes COM for this thread, and
+ * destroying it uninitializes COM. There's also a DatabaseConnection
+ * for this thread's default access to the database. More DatabaseConnections
+ * can always be created, but State has one that can always be used by
+ * the thread whose state is represented.
+ *
+ * This class is intended to be one-per-thread, so it should be accessed
+ * via thread-specific storage.
+ */
+struct State {
+ State();
+ ~State();
+ DatabaseConnection *dbConn;
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_STATE_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp b/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp
new file mode 100644
index 0000000000..786724f031
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include "VariantHelper.h"
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+template <class Wrapped>
+VariantHelper<Wrapped>::VariantHelper()
+{
+ var.vt = VT_EMPTY;
+}
+
+template <class Wrapped>
+VariantHelper<Wrapped>::operator const _variant_t& () const
+{
+ return var;
+}
+
+// Specialization for using _variant_t to wrap a std::string
+VariantHelper<std::string>::VariantHelper(const std::string &init)
+{
+ var.SetString(init.c_str());
+}
+
+VariantHelper<std::string>&
+VariantHelper<std::string>::operator=(const std::string &rhs)
+{
+ var.SetString(rhs.c_str());
+ return *this;
+}
+
+VariantHelper<std::string>::operator const _variant_t& () const
+{
+ return var;
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.h b/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.h
new file mode 100644
index 0000000000..723dbc3b76
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.h
@@ -0,0 +1,61 @@
+#ifndef QPID_STORE_MSSQL_VARIANTHELPER_H
+#define QPID_STORE_MSSQL_VARIANTHELPER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <comutil.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class VariantHelper
+ *
+ * Class template to wrap the details of working with _variant_t objects.
+ */
+template <class Wrapped> class VariantHelper {
+private:
+ _variant_t var;
+
+public:
+ VariantHelper();
+ VariantHelper(const Wrapped &init);
+
+ VariantHelper& operator =(const Wrapped& rhs);
+ operator const _variant_t& () const;
+};
+
+// Specialization for using _variant_t to wrap a std::string
+template<> class VariantHelper<std::string> {
+private:
+ _variant_t var;
+
+public:
+ VariantHelper(const std::string &init);
+ VariantHelper& operator =(const std::string& rhs);
+ operator const _variant_t& () const;
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_VARIANTHELPER_H */