summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/store
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/store')
-rw-r--r--qpid/cpp/src/qpid/store/CMakeLists.txt120
-rw-r--r--qpid/cpp/src/qpid/store/MessageStorePlugin.cpp463
-rw-r--r--qpid/cpp/src/qpid/store/MessageStorePlugin.h280
-rw-r--r--qpid/cpp/src/qpid/store/StorageProvider.h329
-rw-r--r--qpid/cpp/src/qpid/store/StoreException.h49
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/Log.cpp182
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/Log.h78
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/Lsn.h36
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp1102
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp406
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h107
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp472
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/Messages.h144
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/Transaction.cpp83
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/Transaction.h147
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp428
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h104
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp67
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h85
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp165
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h88
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.cpp64
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.h62
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.cpp133
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.h61
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp86
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h54
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp91
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h64
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/Exception.h66
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp1286
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp267
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h100
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp184
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h85
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp92
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/Recordset.h75
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.cpp71
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.h67
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/State.cpp45
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/State.h52
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/TplRecordset.cpp128
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/TplRecordset.h58
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp71
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/VariantHelper.h61
45 files changed, 8258 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/store/CMakeLists.txt b/qpid/cpp/src/qpid/store/CMakeLists.txt
new file mode 100644
index 0000000000..ee7894730a
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/CMakeLists.txt
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+project(qpidc_store)
+
+#set (CMAKE_VERBOSE_MAKEFILE ON) # for debugging
+
+include_directories( ${Boost_INCLUDE_DIR} )
+
+include_directories( ${CMAKE_CURRENT_SOURCE_DIR} )
+include_directories( ${CMAKE_HOME_DIRECTORY}/include )
+
+set (store_SOURCES
+ MessageStorePlugin.cpp
+ )
+add_library (store MODULE ${store_SOURCES})
+target_link_libraries (store qpidbroker qpidcommon)
+if (CMAKE_COMPILER_IS_GNUCXX)
+ set (GCC_CATCH_UNDEFINED "-Wl,--no-undefined")
+ # gcc on SunOS uses native linker whose "-z defs" is too fussy
+ if (CMAKE_SYSTEM_NAME STREQUAL SunOS)
+ set (GCC_CATCH_UNDEFINED "")
+ endif (CMAKE_SYSTEM_NAME STREQUAL SunOS)
+
+ set_target_properties (store PROPERTIES
+ PREFIX ""
+ COMPILE_DEFINITIONS _IN_QPID_BROKER
+ LINK_FLAGS "${GCC_CATCH_UNDEFINED}")
+endif (CMAKE_COMPILER_IS_GNUCXX)
+
+if (CMAKE_SYSTEM_NAME STREQUAL Windows)
+ if (MSVC)
+ add_definitions(
+ /D "NOMINMAX"
+ /D "WIN32_LEAN_AND_MEAN"
+ )
+ endif (MSVC)
+endif (CMAKE_SYSTEM_NAME STREQUAL Windows)
+
+set_target_properties (store PROPERTIES
+ COMPILE_DEFINITIONS _IN_QPID_BROKER
+ VERSION ${qpidc_version})
+install (TARGETS store # RUNTIME
+ DESTINATION ${QPIDD_MODULE_DIR}
+ COMPONENT ${QPID_COMPONENT_BROKER})
+
+# Build the MS SQL Storage Provider plugin
+set (mssql_default ON)
+if (NOT CMAKE_SYSTEM_NAME STREQUAL Windows)
+ set(mssql_default OFF)
+endif (NOT CMAKE_SYSTEM_NAME STREQUAL Windows)
+option(BUILD_MSSQL "Build MS SQL Store provider plugin" ${mssql_default})
+if (BUILD_MSSQL)
+ add_library (mssql_store MODULE
+ ms-sql/MsSqlProvider.cpp
+ ms-sql/AmqpTransaction.cpp
+ ms-sql/BindingRecordset.cpp
+ ms-sql/BlobAdapter.cpp
+ ms-sql/BlobEncoder.cpp
+ ms-sql/BlobRecordset.cpp
+ ms-sql/DatabaseConnection.cpp
+ ms-sql/MessageMapRecordset.cpp
+ ms-sql/MessageRecordset.cpp
+ ms-sql/Recordset.cpp
+ ms-sql/SqlTransaction.cpp
+ ms-sql/State.cpp
+ ms-sql/TplRecordset.cpp
+ ms-sql/VariantHelper.cpp)
+ set_target_properties (mssql_store PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER)
+ target_link_libraries (mssql_store qpidbroker qpidcommon)
+ install (TARGETS mssql_store # RUNTIME
+ DESTINATION ${QPIDD_MODULE_DIR}
+ COMPONENT ${QPID_COMPONENT_BROKER})
+endif (BUILD_MSSQL)
+
+# Build the MS SQL-CLFS Storage Provider plugin
+set (msclfs_default ON)
+if (NOT CMAKE_SYSTEM_NAME STREQUAL Windows)
+ set(msclfs_default OFF)
+endif (NOT CMAKE_SYSTEM_NAME STREQUAL Windows)
+option(BUILD_MSCLFS "Build MS hybrid SQL-CLFS Store provider plugin" ${msclfs_default})
+if (BUILD_MSCLFS)
+ add_library (msclfs_store MODULE
+ ms-clfs/MsSqlClfsProvider.cpp
+ ms-clfs/Log.cpp
+ ms-clfs/MessageLog.cpp
+ ms-clfs/Messages.cpp
+ ms-clfs/Transaction.cpp
+ ms-clfs/TransactionLog.cpp
+ ms-sql/BindingRecordset.cpp
+ ms-sql/BlobAdapter.cpp
+ ms-sql/BlobEncoder.cpp
+ ms-sql/BlobRecordset.cpp
+ ms-sql/DatabaseConnection.cpp
+ ms-sql/Recordset.cpp
+ ms-sql/State.cpp
+ ms-sql/VariantHelper.cpp)
+ include_directories(ms-sql)
+ set_target_properties (msclfs_store PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER)
+ target_link_libraries (msclfs_store qpidbroker qpidcommon clfsw32.lib)
+ install (TARGETS msclfs_store # RUNTIME
+ DESTINATION ${QPIDD_MODULE_DIR}
+ COMPONENT ${QPID_COMPONENT_BROKER})
+endif (BUILD_MSCLFS)
diff --git a/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp b/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
new file mode 100644
index 0000000000..b876bd6b6d
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
@@ -0,0 +1,463 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "MessageStorePlugin.h"
+#include "StorageProvider.h"
+#include "StoreException.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include "qpid/DataDir.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace store {
+
+/*
+ * The MessageStore pointer given to the Broker points to static storage.
+ * Thus, it cannot be deleted, especially by the broker. To prevent deletion,
+ * this no-op deleter is used with the boost::shared_ptr. When the last
+ * shared_ptr is destroyed, the deleter is called rather than delete().
+ */
+namespace {
+ class NoopDeleter {
+ public:
+ NoopDeleter() {}
+ void operator()(qpid::broker::MessageStore * /*p*/) {}
+ };
+}
+
+static MessageStorePlugin static_instance_registers_plugin;
+
+
+MessageStorePlugin::StoreOptions::StoreOptions(const std::string& name) :
+ qpid::Options(name)
+{
+ addOptions()
+ ("storage-provider", qpid::optValue(providerName, "PROVIDER"),
+ "Name of the storage provider to use.")
+ ;
+}
+
+
+void
+MessageStorePlugin::earlyInitialize (qpid::Plugin::Target& target)
+{
+ qpid::broker::Broker* b =
+ dynamic_cast<qpid::broker::Broker*>(&target);
+ if (0 == b)
+ return; // Only listen to Broker targets
+
+ broker = b;
+
+ // See if there are any storage provider plugins ready. If not, we can't
+ // do a message store.
+ qpid::Plugin::earlyInitAll(*this);
+
+ if (providers.empty()) {
+ QPID_LOG(warning,
+ "Message store plugin: No storage providers available.");
+ provider = providers.end();
+ return;
+ }
+ if (!options.providerName.empty()) {
+ // If specific one was chosen, locate it in loaded set of providers.
+ provider = providers.find(options.providerName);
+ if (provider == providers.end())
+ throw Exception("Message store plugin: storage provider '" +
+ options.providerName +
+ "' does not exist.");
+ }
+ else {
+ // No specific provider chosen; if there's only one, use it. Else
+ // report the need to pick one.
+ if (providers.size() > 1) {
+ provider = providers.end();
+ throw Exception("Message store plugin: multiple provider plugins "
+ "loaded; must either load only one or select one "
+ "using --storage-provider");
+ }
+ provider = providers.begin();
+ }
+
+ provider->second->activate(*this);
+ NoopDeleter d;
+ boost::shared_ptr<qpid::broker::MessageStore> sp(this, d);
+ broker->setStore(sp);
+ target.addFinalizer(boost::bind(&MessageStorePlugin::finalizeMe, this));
+}
+
+void
+MessageStorePlugin::initialize(qpid::Plugin::Target& target)
+{
+ qpid::broker::Broker* broker =
+ dynamic_cast<qpid::broker::Broker*>(&target);
+ if (0 == broker)
+ return; // Only listen to Broker targets
+
+ // Pass along the initialize step to the provider that's activated.
+ if (provider != providers.end()) {
+ provider->second->initialize(*this);
+ }
+ // qpid::Plugin::initializeAll(*this);
+}
+
+void
+MessageStorePlugin::finalizeMe()
+{
+ finalize(); // Call finalizers on any Provider plugins
+}
+
+void
+MessageStorePlugin::providerAvailable(const std::string name,
+ StorageProvider *be)
+{
+ ProviderMap::value_type newSp(name, be);
+ std::pair<ProviderMap::iterator, bool> inserted = providers.insert(newSp);
+ if (inserted.second == false)
+ QPID_LOG(warning, "Storage provider " << name << " duplicate; ignored.");
+}
+
+
+/**
+ * Record the existence of a durable queue
+ */
+void
+MessageStorePlugin::create(broker::PersistableQueue& queue,
+ const framing::FieldTable& args)
+{
+ if (queue.getName().size() == 0)
+ {
+ QPID_LOG(error,
+ "Cannot create store for empty (null) queue name - "
+ "ignoring and attempting to continue.");
+ return;
+ }
+ if (queue.getPersistenceId()) {
+ THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
+ }
+ provider->second->create(queue, args);
+}
+
+/**
+ * Destroy a durable queue
+ */
+void
+MessageStorePlugin::destroy(broker::PersistableQueue& queue)
+{
+ provider->second->destroy(queue);
+}
+
+/**
+ * Record the existence of a durable exchange
+ */
+void
+MessageStorePlugin::create(const broker::PersistableExchange& exchange,
+ const framing::FieldTable& args)
+{
+ if (exchange.getPersistenceId()) {
+ THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName());
+ }
+ provider->second->create(exchange, args);
+}
+
+/**
+ * Destroy a durable exchange
+ */
+void
+MessageStorePlugin::destroy(const broker::PersistableExchange& exchange)
+{
+ provider->second->destroy(exchange);
+}
+
+/**
+ * Record a binding
+ */
+void
+MessageStorePlugin::bind(const broker::PersistableExchange& exchange,
+ const broker::PersistableQueue& queue,
+ const std::string& key,
+ const framing::FieldTable& args)
+{
+ provider->second->bind(exchange, queue, key, args);
+}
+
+/**
+ * Forget a binding
+ */
+void
+MessageStorePlugin::unbind(const broker::PersistableExchange& exchange,
+ const broker::PersistableQueue& queue,
+ const std::string& key,
+ const framing::FieldTable& args)
+{
+ provider->second->unbind(exchange, queue, key, args);
+}
+
+/**
+ * Record generic durable configuration
+ */
+void
+MessageStorePlugin::create(const broker::PersistableConfig& config)
+{
+ if (config.getPersistenceId()) {
+ THROW_STORE_EXCEPTION("Config item already created: " +
+ config.getName());
+ }
+ provider->second->create(config);
+}
+
+/**
+ * Destroy generic durable configuration
+ */
+void
+MessageStorePlugin::destroy(const broker::PersistableConfig& config)
+{
+ provider->second->destroy(config);
+}
+
+/**
+ * Stores a message before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point).
+ */
+void
+MessageStorePlugin::stage(const boost::intrusive_ptr<broker::PersistableMessage>& msg)
+{
+ if (msg->getPersistenceId() == 0) {
+ provider->second->stage(msg);
+ }
+}
+
+/**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+void
+MessageStorePlugin::destroy(broker::PersistableMessage& msg)
+{
+ if (msg.getPersistenceId())
+ provider->second->destroy(msg);
+}
+
+/**
+ * Appends content to a previously staged message
+ */
+void
+MessageStorePlugin::appendContent
+ (const boost::intrusive_ptr<const broker::PersistableMessage>& msg,
+ const std::string& data)
+{
+ if (msg->getPersistenceId())
+ provider->second->appendContent(msg, data);
+ else
+ THROW_STORE_EXCEPTION("Cannot append content. Message not known to store!");
+}
+
+/**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+void
+MessageStorePlugin::loadContent(const broker::PersistableQueue& queue,
+ const boost::intrusive_ptr<const broker::PersistableMessage>& msg,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length)
+{
+ if (msg->getPersistenceId())
+ provider->second->loadContent(queue, msg, data, offset, length);
+ else
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+}
+
+/**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * Note: The operation is asynchronous so the return of this function does
+ * not mean the operation is complete.
+ */
+void
+MessageStorePlugin::enqueue(broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<broker::PersistableMessage>& msg,
+ const broker::PersistableQueue& queue)
+{
+ if (queue.getPersistenceId() == 0) {
+ THROW_STORE_EXCEPTION("Queue not created: " + queue.getName());
+ }
+ provider->second->enqueue(ctxt, msg, queue);
+}
+
+/**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * Note: The operation is asynchronous so the return of this function does
+ * not mean the operation is complete.
+ */
+void
+MessageStorePlugin::dequeue(broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<broker::PersistableMessage>& msg,
+ const broker::PersistableQueue& queue)
+{
+ provider->second->dequeue(ctxt, msg, queue);
+}
+
+/**
+ * Flushes all async messages to disk for the specified queue
+ *
+ * Note: The operation is asynchronous so the return of this function does
+ * not mean the operation is complete.
+ */
+void
+MessageStorePlugin::flush(const broker::PersistableQueue& queue)
+{
+ provider->second->flush(queue);
+}
+
+/**
+ * Returns the number of outstanding AIO's for a given queue
+ *
+ * If 0, than all the enqueue / dequeues have been stored
+ * to disk.
+ */
+uint32_t
+MessageStorePlugin::outstandingQueueAIO(const broker::PersistableQueue& queue)
+{
+ return provider->second->outstandingQueueAIO(queue);
+}
+
+std::auto_ptr<broker::TransactionContext>
+MessageStorePlugin::begin()
+{
+ return provider->second->begin();
+}
+
+std::auto_ptr<broker::TPCTransactionContext>
+MessageStorePlugin::begin(const std::string& xid)
+{
+ return provider->second->begin(xid);
+}
+
+void
+MessageStorePlugin::prepare(broker::TPCTransactionContext& ctxt)
+{
+ provider->second->prepare(ctxt);
+}
+
+void
+MessageStorePlugin::commit(broker::TransactionContext& ctxt)
+{
+ provider->second->commit(ctxt);
+}
+
+void
+MessageStorePlugin::abort(broker::TransactionContext& ctxt)
+{
+ provider->second->abort(ctxt);
+}
+
+void
+MessageStorePlugin::collectPreparedXids(std::set<std::string>& xids)
+{
+ provider->second->collectPreparedXids(xids);
+}
+
+/**
+ * Request recovery of queue and message state; inherited from Recoverable
+ */
+void
+MessageStorePlugin::recover(broker::RecoveryManager& recoverer)
+{
+ ExchangeMap exchanges;
+ QueueMap queues;
+ MessageMap messages;
+ MessageQueueMap messageQueueMap;
+ std::vector<std::string> xids;
+ PreparedTransactionMap dtxMap;
+
+ provider->second->recoverConfigs(recoverer);
+ provider->second->recoverExchanges(recoverer, exchanges);
+ provider->second->recoverQueues(recoverer, queues);
+ provider->second->recoverBindings(recoverer, exchanges, queues);
+ // Important to recover messages before transactions in the SQL-CLFS
+ // case. If this becomes a problem, it may be possible to resolve it.
+ // If in doubt please raise a jira and notify Steve Huston
+ // <shuston@riverace.com>.
+ provider->second->recoverMessages(recoverer, messages, messageQueueMap);
+ provider->second->recoverTransactions(recoverer, dtxMap);
+ // Enqueue msgs where needed.
+ for (MessageQueueMap::const_iterator i = messageQueueMap.begin();
+ i != messageQueueMap.end();
+ ++i) {
+ // Locate the message corresponding to the current message Id
+ MessageMap::const_iterator iMsg = messages.find(i->first);
+ if (iMsg == messages.end()) {
+ std::ostringstream oss;
+ oss << "No matching message trying to re-enqueue message "
+ << i->first;
+ THROW_STORE_EXCEPTION(oss.str());
+ }
+ broker::RecoverableMessage::shared_ptr msg = iMsg->second;
+ // Now for each queue referenced in the queue map, locate it
+ // and re-enqueue the message.
+ for (std::vector<QueueEntry>::const_iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ // Locate the queue corresponding to the current queue Id
+ QueueMap::const_iterator iQ = queues.find(j->queueId);
+ if (iQ == queues.end()) {
+ std::ostringstream oss;
+ oss << "No matching queue trying to re-enqueue message "
+ << " on queue Id " << j->queueId;
+ THROW_STORE_EXCEPTION(oss.str());
+ }
+ // Messages involved in prepared transactions have their status
+ // updated accordingly. First, though, restore a message that
+ // is expected to be on a queue, including non-transacted
+ // messages and those pending dequeue in a dtx.
+ if (j->tplStatus != QueueEntry::ADDING)
+ iQ->second->recover(msg);
+ switch(j->tplStatus) {
+ case QueueEntry::ADDING:
+ dtxMap[j->xid]->enqueue(iQ->second, msg);
+ break;
+ case QueueEntry::REMOVING:
+ dtxMap[j->xid]->dequeue(iQ->second, msg);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+}
+
+}} // namespace qpid::store
diff --git a/qpid/cpp/src/qpid/store/MessageStorePlugin.h b/qpid/cpp/src/qpid/store/MessageStorePlugin.h
new file mode 100644
index 0000000000..5290fc16db
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/MessageStorePlugin.h
@@ -0,0 +1,280 @@
+#ifndef QPID_STORE_MESSAGESTOREPLUGIN_H
+#define QPID_STORE_MESSAGESTOREPLUGIN_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include "qpid/broker/MessageStore.h"
+//#include "qpid/management/Manageable.h"
+
+#include <string>
+
+using namespace qpid;
+
+namespace qpid {
+
+namespace broker {
+class Broker;
+class PersistableExchange;
+class PersistableMessage;
+class PersistableQueue;
+}
+
+namespace store {
+
+class StorageProvider;
+
+/**
+ * @class MessageStorePlugin
+ *
+ * MessageStorePlugin is the front end of the persistent message store
+ * plugin. It is responsible for coordinating recovery, initialization,
+ * transactions (both local and distributed), flow-to-disk loading and
+ * unloading and persisting broker state (queues, bindings etc.).
+ * Actual storage operations are carried out by a message store storage
+ * provider that implements the qpid::store::StorageProvider interface.
+ */
+class MessageStorePlugin :
+ public qpid::Plugin,
+ public qpid::broker::MessageStore, // Frontend classes
+ public qpid::Plugin::Target // Provider target
+ // @TODO Need a mgmt story for this. Maybe allow r/o access to provider store info? public qpid::management::Manageable
+{
+ public:
+ MessageStorePlugin() : broker(0) {}
+
+ /**
+ * @name Methods inherited from qpid::Plugin
+ */
+ //@{
+ virtual Options* getOptions() { return &options; }
+ virtual void earlyInitialize (Plugin::Target& target);
+ virtual void initialize(Plugin::Target& target);
+ //@}
+
+ /// Finalizer; calls Target::finalize() to run finalizers on
+ /// StorageProviders.
+ void finalizeMe();
+
+ /**
+ * Called by StorageProvider instances during the earlyInitialize sequence.
+ * Each StorageProvider must supply a unique name by which it is known and a
+ * pointer to itself.
+ */
+ virtual void providerAvailable(const std::string name, StorageProvider *be);
+
+ /**
+ * @name Methods inherited from qpid::broker::MessageStore
+ */
+
+ /**
+ * Record the existence of a durable queue
+ */
+ virtual void create(broker::PersistableQueue& queue,
+ const framing::FieldTable& args);
+ /**
+ * Destroy a durable queue
+ */
+ virtual void destroy(broker::PersistableQueue& queue);
+
+ /**
+ * Record the existence of a durable exchange
+ */
+ virtual void create(const broker::PersistableExchange& exchange,
+ const framing::FieldTable& args);
+ /**
+ * Destroy a durable exchange
+ */
+ virtual void destroy(const broker::PersistableExchange& exchange);
+
+ /**
+ * Record a binding
+ */
+ virtual void bind(const broker::PersistableExchange& exchange,
+ const broker::PersistableQueue& queue,
+ const std::string& key,
+ const framing::FieldTable& args);
+
+ /**
+ * Forget a binding
+ */
+ virtual void unbind(const broker::PersistableExchange& exchange,
+ const broker::PersistableQueue& queue,
+ const std::string& key,
+ const framing::FieldTable& args);
+
+ /**
+ * Record generic durable configuration
+ */
+ virtual void create(const broker::PersistableConfig& config);
+
+ /**
+ * Destroy generic durable configuration
+ */
+ virtual void destroy(const broker::PersistableConfig& config);
+
+ /**
+ * Stores a message before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point). If the message has not yet been stored it will
+ * store the headers as well as any content passed in. A
+ * persistence id will be set on the message which can be
+ * used to load the content or to append to it.
+ */
+ virtual void stage(const boost::intrusive_ptr<broker::PersistableMessage>& msg);
+
+ /**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+ virtual void destroy(broker::PersistableMessage& msg);
+
+ /**
+ * Appends content to a previously staged message
+ */
+ virtual void appendContent(const boost::intrusive_ptr<const broker::PersistableMessage>& msg,
+ const std::string& data);
+
+ /**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+ virtual void loadContent(const broker::PersistableQueue& queue,
+ const boost::intrusive_ptr<const broker::PersistableMessage>& msg,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length);
+
+ /**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * Note: The operation is asynchronous so the return of this function does
+ * not mean the operation is complete.
+ *
+ * @param msg the message to enqueue
+ * @param queue the name of the queue onto which it is to be enqueued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void enqueue(broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<broker::PersistableMessage>& msg,
+ const broker::PersistableQueue& queue);
+
+ /**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ *
+ * Note: The operation is asynchronous so the return of this function does
+ * not mean the operation is complete.
+ *
+ * @param msg the message to dequeue
+ * @param queue the name of the queue from which it is to be dequeued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void dequeue(broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<broker::PersistableMessage>& msg,
+ const broker::PersistableQueue& queue);
+
+ /**
+ * Flushes all async messages to disk for the specified queue
+ *
+ *
+ * Note: The operation is asynchronous so the return of this function does
+ * not mean the operation is complete.
+ *
+ * @param queue the name of the queue from which it is to be dequeued
+ */
+ virtual void flush(const broker::PersistableQueue& queue);
+
+ /**
+ * Returns the number of outstanding AIO's for a given queue
+ *
+ * If 0, than all the enqueue / dequeues have been stored
+ * to disk
+ *
+ * @param queue the name of the queue to check for outstanding AIO
+ */
+ virtual uint32_t outstandingQueueAIO(const broker::PersistableQueue& queue);
+ //@}
+
+ /**
+ * @name Methods inherited from qpid::broker::TransactionalStore
+ */
+ //@{
+ std::auto_ptr<broker::TransactionContext> begin();
+
+ std::auto_ptr<broker::TPCTransactionContext> begin(const std::string& xid);
+
+ void prepare(broker::TPCTransactionContext& ctxt);
+
+ void commit(broker::TransactionContext& ctxt);
+
+ void abort(broker::TransactionContext& ctxt);
+
+ void collectPreparedXids(std::set<std::string>& xids);
+ //@}
+
+ /**
+ * Request recovery of queue and message state; inherited from Recoverable
+ */
+ virtual void recover(broker::RecoveryManager& recoverer);
+
+ // inline management::Manageable::status_t ManagementMethod (uint32_t, management::Args&, std::string&)
+ // { return management::Manageable::STATUS_OK; }
+
+ // So storage provider can get the broker info.
+ broker::Broker *getBroker() { return broker; }
+
+ protected:
+
+ struct StoreOptions : public qpid::Options {
+ StoreOptions(const std::string& name="Store Options");
+ std::string providerName;
+ };
+ StoreOptions options;
+
+ typedef std::map<std::string, StorageProvider*> ProviderMap;
+ ProviderMap providers;
+ ProviderMap::const_iterator provider;
+
+ broker::Broker *broker;
+
+}; // class MessageStoreImpl
+
+}} // namespace qpid::store
+
+#endif /* QPID_SERIALIZER_H */
diff --git a/qpid/cpp/src/qpid/store/StorageProvider.h b/qpid/cpp/src/qpid/store/StorageProvider.h
new file mode 100644
index 0000000000..de12ffb869
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/StorageProvider.h
@@ -0,0 +1,329 @@
+#ifndef QPID_STORE_STORAGEPROVIDER_H
+#define QPID_STORE_STORAGEPROVIDER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <map>
+#include <stdexcept>
+#include <vector>
+#include "qpid/Exception.h"
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include "qpid/broker/MessageStore.h"
+
+using qpid::broker::PersistableConfig;
+using qpid::broker::PersistableExchange;
+using qpid::broker::PersistableMessage;
+using qpid::broker::PersistableQueue;
+
+namespace qpid {
+namespace store {
+
+typedef std::map<uint64_t, qpid::broker::RecoverableExchange::shared_ptr>
+ ExchangeMap;
+typedef std::map<uint64_t, qpid::broker::RecoverableQueue::shared_ptr>
+ QueueMap;
+typedef std::map<uint64_t, qpid::broker::RecoverableMessage::shared_ptr>
+ MessageMap;
+// Msg Id -> vector of queue entries where message is queued
+struct QueueEntry {
+ enum TplStatus { NONE = 0, ADDING = 1, REMOVING = 2 };
+ uint64_t queueId;
+ TplStatus tplStatus;
+ std::string xid;
+
+ QueueEntry(uint64_t id, TplStatus tpl = NONE, const std::string& x = "")
+ : queueId(id), tplStatus(tpl), xid(x) {}
+
+ bool operator==(const QueueEntry& rhs) const {
+ if (queueId != rhs.queueId) return false;
+ if (tplStatus == NONE && rhs.tplStatus == NONE) return true;
+ return xid == rhs.xid;
+ }
+};
+typedef std::map<uint64_t, std::vector<QueueEntry> > MessageQueueMap;
+typedef std::map<std::string, qpid::broker::RecoverableTransaction::shared_ptr>
+ PreparedTransactionMap;
+
+class MessageStorePlugin;
+
+/**
+ * @class StorageProvider
+ *
+ * StorageProvider defines the interface for the storage provider plugin to the
+ * Qpid broker persistence store plugin.
+ *
+ * @TODO Should StorageProvider also inherit from MessageStore? If so, then
+ * maybe remove Recoverable from MessageStore's inheritance and move it
+ * to MessageStorePlugin? In any event, somehow the discardInit() feature
+ * needs to get added here.
+ */
+class StorageProvider : public qpid::Plugin, public qpid::broker::MessageStore
+{
+public:
+
+ class Exception : public qpid::Exception
+ {
+ public:
+ virtual ~Exception() throw() {}
+ virtual const char *what() const throw() = 0;
+ };
+
+ /**
+ * @name Methods inherited from qpid::Plugin
+ */
+ //@{
+ /**
+ * Return a pointer to the provider's options. The options will be
+ * updated during option parsing by the host program; therefore, the
+ * referenced Options object must remain valid past this function's return.
+ *
+ * @return An options group or 0 for no options. Default returns 0.
+ * Plugin retains ownership of return value.
+ */
+ virtual qpid::Options* getOptions() = 0;
+
+ /**
+ * Initialize Plugin functionality on a Target, called before
+ * initializing the target.
+ *
+ * StorageProviders should respond only to Targets of class
+ * qpid::store::MessageStorePlugin and ignore all others.
+ *
+ * When called, the provider should invoke the method
+ * qpid::store::MessageStorePlugin::providerAvailable() to alert the
+ * message store of StorageProvider's availability.
+ *
+ * Called before the target itself is initialized.
+ */
+ virtual void earlyInitialize (Plugin::Target& target) = 0;
+
+ /**
+ * Initialize StorageProvider functionality. Called after initializing
+ * the target.
+ *
+ * StorageProviders should respond only to Targets of class
+ * qpid::store::MessageStorePlugin and ignore all others.
+ *
+ * Called after the target is fully initialized.
+ */
+ virtual void initialize(Plugin::Target& target) = 0;
+ //@}
+
+ /**
+ * Receive notification that this provider is the one that will actively
+ * handle storage for the target. If the provider is to be used, this
+ * method will be called after earlyInitialize() and before any
+ * recovery operations (recovery, in turn, precedes call to initialize()).
+ * Thus, it is wise to not actually do any database ops from within
+ * earlyInitialize() - they can wait until activate() is called because
+ * at that point it is certain the database will be needed.
+ */
+ virtual void activate(MessageStorePlugin &store) = 0;
+
+ /**
+ * @name Methods inherited from qpid::broker::MessageStore
+ */
+
+ /**
+ * Record the existence of a durable queue
+ */
+ virtual void create(PersistableQueue& queue,
+ const qpid::framing::FieldTable& args) = 0;
+ /**
+ * Destroy a durable queue
+ */
+ virtual void destroy(PersistableQueue& queue) = 0;
+
+ /**
+ * Record the existence of a durable exchange
+ */
+ virtual void create(const PersistableExchange& exchange,
+ const qpid::framing::FieldTable& args) = 0;
+ /**
+ * Destroy a durable exchange
+ */
+ virtual void destroy(const PersistableExchange& exchange) = 0;
+
+ /**
+ * Record a binding
+ */
+ virtual void bind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args) = 0;
+
+ /**
+ * Forget a binding
+ */
+ virtual void unbind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args) = 0;
+
+ /**
+ * Record generic durable configuration
+ */
+ virtual void create(const PersistableConfig& config) = 0;
+
+ /**
+ * Destroy generic durable configuration
+ */
+ virtual void destroy(const PersistableConfig& config) = 0;
+
+ /**
+ * Stores a messages before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point). If the message has not yet been stored it will
+ * store the headers as well as any content passed in. A
+ * persistence id will be set on the message which can be
+ * used to load the content or to append to it.
+ */
+ virtual void stage(const boost::intrusive_ptr<PersistableMessage>& msg) = 0;
+
+ /**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+ virtual void destroy(PersistableMessage& msg) = 0;
+
+ /**
+ * Appends content to a previously staged message
+ */
+ virtual void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg,
+ const std::string& data) = 0;
+
+ /**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+ virtual void loadContent(const PersistableQueue& queue,
+ const boost::intrusive_ptr<const PersistableMessage>& msg,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length) = 0;
+
+ /**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * Note: that this is async so the return of the function does
+ * not mean the opperation is complete.
+ *
+ * @param msg the message to enqueue
+ * @param queue the name of the queue onto which it is to be enqueued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void enqueue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue) = 0;
+
+ /**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * Note: that this is async so the return of the function does
+ * not mean the opperation is complete.
+ *
+ * @param msg the message to dequeue
+ * @param queue the name of the queue from which it is to be dequeued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void dequeue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue) = 0;
+
+ /**
+ * Flushes all async messages to disk for the specified queue
+ *
+ * Note: that this is async so the return of the function does
+ * not mean the opperation is complete.
+ *
+ * @param queue the name of the queue from which it is to be dequeued
+ */
+ virtual void flush(const qpid::broker::PersistableQueue& queue) = 0;
+
+ /**
+ * Returns the number of outstanding AIO's for a given queue
+ *
+ * If 0, than all the enqueue / dequeues have been stored
+ * to disk
+ *
+ * @param queue the name of the queue to check for outstanding AIO
+ */
+ virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue) = 0;
+ //@}
+
+ /**
+ * @TODO This should probably not be here - it's only here because
+ * MessageStore inherits from Recoverable... maybe move that derivation.
+ *
+ * As it is now, we don't use this. Separate recover methods are
+ * declared below for individual types, which also set up maps of
+ * messages, queues, transactions for the main store plugin to handle
+ * properly.
+ *
+ * Request recovery of queue and message state.
+ */
+ virtual void recover(qpid::broker::RecoveryManager& /*recoverer*/) {}
+
+ /**
+ * @name Methods that do the recovery of the various objects that
+ * were saved.
+ */
+ //@{
+
+ /**
+ * Recover bindings.
+ */
+ virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer) = 0;
+ virtual void recoverExchanges(qpid::broker::RecoveryManager& recoverer,
+ ExchangeMap& exchangeMap) = 0;
+ virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer,
+ QueueMap& queueMap) = 0;
+ virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer,
+ const ExchangeMap& exchangeMap,
+ const QueueMap& queueMap) = 0;
+ virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer,
+ MessageMap& messageMap,
+ MessageQueueMap& messageQueueMap) = 0;
+ virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer,
+ PreparedTransactionMap& dtxMap) = 0;
+ //@}
+};
+
+}} // namespace qpid::store
+
+#endif /* QPID_STORE_STORAGEPROVIDER_H */
diff --git a/qpid/cpp/src/qpid/store/StoreException.h b/qpid/cpp/src/qpid/store/StoreException.h
new file mode 100644
index 0000000000..1dc7f670ec
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/StoreException.h
@@ -0,0 +1,49 @@
+#ifndef QPID_STORE_STOREEXCEPTION_H
+#define QPID_STORE_STOREEXCEPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <exception>
+#include <boost/format.hpp>
+#include "StorageProvider.h"
+
+namespace qpid {
+namespace store {
+
+class StoreException : public std::exception
+{
+ std::string text;
+public:
+ StoreException(const std::string& _text) : text(_text) {}
+ StoreException(const std::string& _text,
+ const StorageProvider::Exception& cause)
+ : text(_text + ": " + cause.what()) {}
+ virtual ~StoreException() throw() {}
+ virtual const char* what() const throw() { return text.c_str(); }
+};
+
+#define THROW_STORE_EXCEPTION(MESSAGE) throw qpid::store::StoreException(boost::str(boost::format("%s (%s:%d)") % (MESSAGE) % __FILE__ % __LINE__))
+#define THROW_STORE_EXCEPTION_2(MESSAGE, EXCEPTION) throw qpid::store::StoreException(boost::str(boost::format("%s (%s:%d)") % (MESSAGE) % __FILE__ % __LINE__), EXCEPTION)
+
+}} // namespace qpid::store
+
+#endif /* QPID_STORE_STOREEXCEPTION_H */
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp b/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp
new file mode 100644
index 0000000000..e6cb10c133
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <windows.h>
+#include <clfsw32.h>
+#include <clfsmgmtw32.h>
+#include <sstream>
+#include <string>
+#include <vector>
+#include <stdlib.h>
+#include <qpid/sys/windows/check.h>
+
+#include "Log.h"
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+Log::~Log()
+{
+ if (marshal != 0)
+ ::DeleteLogMarshallingArea(marshal);
+ ::CloseHandle(handle);
+}
+
+void
+Log::open(const std::string& path, const TuningParameters& params)
+{
+ this->containerSize = static_cast<ULONGLONG>(params.containerSize);
+ logPath = path;
+ std::string logSpec = "log:" + path;
+ size_t specLength = logSpec.length();
+ std::auto_ptr<wchar_t> wLogSpec(new wchar_t[specLength + 1]);
+ size_t converted;
+ mbstowcs_s(&converted,
+ wLogSpec.get(), specLength+1,
+ logSpec.c_str(), specLength);
+ handle = ::CreateLogFile(wLogSpec.get(),
+ GENERIC_WRITE | GENERIC_READ,
+ 0,
+ 0,
+ OPEN_ALWAYS,
+ 0);
+ QPID_WINDOWS_CHECK_NOT(handle, INVALID_HANDLE_VALUE);
+ CLFS_INFORMATION info;
+ ULONG infoSize = sizeof(info);
+ BOOL ok = ::GetLogFileInformation(handle, &info, &infoSize);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+ ok = ::RegisterManageableLogClient(handle, 0);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+
+ // Set up policies for how many containers to initially create and how
+ // large each container should be. Also, auto-grow the log when container
+ // space runs out.
+ CLFS_MGMT_POLICY logPolicy;
+ logPolicy.Version = CLFS_MGMT_POLICY_VERSION;
+ logPolicy.LengthInBytes = sizeof(logPolicy);
+ logPolicy.PolicyFlags = 0;
+
+ // If this is the first time this log is opened, give an opportunity to
+ // initialize its content.
+ bool needInitialize(false);
+ if (info.TotalContainers == 0) {
+ // New log; set the configured container size and create the
+ // initial set of containers.
+ logPolicy.PolicyType = ClfsMgmtPolicyNewContainerSize;
+ logPolicy.PolicyParameters.NewContainerSize.SizeInBytes = containerSize;
+ ok = ::InstallLogPolicy(handle, &logPolicy);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+
+ ULONGLONG desired(params.containers), actual(0);
+ ok = ::SetLogFileSizeWithPolicy(handle, &desired, &actual);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+
+ needInitialize = true;
+ }
+ // Ensure that the log is extended as needed and will shrink when 50%
+ // becomes unused.
+ logPolicy.PolicyType = ClfsMgmtPolicyAutoGrow;
+ logPolicy.PolicyParameters.AutoGrow.Enabled = 1;
+ ok = ::InstallLogPolicy(handle, &logPolicy);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+ logPolicy.PolicyType = ClfsMgmtPolicyAutoShrink;
+ logPolicy.PolicyParameters.AutoShrink.Percentage = params.shrinkPct;
+ ok = ::InstallLogPolicy(handle, &logPolicy);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+
+ // Need a marshaling area
+ ok = ::CreateLogMarshallingArea(handle,
+ NULL, NULL, NULL, // Alloc, free, context
+ marshallingBufferSize(),
+ params.maxWriteBuffers,
+ 1, // Max read buffers
+ &marshal);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+ if (needInitialize)
+ initialize();
+}
+
+uint32_t
+Log::marshallingBufferSize()
+{
+ // Default implementation returns the minimum marshalling buffer size;
+ // derived ones should come up with a more fitting value.
+ //
+ // Find the directory name part of the log specification, including the
+ // trailing '\'.
+ size_t dirMarker = logPath.rfind('\\');
+ if (dirMarker == std::string::npos)
+ dirMarker = logPath.rfind('/');
+ DWORD bytesPerSector;
+ DWORD dontCare;
+ ::GetDiskFreeSpace(logPath.substr(0, dirMarker).c_str(),
+ &dontCare,
+ &bytesPerSector,
+ &dontCare,
+ &dontCare);
+ return bytesPerSector;
+}
+
+CLFS_LSN
+Log::write(void* entry, uint32_t length, CLFS_LSN* prev)
+{
+ CLFS_WRITE_ENTRY desc;
+ desc.Buffer = entry;
+ desc.ByteLength = length;
+ CLFS_LSN lsn;
+ BOOL ok = ::ReserveAndAppendLog(marshal,
+ &desc, 1, // Buffer descriptor
+ 0, prev, // Undo-Next, Prev
+ 0, 0, // Reservation
+ CLFS_FLAG_FORCE_FLUSH,
+ &lsn,
+ 0);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+ return lsn;
+}
+
+// Get the current base LSN of the log.
+CLFS_LSN
+Log::getBase()
+{
+ CLFS_INFORMATION info;
+ ULONG infoSize = sizeof(info);
+ BOOL ok = ::GetLogFileInformation(handle, &info, &infoSize);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+ return info.BaseLsn;
+}
+
+void
+Log::moveTail(const CLFS_LSN& oldest)
+{
+ BOOL ok = ::AdvanceLogBase(marshal,
+ const_cast<PCLFS_LSN>(&oldest),
+ 0, NULL);
+ // If multiple threads are manipulating things they may get out of
+ // order when moving the tail; if someone already moved it further
+ // than this, it's ok - ignore it.
+ if (ok || ::GetLastError() == ERROR_LOG_START_OF_LOG)
+ return;
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+}
+
+}}} // namespace qpid::store::ms_clfs
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Log.h b/qpid/cpp/src/qpid/store/ms-clfs/Log.h
new file mode 100644
index 0000000000..2f7eb6cada
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/Log.h
@@ -0,0 +1,78 @@
+#ifndef QPID_STORE_MSCLFS_LOG_H
+#define QPID_STORE_MSCLFS_LOG_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <windows.h>
+#include <clfsw32.h>
+#include <qpid/sys/IntegerTypes.h>
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+/**
+ * @class Log
+ *
+ * Represents a CLFS-housed log.
+ */
+class Log {
+
+protected:
+ HANDLE handle;
+ ULONGLONG containerSize;
+ std::string logPath;
+ PVOID marshal;
+
+ // Give subclasses a chance to initialize a new log. Called after a new
+ // log is created, initial set of containers is added, and marshalling
+ // area is allocated.
+ virtual void initialize() {}
+
+public:
+ struct TuningParameters {
+ size_t containerSize;
+ unsigned short containers;
+ unsigned short shrinkPct;
+ uint32_t maxWriteBuffers;
+ };
+
+ Log() : handle(INVALID_HANDLE_VALUE), containerSize(0), marshal(0) {}
+ virtual ~Log();
+
+ void open(const std::string& path, const TuningParameters& params);
+
+ virtual uint32_t marshallingBufferSize();
+
+ CLFS_LSN write(void* entry, uint32_t length, CLFS_LSN* prev = 0);
+
+ // Get the current base LSN of the log.
+ CLFS_LSN getBase();
+
+ // Move the log tail to the indicated LSN.
+ void moveTail(const CLFS_LSN& oldest);
+};
+
+}}} // namespace qpid::store::ms_clfs
+
+#endif /* QPID_STORE_MSCLFS_LOG_H */
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Lsn.h b/qpid/cpp/src/qpid/store/ms-clfs/Lsn.h
new file mode 100644
index 0000000000..7f46c1f266
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/Lsn.h
@@ -0,0 +1,36 @@
+#ifndef QPID_STORE_MSCLFS_LSN_H
+#define QPID_STORE_MSCLFS_LSN_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <clfsw32.h>
+
+namespace {
+ // Make it easy to assign LSNs
+ inline CLFS_LSN idToLsn(const uint64_t val)
+ { CLFS_LSN lsn; lsn.Internal = val; return lsn; }
+
+ inline uint64_t lsnToId(const CLFS_LSN& lsn)
+ { uint64_t val = lsn.Internal; return val; }
+}
+
+#endif /* QPID_STORE_MSCLFS_LSN_H */
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp b/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
new file mode 100644
index 0000000000..5ff24e7d33
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
@@ -0,0 +1,1102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <list>
+#include <map>
+#include <set>
+#include <stdlib.h>
+#include <string>
+#include <windows.h>
+#include <clfsw32.h>
+#include <qpid/broker/Broker.h>
+#include <qpid/broker/RecoverableQueue.h>
+#include <qpid/log/Statement.h>
+#include <qpid/store/MessageStorePlugin.h>
+#include <qpid/store/StoreException.h>
+#include <qpid/store/StorageProvider.h>
+#include <qpid/sys/Mutex.h>
+#include <boost/foreach.hpp>
+
+// From ms-sql...
+#include "BlobAdapter.h"
+#include "BlobRecordset.h"
+#include "BindingRecordset.h"
+#include "DatabaseConnection.h"
+#include "Exception.h"
+#include "State.h"
+#include "VariantHelper.h"
+using qpid::store::ms_sql::BlobAdapter;
+using qpid::store::ms_sql::BlobRecordset;
+using qpid::store::ms_sql::BindingRecordset;
+using qpid::store::ms_sql::DatabaseConnection;
+using qpid::store::ms_sql::ADOException;
+using qpid::store::ms_sql::State;
+using qpid::store::ms_sql::VariantHelper;
+
+#include "Log.h"
+#include "Messages.h"
+#include "Transaction.h"
+#include "TransactionLog.h"
+
+// Bring in ADO 2.8 (yes, I know it says "15", but that's it...)
+#import "C:\Program Files\Common Files\System\ado\msado15.dll" \
+ no_namespace rename("EOF", "EndOfFile")
+#include <comdef.h>
+namespace {
+inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};
+
+// Table names
+const std::string TblBinding("tblBinding");
+const std::string TblConfig("tblConfig");
+const std::string TblExchange("tblExchange");
+const std::string TblQueue("tblQueue");
+
+}
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+/**
+ * @class MSSqlClfsProvider
+ *
+ * Implements a qpid::store::StorageProvider that uses a hybrid Microsoft
+ * SQL Server and Windows CLFS approach as the backend data store for Qpid.
+ */
+class MSSqlClfsProvider : public qpid::store::StorageProvider
+{
+protected:
+ void finalizeMe();
+
+ void dump();
+
+public:
+ MSSqlClfsProvider();
+ ~MSSqlClfsProvider();
+
+ virtual qpid::Options* getOptions() { return &options; }
+
+ virtual void earlyInitialize (Plugin::Target& target);
+ virtual void initialize(Plugin::Target& target);
+
+ /**
+ * Receive notification that this provider is the one that will actively
+ * handle provider storage for the target. If the provider is to be used,
+ * this method will be called after earlyInitialize() and before any
+ * recovery operations (recovery, in turn, precedes call to initialize()).
+ */
+ virtual void activate(MessageStorePlugin &store);
+
+ /**
+ * @name Methods inherited from qpid::broker::MessageStore
+ */
+
+ /**
+ * Record the existence of a durable queue
+ */
+ virtual void create(PersistableQueue& queue,
+ const qpid::framing::FieldTable& args);
+ /**
+ * Destroy a durable queue
+ */
+ virtual void destroy(PersistableQueue& queue);
+
+ /**
+ * Record the existence of a durable exchange
+ */
+ virtual void create(const PersistableExchange& exchange,
+ const qpid::framing::FieldTable& args);
+ /**
+ * Destroy a durable exchange
+ */
+ virtual void destroy(const PersistableExchange& exchange);
+
+ /**
+ * Record a binding
+ */
+ virtual void bind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args);
+
+ /**
+ * Forget a binding
+ */
+ virtual void unbind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args);
+
+ /**
+ * Record generic durable configuration
+ */
+ virtual void create(const PersistableConfig& config);
+
+ /**
+ * Destroy generic durable configuration
+ */
+ virtual void destroy(const PersistableConfig& config);
+
+ /**
+ * Stores a messages before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point). If the message has not yet been stored it will
+ * store the headers as well as any content passed in. A
+ * persistence id will be set on the message which can be
+ * used to load the content or to append to it.
+ */
+ virtual void stage(const boost::intrusive_ptr<PersistableMessage>& msg);
+
+ /**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+ virtual void destroy(PersistableMessage& msg);
+
+ /**
+ * Appends content to a previously staged message
+ */
+ virtual void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg,
+ const std::string& data);
+
+ /**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+ virtual void loadContent(const qpid::broker::PersistableQueue& queue,
+ const boost::intrusive_ptr<const PersistableMessage>& msg,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length);
+
+ /**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * Note: that this is async so the return of the function does
+ * not mean the opperation is complete.
+ *
+ * @param msg the message to enqueue
+ * @param queue the name of the queue onto which it is to be enqueued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void enqueue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue);
+
+ /**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * Note: that this is async so the return of the function does
+ * not mean the opperation is complete.
+ *
+ * @param msg the message to dequeue
+ * @param queue the name of the queue from which it is to be dequeued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void dequeue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue);
+
+ /**
+ * Flushes all async messages to disk for the specified queue
+ *
+ * Note: this is a no-op for this provider.
+ *
+ * @param queue the name of the queue from which it is to be dequeued
+ */
+ virtual void flush(const PersistableQueue& queue) {};
+
+ /**
+ * Returns the number of outstanding AIO's for a given queue
+ *
+ * If 0, than all the enqueue / dequeues have been stored
+ * to disk
+ *
+ * @param queue the name of the queue to check for outstanding AIO
+ */
+ virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue)
+ {return 0;}
+ //@}
+
+ /**
+ * @name Methods inherited from qpid::broker::TransactionalStore
+ */
+ //@{
+ virtual std::auto_ptr<qpid::broker::TransactionContext> begin();
+ virtual std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
+ virtual void prepare(qpid::broker::TPCTransactionContext& txn);
+ virtual void commit(qpid::broker::TransactionContext& txn);
+ virtual void abort(qpid::broker::TransactionContext& txn);
+ virtual void collectPreparedXids(std::set<std::string>& xids);
+ //@}
+
+ virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer);
+ virtual void recoverExchanges(qpid::broker::RecoveryManager& recoverer,
+ ExchangeMap& exchangeMap);
+ virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer,
+ QueueMap& queueMap);
+ virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer,
+ const ExchangeMap& exchangeMap,
+ const QueueMap& queueMap);
+ virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer,
+ MessageMap& messageMap,
+ MessageQueueMap& messageQueueMap);
+ virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer,
+ PreparedTransactionMap& dtxMap);
+
+private:
+ struct ProviderOptions : public qpid::Options
+ {
+ std::string connectString;
+ std::string catalogName;
+ std::string storeDir;
+ size_t containerSize;
+ unsigned short initialContainers;
+ uint32_t maxWriteBuffers;
+
+ ProviderOptions(const std::string &name)
+ : qpid::Options(name),
+ catalogName("QpidStore"),
+ containerSize(1024 * 1024),
+ initialContainers(2),
+ maxWriteBuffers(10)
+ {
+ const enum { NAMELEN = MAX_COMPUTERNAME_LENGTH + 1 };
+ TCHAR myName[NAMELEN];
+ DWORD myNameLen = NAMELEN;
+ GetComputerName(myName, &myNameLen);
+ connectString = "Data Source=";
+ connectString += myName;
+ connectString += "\\SQLEXPRESS;Integrated Security=SSPI";
+ addOptions()
+ ("connect",
+ qpid::optValue(connectString, "STRING"),
+ "Connection string for the database to use. Will prepend "
+ "Provider=SQLOLEDB;")
+ ("catalog",
+ qpid::optValue(catalogName, "DB NAME"),
+ "Catalog (database) name")
+ ("store-dir",
+ qpid::optValue(storeDir, "DIR"),
+ "Location to store message and transaction data "
+ "(default uses data-dir if available)")
+ ("container-size",
+ qpid::optValue(containerSize, "VALUE"),
+ "Bytes per container; min 512K. Only used when creating "
+ "a new log")
+ ("initial-containers",
+ qpid::optValue(initialContainers, "VALUE"),
+ "Number of containers to add if creating a new log")
+ ("max-write-buffers",
+ qpid::optValue(maxWriteBuffers, "VALUE"),
+ "Maximum write buffers outstanding before log is flushed "
+ "(0 means no limit)")
+ ;
+ }
+ };
+ ProviderOptions options;
+ std::string brokerDataDir;
+ Messages messages;
+ // TransactionLog requires itself to have a shared_ptr reference to start.
+ TransactionLog::shared_ptr transactions;
+
+ // Each thread has a separate connection to the database and also needs
+ // to manage its COM initialize/finalize individually. This is done by
+ // keeping a thread-specific State.
+ boost::thread_specific_ptr<State> dbState;
+
+ State *initState();
+ DatabaseConnection *initConnection(void);
+ void createDb(DatabaseConnection *db, const std::string &name);
+ void createLogs();
+};
+
+static MSSqlClfsProvider static_instance_registers_plugin;
+
+void
+MSSqlClfsProvider::finalizeMe()
+{
+ dbState.reset();
+}
+
+MSSqlClfsProvider::MSSqlClfsProvider()
+ : options("MS SQL/CLFS Provider options")
+{
+ transactions.reset(new TransactionLog());
+}
+
+MSSqlClfsProvider::~MSSqlClfsProvider()
+{
+}
+
+void
+MSSqlClfsProvider::earlyInitialize(Plugin::Target &target)
+{
+ MessageStorePlugin *store = dynamic_cast<MessageStorePlugin *>(&target);
+ if (store) {
+ // Check the store dir option; if not specified, need to
+ // grab the broker's data dir.
+ if (options.storeDir.empty()) {
+ const DataDir& dir = store->getBroker()->getDataDir();
+ if (dir.isEnabled()) {
+ options.storeDir = dir.getPath();
+ }
+ else {
+ QPID_LOG(error,
+ "MSSQL-CLFS: --store-dir required if --no-data-dir specified");
+ return;
+ }
+ }
+
+ // If CLFS is not available on this system, give up now.
+ try {
+ Log::TuningParameters params;
+ params.containerSize = options.containerSize;
+ params.containers = options.initialContainers;
+ params.shrinkPct = 50;
+ params.maxWriteBuffers = options.maxWriteBuffers;
+ std::string msgPath = options.storeDir + "\\" + "messages";
+ messages.openLog(msgPath, params);
+ std::string transPath = options.storeDir + "\\" + "transactions";
+ transactions->open(transPath, params);
+ }
+ catch (std::exception &e) {
+ QPID_LOG(error, e.what());
+ return;
+ }
+
+ // If the database init fails, report it and don't register; give
+ // the rest of the broker a chance to run.
+ //
+ // Don't try to initConnection() since that will fail if the
+ // database doesn't exist. Instead, try to open a connection without
+ // a database name, then search for the database. There's still a
+ // chance this provider won't be selected for the store too, so be
+ // be sure to close the database connection before return to avoid
+ // leaving a connection up that will not be used.
+ try {
+ initState(); // This initializes COM
+ std::auto_ptr<DatabaseConnection> db(new DatabaseConnection());
+ db->open(options.connectString, "");
+ _ConnectionPtr conn(*db);
+ _RecordsetPtr pCatalogs = NULL;
+ VariantHelper<std::string> catalogName(options.catalogName);
+ pCatalogs = conn->OpenSchema(adSchemaCatalogs, catalogName);
+ if (pCatalogs->EndOfFile) {
+ // Database doesn't exist; create it
+ QPID_LOG(notice,
+ "MSSQL-CLFS: Creating database " + options.catalogName);
+ createDb(db.get(), options.catalogName);
+ }
+ else {
+ QPID_LOG(notice,
+ "MSSQL-CLFS: Database located: " + options.catalogName);
+ }
+ if (pCatalogs) {
+ if (pCatalogs->State == adStateOpen)
+ pCatalogs->Close();
+ pCatalogs = 0;
+ }
+ db->close();
+ store->providerAvailable("MSSQL-CLFS", this);
+ }
+ catch (qpid::Exception &e) {
+ QPID_LOG(error, e.what());
+ return;
+ }
+ store->addFinalizer(boost::bind(&MSSqlClfsProvider::finalizeMe, this));
+ }
+}
+
+void
+MSSqlClfsProvider::initialize(Plugin::Target& target)
+{
+}
+
+void
+MSSqlClfsProvider::activate(MessageStorePlugin &store)
+{
+ QPID_LOG(info, "MS SQL/CLFS Provider is up");
+}
+
+void
+MSSqlClfsProvider::create(PersistableQueue& queue,
+ const qpid::framing::FieldTable& /*args needed for jrnl*/)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsQueues;
+ try {
+ db->beginTransaction();
+ rsQueues.open(db, TblQueue);
+ rsQueues.add(queue);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error creating queue " + queue.getName(), e, errs);
+ }
+ catch(std::exception& e) {
+ db->rollbackTransaction();
+ THROW_STORE_EXCEPTION(e.what());
+ }
+}
+
+/**
+ * Destroy a durable queue
+ */
+void
+MSSqlClfsProvider::destroy(PersistableQueue& queue)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsQueues;
+ BindingRecordset rsBindings;
+ try {
+ db->beginTransaction();
+ rsQueues.open(db, TblQueue);
+ rsBindings.open(db, TblBinding);
+ // Remove bindings first; the queue IDs can't be ripped out from
+ // under the references in the bindings table.
+ rsBindings.removeForQueue(queue.getPersistenceId());
+ rsQueues.remove(queue);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error deleting queue " + queue.getName(), e, errs);
+ }
+
+ /*
+ * Now that the SQL stuff has recorded the queue deletion, expunge
+ * all record of the queue from the messages set. Any errors logging
+ * these removals are swallowed because during a recovery the queue
+ * Id won't be present (the SQL stuff already committed) so any references
+ * to it in message operations will be removed.
+ */
+ messages.expunge(queue.getPersistenceId());
+}
+
+/**
+ * Record the existence of a durable exchange
+ */
+void
+MSSqlClfsProvider::create(const PersistableExchange& exchange,
+ const qpid::framing::FieldTable& args)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsExchanges;
+ try {
+ db->beginTransaction();
+ rsExchanges.open(db, TblExchange);
+ rsExchanges.add(exchange);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error creating exchange " + exchange.getName(),
+ e,
+ errs);
+ }
+}
+
+/**
+ * Destroy a durable exchange
+ */
+void
+MSSqlClfsProvider::destroy(const PersistableExchange& exchange)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsExchanges;
+ BindingRecordset rsBindings;
+ try {
+ db->beginTransaction();
+ rsExchanges.open(db, TblExchange);
+ rsBindings.open(db, TblBinding);
+ // Remove bindings first; the exchange IDs can't be ripped out from
+ // under the references in the bindings table.
+ rsBindings.removeForExchange(exchange.getPersistenceId());
+ rsExchanges.remove(exchange);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error deleting exchange " + exchange.getName(),
+ e,
+ errs);
+ }
+}
+
+/**
+ * Record a binding
+ */
+void
+MSSqlClfsProvider::bind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args)
+{
+ DatabaseConnection *db = initConnection();
+ BindingRecordset rsBindings;
+ try {
+ db->beginTransaction();
+ rsBindings.open(db, TblBinding);
+ rsBindings.add(exchange.getPersistenceId(),
+ queue.getPersistenceId(),
+ key,
+ args);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error binding exchange " + exchange.getName() +
+ " to queue " + queue.getName(),
+ e,
+ errs);
+ }
+}
+
+/**
+ * Forget a binding
+ */
+void
+MSSqlClfsProvider::unbind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args)
+{
+ DatabaseConnection *db = initConnection();
+ BindingRecordset rsBindings;
+ try {
+ db->beginTransaction();
+ rsBindings.open(db, TblBinding);
+ rsBindings.remove(exchange.getPersistenceId(),
+ queue.getPersistenceId(),
+ key,
+ args);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error unbinding exchange " + exchange.getName() +
+ " from queue " + queue.getName(),
+ e,
+ errs);
+ }
+}
+
+/**
+ * Record generic durable configuration
+ */
+void
+MSSqlClfsProvider::create(const PersistableConfig& config)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsConfigs;
+ try {
+ db->beginTransaction();
+ rsConfigs.open(db, TblConfig);
+ rsConfigs.add(config);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error creating config " + config.getName(), e, errs);
+ }
+}
+
+/**
+ * Destroy generic durable configuration
+ */
+void
+MSSqlClfsProvider::destroy(const PersistableConfig& config)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsConfigs;
+ try {
+ db->beginTransaction();
+ rsConfigs.open(db, TblConfig);
+ rsConfigs.remove(config);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error deleting config " + config.getName(), e, errs);
+ }
+}
+
+/**
+ * Stores a messages before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point). If the message has not yet been stored it will
+ * store the headers as well as any content passed in. A
+ * persistence id will be set on the message which can be
+ * used to load the content or to append to it.
+ */
+void
+MSSqlClfsProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg)
+{
+#if 0
+ DatabaseConnection *db = initConnection();
+ MessageRecordset rsMessages;
+ try {
+ db->beginTransaction();
+ rsMessages.open(db, TblMessage);
+ rsMessages.add(msg);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error staging message", e, errs);
+ }
+#endif
+}
+
+/**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+void
+MSSqlClfsProvider::destroy(PersistableMessage& msg)
+{
+#if 0
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsMessages;
+ try {
+ db->beginTransaction();
+ rsMessages.open(db, TblMessage);
+ rsMessages.remove(msg);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error deleting message", e, errs);
+ }
+#endif
+}
+
+/**
+ * Appends content to a previously staged message
+ */
+void
+MSSqlClfsProvider::appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg,
+ const std::string& data)
+{
+#if 0
+ DatabaseConnection *db = initConnection();
+ MessageRecordset rsMessages;
+ try {
+ db->beginTransaction();
+ rsMessages.open(db, TblMessage);
+ rsMessages.append(msg, data);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error appending to message", e, errs);
+ }
+#endif
+}
+
+/**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+void
+MSSqlClfsProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/,
+ const boost::intrusive_ptr<const PersistableMessage>& msg,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length)
+{
+ // Message log keeps all messages in one log, so we don't need the
+ // queue reference.
+ messages.loadContent(msg->getPersistenceId(), data, offset, length);
+}
+
+/**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * @param ctxt The transaction context under which this enqueue happens.
+ * @param msg The message to enqueue
+ * @param queue the name of the queue onto which it is to be enqueued
+ */
+void
+MSSqlClfsProvider::enqueue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue)
+{
+ Transaction::shared_ptr t;
+ TransactionContext *ctx = dynamic_cast<TransactionContext*>(ctxt);
+ if (ctx)
+ t = ctx->getTransaction();
+ else {
+ TPCTransactionContext *tctx;
+ tctx = dynamic_cast<TPCTransactionContext*>(ctxt);
+ if (tctx)
+ t = tctx->getTransaction();
+ }
+ uint64_t msgId = msg->getPersistenceId();
+ if (msgId == 0) {
+ messages.add(msg);
+ msgId = msg->getPersistenceId();
+ }
+ messages.enqueue(msgId, queue.getPersistenceId(), t);
+ msg->enqueueComplete();
+}
+
+/**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * @param ctxt The transaction context under which this dequeue happens.
+ * @param msg The message to dequeue
+ * @param queue The queue from which it is to be dequeued
+ */
+void
+MSSqlClfsProvider::dequeue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue)
+{
+ Transaction::shared_ptr t;
+ TransactionContext *ctx = dynamic_cast<TransactionContext*>(ctxt);
+ if (ctx)
+ t = ctx->getTransaction();
+ else {
+ TPCTransactionContext *tctx;
+ tctx = dynamic_cast<TPCTransactionContext*>(ctxt);
+ if (tctx)
+ t = tctx->getTransaction();
+ }
+ messages.dequeue(msg->getPersistenceId(), queue.getPersistenceId(), t);
+ msg->dequeueComplete();
+}
+
+std::auto_ptr<qpid::broker::TransactionContext>
+MSSqlClfsProvider::begin()
+{
+ Transaction::shared_ptr t = transactions->begin();
+ std::auto_ptr<qpid::broker::TransactionContext> tc(new TransactionContext(t));
+ return tc;
+}
+
+std::auto_ptr<qpid::broker::TPCTransactionContext>
+MSSqlClfsProvider::begin(const std::string& xid)
+{
+ TPCTransaction::shared_ptr t = transactions->begin(xid);
+ std::auto_ptr<qpid::broker::TPCTransactionContext> tc(new TPCTransactionContext(t));
+ return tc;
+}
+
+void
+MSSqlClfsProvider::prepare(qpid::broker::TPCTransactionContext& txn)
+{
+ TPCTransactionContext *ctx = dynamic_cast<TPCTransactionContext*> (&txn);
+ if (ctx == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ ctx->getTransaction()->prepare();
+}
+
+void
+MSSqlClfsProvider::commit(qpid::broker::TransactionContext& txn)
+{
+ Transaction::shared_ptr t;
+ TransactionContext *ctx = dynamic_cast<TransactionContext*>(&txn);
+ if (ctx)
+ t = ctx->getTransaction();
+ else {
+ TPCTransactionContext *tctx;
+ tctx = dynamic_cast<TPCTransactionContext*>(&txn);
+ if (tctx == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ t = tctx->getTransaction();
+ }
+ t->commit(messages);
+}
+
+void
+MSSqlClfsProvider::abort(qpid::broker::TransactionContext& txn)
+{
+ Transaction::shared_ptr t;
+ TransactionContext *ctx = dynamic_cast<TransactionContext*>(&txn);
+ if (ctx)
+ t = ctx->getTransaction();
+ else {
+ TPCTransactionContext *tctx;
+ tctx = dynamic_cast<TPCTransactionContext*>(&txn);
+ if (tctx == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ t = tctx->getTransaction();
+ }
+ t->abort(messages);
+}
+
+void
+MSSqlClfsProvider::collectPreparedXids(std::set<std::string>& xids)
+{
+ std::map<std::string, TPCTransaction::shared_ptr> preparedMap;
+ transactions->collectPreparedXids(preparedMap);
+ std::map<std::string, TPCTransaction::shared_ptr>::const_iterator i;
+ for (i = preparedMap.begin(); i != preparedMap.end(); ++i) {
+ xids.insert(i->first);
+ }
+}
+
+// @TODO Much of this recovery code is way too similar... refactor to
+// a recover template method on BlobRecordset.
+
+void
+MSSqlClfsProvider::recoverConfigs(qpid::broker::RecoveryManager& recoverer)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsConfigs;
+ rsConfigs.open(db, TblConfig);
+ _RecordsetPtr p = (_RecordsetPtr)rsConfigs;
+ if (p->BOF && p->EndOfFile)
+ return; // Nothing to do
+ p->MoveFirst();
+ while (!p->EndOfFile) {
+ uint64_t id = p->Fields->Item["persistenceId"]->Value;
+ long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize;
+ BlobAdapter blob(blobSize);
+ blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+ // Recreate the Config instance and reset its ID.
+ broker::RecoverableConfig::shared_ptr config =
+ recoverer.recoverConfig(blob);
+ config->setPersistenceId(id);
+ p->MoveNext();
+ }
+}
+
+void
+MSSqlClfsProvider::recoverExchanges(qpid::broker::RecoveryManager& recoverer,
+ ExchangeMap& exchangeMap)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsExchanges;
+ rsExchanges.open(db, TblExchange);
+ _RecordsetPtr p = (_RecordsetPtr)rsExchanges;
+ if (p->BOF && p->EndOfFile)
+ return; // Nothing to do
+ p->MoveFirst();
+ while (!p->EndOfFile) {
+ uint64_t id = p->Fields->Item["persistenceId"]->Value;
+ long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize;
+ BlobAdapter blob(blobSize);
+ blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+ // Recreate the Exchange instance, reset its ID, and remember the
+ // ones restored for matching up when recovering bindings.
+ broker::RecoverableExchange::shared_ptr exchange =
+ recoverer.recoverExchange(blob);
+ exchange->setPersistenceId(id);
+ exchangeMap[id] = exchange;
+ p->MoveNext();
+ }
+}
+
+void
+MSSqlClfsProvider::recoverQueues(qpid::broker::RecoveryManager& recoverer,
+ QueueMap& queueMap)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsQueues;
+ rsQueues.open(db, TblQueue);
+ _RecordsetPtr p = (_RecordsetPtr)rsQueues;
+ if (p->BOF && p->EndOfFile)
+ return; // Nothing to do
+ p->MoveFirst();
+ while (!p->EndOfFile) {
+ uint64_t id = p->Fields->Item["persistenceId"]->Value;
+ long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize;
+ BlobAdapter blob(blobSize);
+ blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+ // Recreate the Queue instance and reset its ID.
+ broker::RecoverableQueue::shared_ptr queue =
+ recoverer.recoverQueue(blob);
+ queue->setPersistenceId(id);
+ queueMap[id] = queue;
+ p->MoveNext();
+ }
+}
+
+void
+MSSqlClfsProvider::recoverBindings(qpid::broker::RecoveryManager& recoverer,
+ const ExchangeMap& exchangeMap,
+ const QueueMap& queueMap)
+{
+ DatabaseConnection *db = initConnection();
+ BindingRecordset rsBindings;
+ rsBindings.open(db, TblBinding);
+ rsBindings.recover(recoverer, exchangeMap, queueMap);
+}
+
+void
+MSSqlClfsProvider::recoverMessages(qpid::broker::RecoveryManager& recoverer,
+ MessageMap& messageMap,
+ MessageQueueMap& messageQueueMap)
+{
+ // Read the list of valid queue Ids to ensure that no broken msg->queue
+ // refs get restored.
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsQueues;
+ rsQueues.open(db, TblQueue);
+ _RecordsetPtr p = (_RecordsetPtr)rsQueues;
+ std::set<uint64_t> validQueues;
+ if (!(p->BOF && p->EndOfFile)) {
+ p->MoveFirst();
+ while (!p->EndOfFile) {
+ uint64_t id = p->Fields->Item["persistenceId"]->Value;
+ validQueues.insert(id);
+ p->MoveNext();
+ }
+ }
+ std::map<uint64_t, Transaction::shared_ptr> transMap;
+ transactions->recover(transMap);
+ messages.recover(recoverer,
+ validQueues,
+ transMap,
+ messageMap,
+ messageQueueMap);
+}
+
+void
+MSSqlClfsProvider::recoverTransactions(qpid::broker::RecoveryManager& recoverer,
+ PreparedTransactionMap& dtxMap)
+{
+ std::map<std::string, TPCTransaction::shared_ptr> preparedMap;
+ transactions->collectPreparedXids(preparedMap);
+ std::map<std::string, TPCTransaction::shared_ptr>::const_iterator i;
+ for (i = preparedMap.begin(); i != preparedMap.end(); ++i) {
+ std::auto_ptr<TPCTransactionContext> ctx(new TPCTransactionContext(i->second));
+ std::auto_ptr<qpid::broker::TPCTransactionContext> brokerCtx(ctx);
+ dtxMap[i->first] = recoverer.recoverTransaction(i->first, brokerCtx);
+ }
+}
+
+////////////// Internal Methods
+
+State *
+MSSqlClfsProvider::initState()
+{
+ State *state = dbState.get(); // See if thread has initialized
+ if (!state) {
+ state = new State;
+ dbState.reset(state);
+ }
+ return state;
+}
+
+DatabaseConnection *
+MSSqlClfsProvider::initConnection(void)
+{
+ State *state = initState();
+ if (state->dbConn != 0)
+ return state->dbConn; // And the DatabaseConnection is set up too
+ std::auto_ptr<DatabaseConnection> db(new DatabaseConnection);
+ db->open(options.connectString, options.catalogName);
+ state->dbConn = db.release();
+ return state->dbConn;
+}
+
+void
+MSSqlClfsProvider::createDb(DatabaseConnection *db, const std::string &name)
+{
+ const std::string dbCmd = "CREATE DATABASE " + name;
+ const std::string useCmd = "USE " + name;
+ const std::string tableCmd = "CREATE TABLE ";
+ const std::string colSpecs =
+ " (persistenceId bigint PRIMARY KEY NOT NULL IDENTITY(1,1),"
+ " fieldTableBlob varbinary(MAX) NOT NULL)";
+ const std::string bindingSpecs =
+ " (exchangeId bigint REFERENCES tblExchange(persistenceId) NOT NULL,"
+ " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL,"
+ " routingKey varchar(255),"
+ " fieldTableBlob varbinary(MAX))";
+
+ _variant_t unused;
+ _bstr_t dbStr = dbCmd.c_str();
+ _ConnectionPtr conn(*db);
+ try {
+ conn->Execute(dbStr, &unused, adExecuteNoRecords);
+ _bstr_t useStr = useCmd.c_str();
+ conn->Execute(useStr, &unused, adExecuteNoRecords);
+ std::string makeTable = tableCmd + TblQueue + colSpecs;
+ _bstr_t makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblExchange + colSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblConfig + colSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblBinding + bindingSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ }
+ catch(_com_error &e) {
+ throw ADOException("MSSQL can't create " + name, e, db->getErrors());
+ }
+}
+
+void
+MSSqlClfsProvider::dump()
+{
+ // dump all db records to qpid_log
+ QPID_LOG(notice, "DB Dump: (not dumping anything)");
+ // rsQueues.dump();
+}
+
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp
new file mode 100644
index 0000000000..849a0a44e8
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp
@@ -0,0 +1,406 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <windows.h>
+#include <clfsw32.h>
+#include <exception>
+#include <malloc.h>
+#include <memory.h>
+#include <qpid/framing/Buffer.h>
+#include <qpid/log/Statement.h>
+#include <qpid/sys/IntegerTypes.h>
+#include <qpid/sys/windows/check.h>
+
+#include "MessageLog.h"
+#include "Lsn.h"
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+namespace {
+
+// Structures that hold log records. Each has a type field at the start.
+enum MessageEntryType {
+ MessageStartEntry = 1,
+ MessageChunkEntry = 2,
+ MessageDeleteEntry = 3,
+ MessageEnqueueEntry = 4,
+ MessageDequeueEntry = 5
+};
+static const uint32_t MaxMessageContentLength = 64 * 1024;
+
+// Message-Start
+struct MessageStart {
+ MessageEntryType type;
+ // If the complete message encoding doesn't fit, remainder is in
+ // MessageChunk records to follow.
+ // headerLength is the size of the message's header in content. It is
+ // part of the totalLength and the segmentLength.
+ uint32_t headerLength;
+ uint32_t totalLength;
+ uint32_t segmentLength;
+ char content[MaxMessageContentLength];
+
+ MessageStart()
+ : type(MessageStartEntry),
+ headerLength(0),
+ totalLength(0),
+ segmentLength(0) {}
+};
+// Message-Chunk
+struct MessageChunk {
+ MessageEntryType type;
+ uint32_t segmentLength;
+ char content[MaxMessageContentLength];
+
+ MessageChunk() : type(MessageChunkEntry), segmentLength(0) {}
+};
+// Message-Delete
+struct MessageDelete {
+ MessageEntryType type;
+
+ MessageDelete() : type(MessageDeleteEntry) {}
+};
+// Message-Enqueue
+struct MessageEnqueue {
+ MessageEntryType type;
+ uint64_t queueId;
+ uint64_t transId;
+
+ MessageEnqueue(uint64_t qId = 0, uint64_t tId = 0)
+ : type(MessageEnqueueEntry), queueId(qId), transId(tId) {}
+};
+// Message-Dequeue
+struct MessageDequeue {
+ MessageEntryType type;
+ uint64_t queueId;
+ uint64_t transId;
+
+ MessageDequeue(uint64_t qId = 0, uint64_t tId = 0)
+ : type(MessageDequeueEntry), queueId(qId), transId(tId) {}
+};
+
+} // namespace
+
+void
+MessageLog::initialize()
+{
+ // Write something to occupy the first record, preventing a real message
+ // from being lsn/id 0. Delete of a non-existant id is easily tossed
+ // during recovery if no other messages have caused the tail to be moved
+ // up past this dummy record by then.
+ deleteMessage(0, 0);
+}
+
+uint32_t
+MessageLog::marshallingBufferSize()
+{
+ size_t biggestNeed = std::max(sizeof(MessageStart), sizeof(MessageEnqueue));
+ uint32_t defSize = static_cast<uint32_t>(biggestNeed);
+ uint32_t minSize = Log::marshallingBufferSize();
+ if (defSize <= minSize)
+ return minSize;
+ // Round up to multiple of minSize
+ return (defSize + minSize) / minSize * minSize;
+}
+
+uint64_t
+MessageLog::add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
+{
+ // The message may be too long to fit in one record; if so, write
+ // Message-Chunk records to contain the rest. If it does all fit in one
+ // record, though, optimize the encoding by going straight to the
+ // Message-Start record rather than encoding then copying to the record.
+ // In all case
+ MessageStart entry;
+ uint32_t encodedMessageLength = msg->encodedSize();
+ entry.headerLength = msg->encodedHeaderSize();
+ entry.totalLength = encodedMessageLength;
+ CLFS_LSN location, lastChunkLsn;
+ std::auto_ptr<char> encodeStage;
+ char *encodeBuff = 0;
+ bool oneRecord = encodedMessageLength <= MaxMessageContentLength;
+ if (oneRecord) {
+ encodeBuff = entry.content;
+ entry.segmentLength = encodedMessageLength;
+ }
+ else {
+ encodeStage.reset(new char[encodedMessageLength]);
+ encodeBuff = encodeStage.get();
+ entry.segmentLength = MaxMessageContentLength;
+ }
+ qpid::framing::Buffer buff(encodeBuff, encodedMessageLength);
+ msg->encode(buff);
+ if (!oneRecord)
+ memcpy_s(entry.content, sizeof(entry.content),
+ encodeBuff, entry.segmentLength);
+ uint32_t entryLength = static_cast<uint32_t>(sizeof(entry));
+ entryLength -= (MaxMessageContentLength - entry.segmentLength);
+ location = write(&entry, entryLength);
+ // Write any Message-Chunk records before setting the message's id.
+ uint32_t sent = entry.segmentLength;
+ uint32_t remaining = encodedMessageLength - entry.segmentLength;
+ while (remaining > 0) {
+ MessageChunk chunk;
+ chunk.segmentLength = std::max(MaxMessageContentLength, remaining);
+ memcpy_s(chunk.content, sizeof(chunk.content),
+ encodeStage.get() + sent, chunk.segmentLength);
+ entryLength = static_cast<uint32_t>(sizeof(chunk));
+ entryLength -= (MaxMessageContentLength - chunk.segmentLength);
+ lastChunkLsn = write(&chunk, entryLength, &location);
+ sent += chunk.segmentLength;
+ remaining -= chunk.segmentLength;
+ }
+ return lsnToId(location);
+}
+
+void
+MessageLog::deleteMessage(uint64_t messageId, uint64_t newFirstId)
+{
+ MessageDelete deleteEntry;
+ CLFS_LSN msgLsn = idToLsn(messageId);
+ write(&deleteEntry, sizeof(deleteEntry), &msgLsn);
+ if (newFirstId != 0)
+ moveTail(idToLsn(newFirstId));
+}
+
+// Load part or all of a message's content from previously stored
+// log record(s).
+void
+MessageLog::loadContent(uint64_t messageId,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length)
+{
+}
+
+void
+MessageLog::recordEnqueue (uint64_t messageId,
+ uint64_t queueId,
+ uint64_t transactionId)
+{
+ MessageEnqueue entry(queueId, transactionId);
+ CLFS_LSN msgLsn = idToLsn(messageId);
+ write(&entry, sizeof(entry), &msgLsn);
+}
+
+void
+MessageLog::recordDequeue (uint64_t messageId,
+ uint64_t queueId,
+ uint64_t transactionId)
+{
+ MessageDequeue entry(queueId, transactionId);
+ CLFS_LSN msgLsn = idToLsn(messageId);
+ write(&entry, sizeof(entry), &msgLsn);
+}
+
+void
+MessageLog::recover(qpid::broker::RecoveryManager& recoverer,
+ qpid::store::MessageMap& messageMap,
+ std::map<uint64_t, std::vector<RecoveredMsgOp> >& messageOps)
+{
+ // If context and content needs to be saved while reassembling messages
+ // split across log records, save the info and reassembly buffer.
+ struct MessageBlocks {
+ uint32_t totalLength;
+ uint32_t soFarLength;
+ boost::shared_ptr<char> content;
+
+ MessageBlocks() : totalLength(0), soFarLength(0), content((char*)0) {}
+ };
+ std::map<uint64_t, MessageBlocks> reassemblies;
+ std::map<uint64_t, MessageBlocks>::iterator at;
+
+ QPID_LOG(debug, "Recovering message log");
+
+ // Note that there may be message refs in the log which are deleted, so
+ // be sure to only add msgs at message-start record, and ignore those
+ // that don't have an existing message record.
+ // Get the base LSN - that's how to say "start reading at the beginning"
+ CLFS_INFORMATION info;
+ ULONG infoLength = sizeof (info);
+ BOOL ok = ::GetLogFileInformation(handle, &info, &infoLength);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+
+ // Pointers for the various record types that can be assigned in the
+ // reading loop below.
+ MessageStart *start;
+ MessageChunk *chunk;
+ MessageEnqueue *enqueue;
+ MessageDequeue *dequeue;
+
+ qpid::store::MessageMap::iterator messageMapSpot;
+ qpid::store::MessageQueueMap::iterator queueMapSpot;
+ PVOID recordPointer;
+ ULONG recordLength;
+ CLFS_RECORD_TYPE recordType = ClfsDataRecord;
+ CLFS_LSN messageLsn, current, undoNext;
+ PVOID readContext;
+ uint64_t msgId;
+ // Note 'current' in case it's needed below; ReadNextLogRecord returns it
+ // via a parameter.
+ current = info.BaseLsn;
+ ok = ::ReadLogRecord(marshal,
+ &info.BaseLsn,
+ ClfsContextForward,
+ &recordPointer,
+ &recordLength,
+ &recordType,
+ &undoNext,
+ &messageLsn,
+ &readContext,
+ 0);
+ while (ok) {
+ // All the record types this class writes have a MessageEntryType in the
+ // beginning. Based on that, do what's needed.
+ MessageEntryType *t =
+ reinterpret_cast<MessageEntryType *>(recordPointer);
+ switch(*t) {
+ case MessageStartEntry:
+ start = reinterpret_cast<MessageStart *>(recordPointer);
+ msgId = lsnToId(current);
+ QPID_LOG(debug, "Message Start, id " << msgId);
+ // If the message content is split across multiple log records, save
+ // this content off to the side until the remaining record(s) are
+ // located.
+ if (start->totalLength == start->segmentLength) { // Whole thing
+ // Start by recovering the header then see if the rest of
+ // the content is desired.
+ qpid::framing::Buffer buff(start->content, start->headerLength);
+ qpid::broker::RecoverableMessage::shared_ptr m =
+ recoverer.recoverMessage(buff);
+ m->setPersistenceId(msgId);
+ messageMap[msgId] = m;
+ uint32_t contentLength =
+ start->totalLength - start->headerLength;
+ if (m->loadContent(contentLength)) {
+ qpid::framing::Buffer content(&(start->content[start->headerLength]),
+ contentLength);
+ m->decodeContent(content);
+ }
+ }
+ else {
+ // Save it in a block big enough.
+ MessageBlocks b;
+ b.totalLength = start->totalLength;
+ b.soFarLength = start->segmentLength;
+ b.content.reset(new char[b.totalLength]);
+ memcpy_s(b.content.get(), b.totalLength,
+ start->content, start->segmentLength);
+ reassemblies[msgId] = b;
+ }
+ break;
+ case MessageChunkEntry:
+ chunk = reinterpret_cast<MessageChunk *>(recordPointer);
+ // Remember, all entries chained to MessageStart via previous.
+ msgId = lsnToId(messageLsn);
+ QPID_LOG(debug, "Message Chunk for id " << msgId);
+ at = reassemblies.find(msgId);
+ if (at == reassemblies.end()) {
+ QPID_LOG(debug, "Message frag for " << msgId <<
+ " but no start; discarded");
+ }
+ else {
+ MessageBlocks *b = &(at->second);
+ if (b->soFarLength + chunk->segmentLength > b->totalLength)
+ throw std::runtime_error("Invalid message chunk length");
+ memcpy_s(b->content.get() + b->soFarLength,
+ b->totalLength - b->soFarLength,
+ chunk->content,
+ chunk->segmentLength);
+ b->soFarLength += chunk->segmentLength;
+ if (b->totalLength == b->soFarLength) {
+ qpid::framing::Buffer buff(b->content.get(),
+ b->totalLength);
+ qpid::broker::RecoverableMessage::shared_ptr m =
+ recoverer.recoverMessage(buff);
+ m->setPersistenceId(msgId);
+ messageMap[msgId] = m;
+ reassemblies.erase(at);
+ }
+ }
+ break;
+ case MessageDeleteEntry:
+ msgId = lsnToId(messageLsn);
+ QPID_LOG(debug, "Message Delete, id " << msgId);
+ messageMap.erase(msgId);
+ messageOps.erase(msgId);
+ break;
+ case MessageEnqueueEntry:
+ enqueue = reinterpret_cast<MessageEnqueue *>(recordPointer);
+ msgId = lsnToId(messageLsn);
+ QPID_LOG(debug, "Message " << msgId << " Enqueue on queue " <<
+ enqueue->queueId << ", txn " << enqueue->transId);
+ if (messageMap.find(msgId) == messageMap.end()) {
+ QPID_LOG(debug,
+ "Message " << msgId << " doesn't exist; discarded");
+ }
+ else {
+ std::vector<RecoveredMsgOp>& ops = messageOps[msgId];
+ RecoveredMsgOp op(RECOVERED_ENQUEUE,
+ enqueue->queueId,
+ enqueue->transId);
+ ops.push_back(op);
+ }
+ break;
+ case MessageDequeueEntry:
+ dequeue = reinterpret_cast<MessageDequeue *>(recordPointer);
+ msgId = lsnToId(messageLsn);
+ QPID_LOG(debug, "Message " << msgId << " Dequeue from queue " <<
+ dequeue->queueId);
+ if (messageMap.find(msgId) == messageMap.end()) {
+ QPID_LOG(debug,
+ "Message " << msgId << " doesn't exist; discarded");
+ }
+ else {
+ std::vector<RecoveredMsgOp>& ops = messageOps[msgId];
+ RecoveredMsgOp op(RECOVERED_DEQUEUE,
+ dequeue->queueId,
+ dequeue->transId);
+ ops.push_back(op);
+ }
+ break;
+ default:
+ throw std::runtime_error("Bad message log entry type");
+ }
+
+ recordType = ClfsDataRecord;
+ ok = ::ReadNextLogRecord(readContext,
+ &recordPointer,
+ &recordLength,
+ &recordType,
+ 0, // No userLsn
+ &undoNext,
+ &messageLsn,
+ &current,
+ 0);
+ }
+ DWORD status = ::GetLastError();
+ ::TerminateReadLog(readContext);
+ if (status == ERROR_HANDLE_EOF) { // No more records
+ QPID_LOG(debug, "Message log recovered");
+ return;
+ }
+ throw QPID_WINDOWS_ERROR(status);
+}
+
+}}} // namespace qpid::store::ms_clfs
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h
new file mode 100644
index 0000000000..b3705287a6
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h
@@ -0,0 +1,107 @@
+#ifndef QPID_STORE_MSCLFS_MESSAGELOG_H
+#define QPID_STORE_MSCLFS_MESSAGELOG_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/intrusive_ptr.hpp>
+#include <qpid/broker/PersistableMessage.h>
+#include <qpid/broker/RecoveryManager.h>
+#include <qpid/sys/IntegerTypes.h>
+#include <qpid/store/StorageProvider.h>
+
+#include "Log.h"
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+/**
+ * @class MessageLog
+ *
+ * Represents a CLFS-housed message log.
+ */
+class MessageLog : public Log {
+
+protected:
+ // Message log needs to have a no-op first record written in the log
+ // to ensure that no real message gets an ID 0.
+ virtual void initialize();
+
+public:
+ // Inherited and reimplemented from Log. Figure the minimum marshalling
+ // buffer size needed for the records this class writes.
+ virtual uint32_t marshallingBufferSize();
+
+ // Add the specified message to the log; Return the persistence Id.
+ uint64_t add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
+
+ // Write a Delete entry for messageId. If newFirstId is not 0, it is now
+ // the earliest valid message in the log, so move the tail up to it.
+ void deleteMessage(uint64_t messageId, uint64_t newFirstId);
+
+ // Load part or all of a message's content from previously stored
+ // log record(s).
+ void loadContent(uint64_t messageId,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length);
+
+ // Enqueue and dequeue operations track messages' transit across
+ // queues; each operation may be associated with a transaction. If
+ // the transactionId is 0 the operation is not associated with a
+ // transaction.
+ void recordEnqueue (uint64_t messageId,
+ uint64_t queueId,
+ uint64_t transactionId);
+ void recordDequeue (uint64_t messageId,
+ uint64_t queueId,
+ uint64_t transactionId);
+
+ // Recover the messages and their queueing records from the log.
+ // @param recoverer Recovery manager used to recreate broker objects from
+ // encoded framing buffers recovered from the log.
+ // @param messageMap This method fills in the map of id -> ptr of
+ // recovered messages.
+ // @param messageOps This method fills in the map of msg id ->
+ // vector of operations involving the message that were
+ // recovered from the log. It is the caller's
+ // responsibility to sort the operations out and
+ // ascertain which operations should be acted on. The
+ // order of operations in the vector is as they were
+ // read in order from the log.
+ typedef enum { RECOVERED_ENQUEUE = 1, RECOVERED_DEQUEUE } RecoveredOpType;
+ struct RecoveredMsgOp {
+ RecoveredOpType op;
+ uint64_t queueId;
+ uint64_t txnId;
+
+ RecoveredMsgOp(RecoveredOpType o, const uint64_t& q, const uint64_t& t)
+ : op(o), queueId(q), txnId(t) {}
+ };
+ void recover(qpid::broker::RecoveryManager& recoverer,
+ qpid::store::MessageMap& messageMap,
+ std::map<uint64_t, std::vector<RecoveredMsgOp> >& messageOps);
+};
+
+}}} // namespace qpid::store::ms_clfs
+
+#endif /* QPID_STORE_MSCLFS_MESSAGELOG_H */
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp b/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp
new file mode 100644
index 0000000000..db5d2ebf4c
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp
@@ -0,0 +1,472 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/log/Statement.h>
+
+#include "Messages.h"
+#include "Lsn.h"
+#include "qpid/store/StoreException.h"
+#include <boost/foreach.hpp>
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+void
+Messages::openLog(const std::string& path, const Log::TuningParameters& params)
+{
+ log.open (path, params);
+}
+
+void
+Messages::add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
+{
+ uint64_t id = log.add(msg);
+ msg->setPersistenceId(id);
+ std::auto_ptr<MessageInfo> autom(new MessageInfo);
+ MessageInfo::shared_ptr m(autom);
+ std::pair<uint64_t, MessageInfo::shared_ptr> p(id, m);
+ {
+ qpid::sys::ScopedWlock<qpid::sys::RWlock> l(lock);
+ messages.insert(p);
+ // If there's only this one message there, move the tail to it.
+ // This prevents the log from continually growing when messages
+ // are added and removed one at a time.
+ if (messages.size() == 1) {
+ CLFS_LSN newTail = idToLsn(id);
+ log.moveTail(newTail);
+ }
+ }
+}
+
+void
+Messages::enqueue(uint64_t msgId, uint64_t queueId, Transaction::shared_ptr& t)
+{
+ MessageInfo::shared_ptr p;
+ {
+ qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock);
+ MessageMap::const_iterator i = messages.find(msgId);
+ if (i == messages.end())
+ THROW_STORE_EXCEPTION("Message does not exist");
+ p = i->second;
+ }
+ MessageInfo::Location loc(queueId, t, MessageInfo::TRANSACTION_ENQUEUE);
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock);
+ p->where.push_back(loc);
+ uint64_t transactionId = 0;
+ if (t.get() != 0) {
+ transactionId = t->getId();
+ t->enroll(msgId);
+ }
+ try {
+ log.recordEnqueue(msgId, queueId, transactionId);
+ }
+ catch (...) {
+ // Undo the record-keeping if the log wasn't written correctly.
+ if (transactionId != 0)
+ t->unenroll(msgId);
+ p->where.pop_back();
+ throw;
+ }
+ }
+}
+
+void
+Messages::dequeue(uint64_t msgId, uint64_t queueId, Transaction::shared_ptr& t)
+{
+ MessageInfo::shared_ptr p;
+ {
+ qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock);
+ MessageMap::const_iterator i = messages.find(msgId);
+ if (i == messages.end())
+ THROW_STORE_EXCEPTION("Message does not exist");
+ p = i->second;
+ }
+ {
+ // Locate the 'where' entry for the specified queue. Once this operation
+ // is recorded in the log, update the 'where' entry to reflect it.
+ // Note that an existing entry in 'where' that refers to a transaction
+ // is not eligible for this operation.
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock);
+ std::list<MessageInfo::Location>::iterator i;
+ for (i = p->where.begin(); i != p->where.end(); ++i) {
+ if (i->queueId == queueId && i->transaction.get() == 0)
+ break;
+ }
+ if (i == p->where.end())
+ THROW_STORE_EXCEPTION("Message not on queue");
+ uint64_t transactionId = 0;
+ if (t.get() != 0) {
+ transactionId = t->getId();
+ t->enroll(msgId);
+ }
+ try {
+ log.recordDequeue(msgId, queueId, transactionId);
+ }
+ catch (...) {
+ // Undo the record-keeping if the log wasn't written correctly.
+ if (transactionId != 0)
+ t->unenroll(msgId);
+ throw;
+ }
+ // Ok, logged successfully. If this is a transactional op, note
+ // the transaction. If non-transactional, remove the 'where' entry.
+ if (transactionId != 0) {
+ i->transaction = t;
+ i->disposition = MessageInfo::TRANSACTION_DEQUEUE;
+ }
+ else {
+ p->where.erase(i);
+ // If the message doesn't exist on any other queues, remove it.
+ if (p->where.empty())
+ remove(msgId);
+ }
+ }
+}
+
+// Commit a previous provisional enqueue or dequeue of a particular message
+// actions under a specified transaction. If this results in the message's
+// being removed from all queues, it is deleted.
+void
+Messages::commit(uint64_t msgId, Transaction::shared_ptr& t)
+{
+ MessageInfo::shared_ptr p;
+ {
+ qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock);
+ MessageMap::const_iterator i = messages.find(msgId);
+ if (i == messages.end())
+ THROW_STORE_EXCEPTION("Message does not exist");
+ p = i->second;
+ }
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock);
+ std::list<MessageInfo::Location>::iterator i;
+ for (i = p->where.begin(); i != p->where.end(); ++i) {
+ if (i->transaction != t)
+ continue;
+ // Transactional dequeues can now remove the item from the
+ // where list; enqueues just clear the transaction reference.
+ if (i->disposition == MessageInfo::TRANSACTION_DEQUEUE)
+ i = p->where.erase(i);
+ else
+ i->transaction.reset();
+ }
+ }
+ // If committing results in this message having no further enqueue
+ // references, delete it. If the delete fails, swallow the exception
+ // and let recovery take care of removing it later.
+ if (p->where.empty()) {
+ try {
+ remove(msgId);
+ }
+ catch(...) {}
+ }
+}
+
+// Abort a previous provisional enqueue or dequeue of a particular message
+// actions under a specified transaction. If this results in the message's
+// being removed from all queues, it is deleted.
+void
+Messages::abort(uint64_t msgId, Transaction::shared_ptr& t)
+{
+ MessageInfo::shared_ptr p;
+ {
+ qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock);
+ MessageMap::const_iterator i = messages.find(msgId);
+ if (i == messages.end())
+ THROW_STORE_EXCEPTION("Message does not exist");
+ p = i->second;
+ }
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock);
+ std::list<MessageInfo::Location>::iterator i = p->where.begin();
+ while (i != p->where.end()) {
+ if (i->transaction != t) {
+ ++i;
+ continue;
+ }
+ // Aborted transactional dequeues result in the message remaining
+ // enqueued like before the operation; enqueues clear the
+ // message from the where list - like the enqueue never happened.
+ if (i->disposition == MessageInfo::TRANSACTION_ENQUEUE)
+ i = p->where.erase(i);
+ else {
+ i->transaction.reset();
+ ++i;
+ }
+ }
+ }
+ // If aborting results in this message having no further enqueue
+ // references, delete it. If the delete fails, swallow the exception
+ // and let recovery take care of removing it later.
+ if (p->where.empty()) {
+ try {
+ remove(msgId);
+ }
+ catch(...) {}
+ }
+}
+
+// Load part or all of a message's content from previously stored
+// log record(s).
+void
+Messages::loadContent(uint64_t msgId,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length)
+{
+ log.loadContent(msgId, data, offset, length);
+}
+
+// Recover the current set of messages and where they're queued from
+// the log.
+void
+Messages::recover(qpid::broker::RecoveryManager& recoverer,
+ const std::set<uint64_t> &validQueues,
+ const std::map<uint64_t, Transaction::shared_ptr>& transMap,
+ qpid::store::MessageMap& messageMap,
+ qpid::store::MessageQueueMap& messageQueueMap)
+{
+ std::map<uint64_t, std::vector<MessageLog::RecoveredMsgOp> > messageOps;
+ log.recover(recoverer, messageMap, messageOps);
+ // Now read through the messageOps replaying the operations with the
+ // knowledge of which transactions committed, aborted, etc. A transaction
+ // should not be deleted until there are no messages referencing it so
+ // a message operation with a transaction id not found in transMap is
+ // a serious problem.
+ QPID_LOG(debug, "Beginning CLFS-recovered message operation replay");
+ // Keep track of any messages that are recovered from the log but don't
+ // have any place to be. This can happen, for example, if the broker
+ // crashes while logging a message deletion. After all the recovery is
+ // done, delete all the homeless messages.
+ std::vector<uint64_t> homeless;
+ std::map<uint64_t, std::vector<MessageLog::RecoveredMsgOp> >::const_iterator msg;
+ for (msg = messageOps.begin(); msg != messageOps.end(); ++msg) {
+ uint64_t msgId = msg->first;
+ const std::vector<MessageLog::RecoveredMsgOp>& ops = msg->second;
+ QPID_LOG(debug, "Message " << msgId << "; " << ops.size() << " op(s)");
+ MessageInfo::shared_ptr m(new MessageInfo);
+ std::vector<QueueEntry>& entries = messageQueueMap[msgId];
+ std::vector<MessageLog::RecoveredMsgOp>::const_iterator op;
+ for (op = ops.begin(); op != ops.end(); ++op) {
+ QueueEntry entry(op->queueId);
+ MessageInfo::Location loc(op->queueId);
+ std::string dir =
+ op->op == MessageLog::RECOVERED_ENQUEUE ? "enqueue"
+ : "dequeue";
+ if (validQueues.find(op->queueId) == validQueues.end()) {
+ QPID_LOG(info,
+ "Message " << msgId << dir << " on non-existant queue "
+ << op->queueId << "; dropped");
+ continue;
+ }
+ if (op->txnId != 0) {
+ // Be sure to enroll this message in the transaction even if
+ // it has committed or aborted. This ensures that the
+ // transaction isn't removed from the log while finalizing the
+ // recovery. If it were to be removed and the broker failed
+ // again before removing this message during normal operation,
+ // it couldn't be recovered again.
+ //
+ // Recall what is being reconstructed; 2 things:
+ // 1. This class's 'messages' list which keeps track
+ // of the queues each message is on and the transactions
+ // each message is enrolled in. For this, aborted
+ // transactions cause the result of the operation to be
+ // ignored, but the message does need to be enrolled in
+ // the transaction to properly maintain the transaction
+ // references until the message is deleted.
+ // 2. The StorageProvider's MessageQueueMap, which also
+ // has an entry for each queue each message is on and
+ // its TPL status and associated xid.
+ const Transaction::shared_ptr &t =
+ transMap.find(op->txnId)->second;
+ // Prepared transactions cause the operation to be
+ // provisionally acted on, and the message to be enrolled in
+ // the transaction for when it commits/aborts. This is
+ // noted in the QueueEntry for the StorageProvider's map.
+ if (t->getState() == Transaction::TRANS_PREPARED) {
+ QPID_LOG(debug, dir << " for queue " << op->queueId <<
+ ", prepared txn " << op->txnId);
+ TPCTransaction::shared_ptr tpct(boost::dynamic_pointer_cast<TPCTransaction>(t));
+ if (tpct.get() == 0)
+ THROW_STORE_EXCEPTION("Invalid transaction state");
+ t->enroll(msgId);
+ entry.xid = tpct->getXid();
+ loc.transaction = t;
+ if (op->op == MessageLog::RECOVERED_ENQUEUE) {
+ entry.tplStatus = QueueEntry::ADDING;
+ loc.disposition = MessageInfo::TRANSACTION_ENQUEUE;
+ }
+ else {
+ entry.tplStatus = QueueEntry::REMOVING;
+ loc.disposition = MessageInfo::TRANSACTION_DEQUEUE;
+ }
+ }
+ else if (t->getState() != Transaction::TRANS_COMMITTED) {
+ QPID_LOG(debug, dir << " for queue " << op->queueId <<
+ ", txn " << op->txnId << ", rolling back");
+ continue;
+ }
+ }
+ // Here for non-transactional and prepared transactional operations
+ // to set up the messageQueueMap entries. Note that at this point
+ // a committed transactional operation looks like a
+ // non-transactional one as far as the QueueEntry is
+ // concerned - just do it. If this is an entry enqueuing a
+ // message, just add it to the entries list. If it's a dequeue
+ // operation, locate the matching entry for the queue and delete
+ // it if the current op is non-transactional; if it's a prepared
+ // transaction then replace the existing entry with the current
+ // one that notes the message is enqueued but being removed under
+ // a prepared transaction.
+ QPID_LOG(debug, dir + " at queue " << entry.queueId);
+ if (op->op == MessageLog::RECOVERED_ENQUEUE) {
+ entries.push_back(entry);
+ m->where.push_back(loc);
+ }
+ else {
+ std::vector<QueueEntry>::iterator i = entries.begin();
+ while (i != entries.end()) {
+ if (i->queueId == entry.queueId) {
+ if (entry.tplStatus != QueueEntry::NONE)
+ *i = entry;
+ else
+ entries.erase(i);
+ break;
+ }
+ ++i;
+ }
+ std::list<MessageInfo::Location>::iterator w = m->where.begin();
+ while (w != m->where.end()) {
+ if (w->queueId == loc.queueId) {
+ if (loc.transaction.get() != 0) {
+ *w = loc;
+ ++w;
+ }
+ else {
+ w = m->where.erase(w);
+ }
+ }
+ }
+ }
+ }
+ // Now that all the queue entries have been set correctly, see if
+ // there are any entries; they may have all been removed during
+ // recovery. If there are none, add this message to the homeless
+ // list to be deleted from the log after the recovery is done.
+ if (m->where.size() == 0) {
+ homeless.push_back(msgId);
+ messageMap.erase(msgId);
+ messageQueueMap.erase(msgId);
+ }
+ else {
+ std::pair<uint64_t, MessageInfo::shared_ptr> p(msgId, m);
+ messages.insert(p);
+ }
+ }
+
+ QPID_LOG(debug, "Message log recovery done.");
+ // Done! Ok, go back and delete all the homeless messages.
+ BOOST_FOREACH(uint64_t msg, homeless) {
+ QPID_LOG(debug, "Deleting homeless message " << msg);
+ remove(msg);
+ }
+}
+
+// Expunge is called when a queue is deleted. All references to that
+// queue must be expunged from all messages. 'Dequeue' log records are
+// written for each queue entry removed, but any errors are swallowed.
+// On recovery there's a list of valid queues passed in. The deleted
+// queue will not be on that list so if any references to it are
+// recovered they'll get weeded out then.
+void
+Messages::expunge(uint64_t queueId)
+{
+ std::vector<uint64_t> toBeDeleted; // Messages to be deleted later.
+
+ {
+ // Lock everybody out since all messages are possibly in play.
+ // There also may be other threads already working on particular
+ // messages so individual message mutex still must be acquired.
+ qpid::sys::ScopedWlock<qpid::sys::RWlock> l(lock);
+ MessageMap::iterator m;
+ for (m = messages.begin(); m != messages.end(); ++m) {
+ MessageInfo::shared_ptr p = m->second;
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> ml(p->whereLock);
+ std::list<MessageInfo::Location>::iterator i = p->where.begin();
+ while (i != p->where.end()) {
+ if (i->queueId != queueId) {
+ ++i;
+ continue;
+ }
+ // If this entry is involved in a transaction, unenroll it.
+ // Then remove the entry.
+ if (i->transaction.get() != 0)
+ i->transaction->unenroll(m->first);
+ i = p->where.erase(i);
+ try {
+ log.recordDequeue(m->first, queueId, 0);
+ }
+ catch(...) {
+ }
+ }
+ if (p->where.size() == 0)
+ toBeDeleted.push_back(m->first);
+ }
+ }
+ }
+ // Swallow any exceptions during this; don't care. Recover it later
+ // if needed.
+ try {
+ BOOST_FOREACH(uint64_t msg, toBeDeleted)
+ remove(msg);
+ }
+ catch(...) {
+ }
+}
+
+// Remove a specified message from those controlled by this object.
+void
+Messages::remove(uint64_t messageId)
+{
+ uint64_t newFirstId = 0;
+ {
+ qpid::sys::ScopedWlock<qpid::sys::RWlock> l(lock);
+ messages.erase(messageId);
+ // May have deleted the first entry; if so the log can release that.
+ // If this message being deleted results in an empty list of
+ // messages, move the tail up to this message's LSN. This may
+ // result in one or more messages being stranded in the log
+ // until there's more activity. If a restart happens while these
+ // unneeded log records are there, the presence of the MessageDelete
+ // entry will cause the message(s) to be ignored anyway.
+ if (messages.empty())
+ newFirstId = messageId;
+ else if (messages.begin()->first > messageId)
+ newFirstId = messages.begin()->first;
+ }
+ log.deleteMessage(messageId, newFirstId);
+}
+
+}}} // namespace qpid::store::ms_clfs
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Messages.h b/qpid/cpp/src/qpid/store/ms-clfs/Messages.h
new file mode 100644
index 0000000000..93cc8bfe62
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/Messages.h
@@ -0,0 +1,144 @@
+#ifndef QPID_STORE_MSCLFS_MESSAGES_H
+#define QPID_STORE_MSCLFS_MESSAGES_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <windows.h>
+#include <map>
+#include <set>
+#include <vector>
+#include <boost/intrusive_ptr.hpp>
+#include <qpid/broker/PersistableMessage.h>
+#include <qpid/sys/Mutex.h>
+
+#include "MessageLog.h"
+#include "Transaction.h"
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+class Messages {
+
+ struct MessageInfo {
+ // How many queues this message is on, whether actually (non-transacted)
+ // or provisionally (included in a non-yet-committed transaction).
+ volatile LONG enqueuedCount;
+
+ // Keep a list of transactional operations this message is
+ // referenced in. When the transaction changes/finalizes these all
+ // need to be acted on.
+ typedef enum { TRANSACTION_NONE = 0,
+ TRANSACTION_ENQUEUE,
+ TRANSACTION_DEQUEUE } TransType;
+#if 0
+ std::map<Transaction::shared_ptr, std::vector<TransType> > transOps;
+ qpid::sys::Mutex transOpsLock;
+#endif
+ // Think what I need is a list of "where is this message" - queue id,
+ // transaction ref, what kind of trans op (enq/deq). Then "remove all
+ // queue refs" can search through all messages looking for queue ids
+ // and undo them. Write "remove from queue" record to log. Also need to
+ // add "remove from queue" to recovery.
+ struct Location {
+ uint64_t queueId;
+ Transaction::shared_ptr transaction;
+ TransType disposition;
+
+ Location(uint64_t q)
+ : queueId(q), transaction(), disposition(TRANSACTION_NONE) {}
+ Location(uint64_t q, Transaction::shared_ptr& t, TransType d)
+ : queueId(q), transaction(t), disposition(d) {}
+ };
+ qpid::sys::Mutex whereLock;
+ std::list<Location> where;
+ // The transactions vector just keeps a shared_ptr to each
+ // Transaction this message was involved in, regardless of the
+ // disposition or transaction state. Keeping a valid shared_ptr
+ // prevents the Transaction from being deleted. As long as there
+ // are any messages that referred to a transaction, that
+ // transaction's state needs to be known so the message disposition
+ // can be correctly recovered if needed.
+ std::vector<Transaction::shared_ptr> transactions;
+
+ typedef boost::shared_ptr<MessageInfo> shared_ptr;
+
+ MessageInfo() : enqueuedCount(0) {}
+ };
+
+ qpid::sys::RWlock lock;
+ typedef std::map<uint64_t, MessageInfo::shared_ptr> MessageMap;
+ MessageMap messages;
+ MessageLog log;
+
+ // Remove a specified message from those controlled by this object.
+ void remove(uint64_t messageId);
+
+public:
+ void openLog(const std::string& path, const Log::TuningParameters& params);
+
+ // Add the specified message to the log and list of known messages.
+ // Upon successful return the message's persistenceId is set.
+ void add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
+
+ // Add the specified queue to the message's list of places it is
+ // enqueued.
+ void enqueue(uint64_t msgId, uint64_t queueId, Transaction::shared_ptr& t);
+
+ // Remove the specified queue from the message's list of places it is
+ // enqueued. If there are no other queues holding the message, it is
+ // deleted.
+ void dequeue(uint64_t msgId, uint64_t queueId, Transaction::shared_ptr& t);
+
+ // Commit a previous provisional enqueue or dequeue of a particular message
+ // actions under a specified transaction. If this results in the message's
+ // being removed from all queues, it is deleted.
+ void commit(uint64_t msgId, Transaction::shared_ptr& transaction);
+
+ // Abort a previous provisional enqueue or dequeue of a particular message
+ // actions under a specified transaction. If this results in the message's
+ // being removed from all queues, it is deleted.
+ void abort(uint64_t msgId, Transaction::shared_ptr& transaction);
+
+ // Load part or all of a message's content from previously stored
+ // log record(s).
+ void loadContent(uint64_t msgId,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length);
+
+ // Expunge is called when a queue is deleted. All references to that
+ // queue must be expunged from all messages.
+ void expunge(uint64_t queueId);
+
+ // Recover the current set of messages and where they're queued from
+ // the log.
+ void recover(qpid::broker::RecoveryManager& recoverer,
+ const std::set<uint64_t> &validQueues,
+ const std::map<uint64_t, Transaction::shared_ptr>& transMap,
+ qpid::store::MessageMap& messageMap,
+ qpid::store::MessageQueueMap& messageQueueMap);
+};
+
+}}} // namespace qpid::store::ms_clfs
+
+#endif /* QPID_STORE_MSCLFS_MESSAGES_H */
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Transaction.cpp b/qpid/cpp/src/qpid/store/ms-clfs/Transaction.cpp
new file mode 100644
index 0000000000..f94fef6f84
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/Transaction.cpp
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Transaction.h"
+#include "Messages.h"
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+Transaction::~Transaction()
+{
+ // Transactions that are recovered then found to be deleted get destroyed
+ // but need not be logged.
+ if (state != TRANS_DELETED)
+ log->deleteTransaction(id);
+}
+
+void
+Transaction::enroll(uint64_t msgId)
+{
+ qpid::sys::ScopedWlock<qpid::sys::RWlock> l(enrollLock);
+ enrolledMessages.push_back(msgId);
+}
+
+void
+Transaction::unenroll(uint64_t msgId)
+{
+ qpid::sys::ScopedWlock<qpid::sys::RWlock> l(enrollLock);
+ for (std::vector<uint64_t>::iterator i = enrolledMessages.begin();
+ i < enrolledMessages.end();
+ ++i) {
+ if (*i == msgId) {
+ enrolledMessages.erase(i);
+ break;
+ }
+ }
+}
+
+void
+Transaction::abort(Messages& messages)
+{
+ log->recordAbort(id);
+ for (size_t i = 0; i < enrolledMessages.size(); ++i)
+ messages.abort(enrolledMessages[i], shared_from_this());
+ state = TRANS_ABORTED;
+}
+
+void
+Transaction::commit(Messages& messages)
+{
+ log->recordCommit(id);
+ for (size_t i = 0; i < enrolledMessages.size(); ++i)
+ messages.commit(enrolledMessages[i], shared_from_this());
+ state = TRANS_COMMITTED;
+}
+
+void
+TPCTransaction::prepare(void)
+{
+ log->recordPrepare(id);
+ state = TRANS_PREPARED;
+}
+
+}}} // namespace qpid::store::ms_clfs
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Transaction.h b/qpid/cpp/src/qpid/store/ms-clfs/Transaction.h
new file mode 100644
index 0000000000..499b01d503
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/Transaction.h
@@ -0,0 +1,147 @@
+#ifndef QPID_STORE_MSCLFS_TRANSACTION_H
+#define QPID_STORE_MSCLFS_TRANSACTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/broker/TransactionalStore.h>
+#include <qpid/sys/Mutex.h>
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/shared_ptr.hpp>
+#include <string>
+#include <vector>
+
+#include "TransactionLog.h"
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+class Messages;
+
+/**
+ * @class Transaction
+ *
+ * Class representing an AMQP transaction. This is used around a set of
+ * enqueue and dequeue operations that occur when the broker is acting
+ * on a transaction commit/abort from the client.
+ * This class is what the store uses internally to implement things a
+ * transaction needs; the broker knows about TransactionContext, which
+ * holds a pointer to Transaction.
+ *
+ * NOTE: All references to Transactions (and TPCTransactions, below) are
+ * through Boost shared_ptr instances. All messages enrolled in a transaction
+ * hold a shared_ptr. Thus, a Transaction object will not be deleted until all
+ * messages holding a reference to it are deleted. This fact is also used
+ * during recovery to automatically clean up and delete any Transaction without
+ * messages left referring to it.
+ */
+class Transaction : public boost::enable_shared_from_this<Transaction> {
+private:
+ // TransactionLog has to create all Transaction instances.
+ Transaction() {}
+
+public:
+
+ typedef boost::shared_ptr<Transaction> shared_ptr;
+ typedef enum { TRANS_OPEN = 1,
+ TRANS_PREPARED,
+ TRANS_ABORTED,
+ TRANS_COMMITTED,
+ TRANS_DELETED } State;
+
+ virtual ~Transaction();
+
+ uint64_t getId() { return id; }
+ State getState() { return state; }
+
+ void enroll(uint64_t msgId);
+ void unenroll(uint64_t msgId); // For failed ops, not normal end-of-trans
+
+ void abort(Messages& messages);
+ void commit(Messages& messages);
+
+protected:
+ friend class TransactionLog;
+ Transaction(uint64_t _id, const TransactionLog::shared_ptr& _log)
+ : id(_id), state(TRANS_OPEN), log(_log) {}
+
+ uint64_t id;
+ State state;
+ TransactionLog::shared_ptr log;
+ std::vector<uint64_t> enrolledMessages;
+ qpid::sys::RWlock enrollLock;
+};
+
+class TransactionContext : public qpid::broker::TransactionContext {
+ Transaction::shared_ptr transaction;
+
+public:
+ TransactionContext(const Transaction::shared_ptr& _transaction)
+ : transaction(_transaction) {}
+
+ virtual Transaction::shared_ptr& getTransaction() { return transaction; }
+};
+
+/**
+ * @class TPCTransaction
+ *
+ * Class representing a Two-Phase-Commit (TPC) AMQP transaction. This is
+ * used around a set of enqueue and dequeue operations that occur when the
+ * broker is acting on a transaction prepare/commit/abort from the client.
+ * This class is what the store uses internally to implement things a
+ * transaction needs; the broker knows about TPCTransactionContext, which
+ * holds a pointer to TPCTransaction.
+ */
+class TPCTransaction : public Transaction {
+
+ friend class TransactionLog;
+ TPCTransaction(uint64_t _id,
+ const TransactionLog::shared_ptr& _log,
+ const std::string& _xid)
+ : Transaction(_id, _log), xid(_xid) {}
+
+ std::string xid;
+
+public:
+ typedef boost::shared_ptr<TPCTransaction> shared_ptr;
+
+ virtual ~TPCTransaction() {}
+
+ void prepare();
+ bool isPrepared() const { return state == TRANS_PREPARED; }
+
+ const std::string& getXid(void) const { return xid; }
+};
+
+class TPCTransactionContext : public qpid::broker::TPCTransactionContext {
+ TPCTransaction::shared_ptr transaction;
+
+public:
+ TPCTransactionContext(const TPCTransaction::shared_ptr& _transaction)
+ : transaction(_transaction) {}
+
+ virtual TPCTransaction::shared_ptr& getTransaction() { return transaction; }
+};
+
+}}} // namespace qpid::store::ms_clfs
+
+#endif /* QPID_STORE_MSCLFS_TRANSACTION_H */
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp
new file mode 100644
index 0000000000..0ef046d7c8
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp
@@ -0,0 +1,428 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <windows.h>
+#include <clfsw32.h>
+#include <exception>
+#include <malloc.h>
+#include <memory.h>
+#include <qpid/framing/Buffer.h>
+#include <qpid/log/Statement.h>
+#include <qpid/sys/IntegerTypes.h>
+#include <qpid/sys/windows/check.h>
+
+#include "TransactionLog.h"
+#include "Transaction.h"
+#include "Lsn.h"
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+namespace {
+
+// Structures that hold log records. Each has a type field at the start.
+enum TransactionEntryType {
+ TransactionStartDtxEntry = 1,
+ TransactionStartTxEntry = 2,
+ TransactionPrepareEntry = 3,
+ TransactionCommitEntry = 4,
+ TransactionAbortEntry = 5,
+ TransactionDeleteEntry = 6
+};
+// The only thing that really takes up space in transaction records is the
+// xid. Max xid length is in the neighborhood of 600 bytes. Leave some room.
+static const uint32_t MaxTransactionContentLength = 1024;
+
+// Dtx-Start
+struct TransactionStartDtx {
+ TransactionEntryType type;
+ uint32_t length;
+ char content[MaxTransactionContentLength];
+
+ TransactionStartDtx()
+ : type(TransactionStartDtxEntry), length(0) {}
+};
+// Tx-Start
+struct TransactionStartTx {
+ TransactionEntryType type;
+
+ TransactionStartTx()
+ : type(TransactionStartTxEntry) {}
+};
+// Prepare
+struct TransactionPrepare {
+ TransactionEntryType type;
+
+ TransactionPrepare()
+ : type(TransactionPrepareEntry) {}
+};
+// Commit
+struct TransactionCommit {
+ TransactionEntryType type;
+
+ TransactionCommit()
+ : type(TransactionCommitEntry) {}
+};
+// Abort
+struct TransactionAbort {
+ TransactionEntryType type;
+
+ TransactionAbort()
+ : type(TransactionAbortEntry) {}
+};
+// Delete
+struct TransactionDelete {
+ TransactionEntryType type;
+
+ TransactionDelete()
+ : type(TransactionDeleteEntry) {}
+};
+
+} // namespace
+
+void
+TransactionLog::initialize()
+{
+ // Write something to occupy the first record, preventing a real
+ // transaction from being lsn/id 0. Delete of a non-existant id is easily
+ // tossed during recovery if no other transactions have caused the tail
+ // to be moved up past this dummy record by then.
+ deleteTransaction(0);
+}
+
+uint32_t
+TransactionLog::marshallingBufferSize()
+{
+ size_t biggestNeed = sizeof(TransactionStartDtx);
+ uint32_t defSize = static_cast<uint32_t>(biggestNeed);
+ uint32_t minSize = Log::marshallingBufferSize();
+ if (defSize <= minSize)
+ return minSize;
+ // Round up to multiple of minSize
+ return (defSize + minSize) / minSize * minSize;
+}
+
+// Get a new Transaction
+boost::shared_ptr<Transaction>
+TransactionLog::begin()
+{
+ TransactionStartTx entry;
+ CLFS_LSN location;
+ uint64_t id;
+ uint32_t entryLength = static_cast<uint32_t>(sizeof(entry));
+ location = write(&entry, entryLength);
+ try {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+ id = lsnToId(location);
+ std::auto_ptr<Transaction> t(new Transaction(id, shared_from_this()));
+ boost::shared_ptr<Transaction> t2(t);
+ boost::weak_ptr<Transaction> weak_t2(t2);
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+ validIds[id] = weak_t2;
+ }
+ return t2;
+ }
+ catch(...) {
+ deleteTransaction(id);
+ throw;
+ }
+}
+
+// Get a new TPCTransaction
+boost::shared_ptr<TPCTransaction>
+TransactionLog::begin(const std::string& xid)
+{
+ TransactionStartDtx entry;
+ CLFS_LSN location;
+ uint64_t id;
+ uint32_t entryLength = static_cast<uint32_t>(sizeof(entry));
+ entry.length = static_cast<uint32_t>(xid.length());
+ memcpy_s(entry.content, sizeof(entry.content),
+ xid.c_str(), xid.length());
+ entryLength -= (sizeof(entry.content) - entry.length);
+ location = write(&entry, entryLength);
+ try {
+ id = lsnToId(location);
+ std::auto_ptr<TPCTransaction> t(new TPCTransaction(id,
+ shared_from_this(),
+ xid));
+ boost::shared_ptr<TPCTransaction> t2(t);
+ boost::weak_ptr<Transaction> weak_t2(t2);
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+ validIds[id] = weak_t2;
+ }
+ return t2;
+ }
+ catch(...) {
+ deleteTransaction(id);
+ throw;
+ }
+}
+
+void
+TransactionLog::recordPrepare(uint64_t transId)
+{
+ TransactionPrepare entry;
+ CLFS_LSN transLsn = idToLsn(transId);
+ write(&entry, sizeof(entry), &transLsn);
+}
+
+void
+TransactionLog::recordCommit(uint64_t transId)
+{
+ TransactionCommit entry;
+ CLFS_LSN transLsn = idToLsn(transId);
+ write(&entry, sizeof(entry), &transLsn);
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+ validIds[transId].reset();
+ }
+}
+
+void
+TransactionLog::recordAbort(uint64_t transId)
+{
+ TransactionAbort entry;
+ CLFS_LSN transLsn = idToLsn(transId);
+ write(&entry, sizeof(entry), &transLsn);
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+ validIds[transId].reset();
+ }
+}
+
+void
+TransactionLog::deleteTransaction(uint64_t transId)
+{
+ uint64_t newFirstId = 0;
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+ validIds.erase(transId);
+ // May have deleted the first entry; if so the log can release that.
+ // If this deletion results in an empty list of transactions,
+ // move the tail up to this transaction's LSN. This may result in
+ // one or more transactions being stranded in the log until there's
+ // more activity. If a restart happens while these unneeded log
+ // records are there, the presence of the TransactionDelete
+ // entry will cause them to be ignored anyway.
+ if (validIds.empty())
+ newFirstId = transId;
+ else if (validIds.begin()->first > transId)
+ newFirstId = validIds.begin()->first;
+ }
+ TransactionDelete deleteEntry;
+ CLFS_LSN transLsn = idToLsn(transId);
+ write(&deleteEntry, sizeof(deleteEntry), &transLsn);
+ if (newFirstId != 0)
+ moveTail(idToLsn(newFirstId));
+}
+
+void
+TransactionLog::collectPreparedXids(std::map<std::string, TPCTransaction::shared_ptr>& preparedMap)
+{
+ // Go through all the known transactions; if the transaction is still
+ // valid (open or prepared) it will have weak_ptr to the Transaction.
+ // If it can be downcast and has a state of TRANS_PREPARED, add to the map.
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+ std::map<uint64_t, boost::weak_ptr<Transaction> >::const_iterator i;
+ for (i = validIds.begin(); i != validIds.end(); ++i) {
+ Transaction::shared_ptr t = i->second.lock();
+ if (t.get() == 0)
+ continue;
+ TPCTransaction::shared_ptr tpct(boost::dynamic_pointer_cast<TPCTransaction>(t));
+ if (tpct.get() == 0)
+ continue;
+ if (tpct->state == Transaction::TRANS_PREPARED)
+ preparedMap[tpct->getXid()] = tpct;
+ }
+}
+
+void
+TransactionLog::recover(std::map<uint64_t, Transaction::shared_ptr>& transMap)
+{
+ QPID_LOG(debug, "Recovering transaction log");
+
+ // Note that there may be transaction refs in the log which are deleted,
+ // so be sure to only add transactions at Start records, and ignore those
+ // that don't have an existing message record.
+ // Get the base LSN - that's how to say "start reading at the beginning"
+ CLFS_INFORMATION info;
+ ULONG infoLength = sizeof (info);
+ BOOL ok = ::GetLogFileInformation(handle, &info, &infoLength);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+
+ // Pointers for the various record types that can be assigned in the
+ // reading loop below.
+ TransactionStartDtx *startDtxEntry;
+ TransactionStartTx *startTxEntry;
+
+ PVOID recordPointer;
+ ULONG recordLength;
+ CLFS_RECORD_TYPE recordType = ClfsDataRecord;
+ CLFS_LSN transLsn, current, undoNext;
+ PVOID readContext;
+ uint64_t transId;
+ // Note 'current' in case it's needed below; ReadNextLogRecord returns it
+ // via a parameter.
+ current = info.BaseLsn;
+ ok = ::ReadLogRecord(marshal,
+ &info.BaseLsn,
+ ClfsContextForward,
+ &recordPointer,
+ &recordLength,
+ &recordType,
+ &undoNext,
+ &transLsn,
+ &readContext,
+ 0);
+
+ std::auto_ptr<Transaction> tPtr;
+ std::auto_ptr<TPCTransaction> tpcPtr;
+ while (ok) {
+ std::string xid;
+
+ // All the record types this class writes have a TransactionEntryType
+ // in the beginning. Based on that, do what's needed.
+ TransactionEntryType *t =
+ reinterpret_cast<TransactionEntryType *>(recordPointer);
+ switch(*t) {
+ case TransactionStartDtxEntry:
+ startDtxEntry =
+ reinterpret_cast<TransactionStartDtx *>(recordPointer);
+ transId = lsnToId(current);
+ QPID_LOG(debug, "Dtx start, id " << transId);
+ xid.assign(startDtxEntry->content, startDtxEntry->length);
+ tpcPtr.reset(new TPCTransaction(transId, shared_from_this(), xid));
+ transMap[transId] = boost::shared_ptr<TPCTransaction>(tpcPtr);
+ break;
+ case TransactionStartTxEntry:
+ startTxEntry =
+ reinterpret_cast<TransactionStartTx *>(recordPointer);
+ transId = lsnToId(current);
+ QPID_LOG(debug, "Tx start, id " << transId);
+ tPtr.reset(new Transaction(transId, shared_from_this()));
+ transMap[transId] = boost::shared_ptr<Transaction>(tPtr);
+ break;
+ case TransactionPrepareEntry:
+ transId = lsnToId(transLsn);
+ QPID_LOG(debug, "Dtx prepare, id " << transId);
+ if (transMap.find(transId) == transMap.end()) {
+ QPID_LOG(debug,
+ "Dtx " << transId << " doesn't exist; discarded");
+ }
+ else {
+ transMap[transId]->state = Transaction::TRANS_PREPARED;
+ }
+ break;
+ case TransactionCommitEntry:
+ transId = lsnToId(transLsn);
+ QPID_LOG(debug, "Txn commit, id " << transId);
+ if (transMap.find(transId) == transMap.end()) {
+ QPID_LOG(debug,
+ "Txn " << transId << " doesn't exist; discarded");
+ }
+ else {
+ transMap[transId]->state = Transaction::TRANS_COMMITTED;
+ }
+ break;
+ case TransactionAbortEntry:
+ transId = lsnToId(transLsn);
+ QPID_LOG(debug, "Txn abort, id " << transId);
+ if (transMap.find(transId) == transMap.end()) {
+ QPID_LOG(debug,
+ "Txn " << transId << " doesn't exist; discarded");
+ }
+ else {
+ transMap[transId]->state = Transaction::TRANS_ABORTED;
+ }
+ break;
+ case TransactionDeleteEntry:
+ transId = lsnToId(transLsn);
+ QPID_LOG(debug, "Txn delete, id " << transId);
+ if (transMap.find(transId) == transMap.end()) {
+ QPID_LOG(debug,
+ "Txn " << transId << " doesn't exist; discarded");
+ }
+ else {
+ transMap[transId]->state = Transaction::TRANS_DELETED;
+ transMap.erase(transId);
+ }
+ break;
+ default:
+ throw std::runtime_error("Bad transaction log entry type");
+ }
+
+ recordType = ClfsDataRecord;
+ ok = ::ReadNextLogRecord(readContext,
+ &recordPointer,
+ &recordLength,
+ &recordType,
+ 0, // No userLsn
+ &undoNext,
+ &transLsn,
+ &current,
+ 0);
+ }
+ DWORD status = ::GetLastError();
+ ::TerminateReadLog(readContext);
+ if (status != ERROR_HANDLE_EOF) // No more records
+ throw QPID_WINDOWS_ERROR(status);
+
+ QPID_LOG(debug, "Transaction log recovered");
+
+ // At this point we have a list of all the not-deleted transactions that
+ // were in existence when the broker last ran. All transactions of both
+ // Dtx and Tx types that haven't prepared or committed will be aborted.
+ // This will give the proper background against which to decide each
+ // message's disposition when recovering messages that were involved in
+ // transactions.
+ // In addition to recovering and aborting transactions, rebuild the
+ // validIds map now that we know which ids are really valid.
+ std::map<uint64_t, Transaction::shared_ptr>::const_iterator i;
+ for (i = transMap.begin(); i != transMap.end(); ++i) {
+ switch(i->second->state) {
+ case Transaction::TRANS_OPEN:
+ QPID_LOG(debug, "Txn " << i->first << " was open; aborted");
+ i->second->state = Transaction::TRANS_ABORTED;
+ break;
+ case Transaction::TRANS_ABORTED:
+ QPID_LOG(debug, "Txn " << i->first << " was aborted");
+ break;
+ case Transaction::TRANS_COMMITTED:
+ QPID_LOG(debug, "Txn " << i->first << " was committed");
+ break;
+ case Transaction::TRANS_PREPARED:
+ QPID_LOG(debug, "Txn " << i->first << " was prepared");
+ break;
+ case Transaction::TRANS_DELETED:
+ QPID_LOG(error,
+ "Txn " << i->first << " was deleted; shouldn't be here");
+ break;
+ }
+ boost::weak_ptr<Transaction> weak_txn(i->second);
+ validIds[i->first] = weak_txn;
+ }
+}
+
+}}} // namespace qpid::store::ms_clfs
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h
new file mode 100644
index 0000000000..7ca27c229e
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h
@@ -0,0 +1,104 @@
+#ifndef QPID_STORE_MSCLFS_TRANSACTIONLOG_H
+#define QPID_STORE_MSCLFS_TRANSACTIONLOG_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <set>
+
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/shared_ptr.hpp>
+
+#include <qpid/broker/RecoveryManager.h>
+#include <qpid/sys/IntegerTypes.h>
+#include <qpid/sys/Mutex.h>
+
+#include "Log.h"
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+class Transaction;
+class TPCTransaction;
+
+/**
+ * @class TransactionLog
+ *
+ * Represents a CLFS-housed transaction log.
+ */
+class TransactionLog : public Log,
+ public boost::enable_shared_from_this<TransactionLog> {
+
+ // To know when it's ok to move the log tail the lowest valid Id must
+ // always be known. Keep track of valid Ids here. These are transactions
+ // which have not yet been Deleted in the log. They may be new, in progress,
+ // prepared, committed, or aborted - but not deleted.
+ // Entries corresponding to not-yet-finalized transactions (i.e., open or
+ // prepared) also have a weak_ptr so the Transaction can be accessed.
+ // This is primarily to check its state and get a list of prepared Xids.
+ std::map<uint64_t, boost::weak_ptr<Transaction> > validIds;
+ qpid::sys::Mutex idsLock;
+
+protected:
+ // Transaction log needs to have a no-op first record written in the log
+ // to ensure that no real transaction gets an ID 0; messages think trans
+ // id 0 means "no transaction."
+ virtual void initialize();
+
+public:
+ // Inherited and reimplemented from Log. Figure the minimum marshalling
+ // buffer size needed for the records this class writes.
+ virtual uint32_t marshallingBufferSize();
+
+ typedef boost::shared_ptr<TransactionLog> shared_ptr;
+
+ // Get a new Transaction
+ boost::shared_ptr<Transaction> begin();
+
+ // Get a new TPCTransaction
+ boost::shared_ptr<TPCTransaction> begin(const std::string& xid);
+
+ void recordPrepare(uint64_t transId);
+ void recordCommit(uint64_t transId);
+ void recordAbort(uint64_t transId);
+ void deleteTransaction(uint64_t transId);
+
+ // Fill @arg preparedMap with Xid->TPCTransaction::shared_ptr for all
+ // currently prepared transactions.
+ void collectPreparedXids(std::map<std::string, boost::shared_ptr<TPCTransaction> >& preparedMap);
+
+ // Recover the transactions and their state from the log.
+ // Every non-deleted transaction recovered from the log will be
+ // represented in @arg transMap. The recovering messages can use this
+ // information to tell if a transaction referred to in an enqueue/dequeue
+ // operation should be recovered or dropped by examining transaction state.
+ //
+ // @param recoverer Recovery manager used to recreate broker objects from
+ // entries recovered from the log.
+ // @param transMap This method fills in the map of id -> shared_ptr of
+ // recovered transactions.
+ void recover(std::map<uint64_t, boost::shared_ptr<Transaction> >& transMap);
+};
+
+}}} // namespace qpid::store::ms_clfs
+
+#endif /* QPID_STORE_MSCLFS_TRANSACTIONLOG_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp b/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp
new file mode 100644
index 0000000000..095d1bf331
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AmqpTransaction.h"
+#include "DatabaseConnection.h"
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+AmqpTransaction::AmqpTransaction(const boost::shared_ptr<DatabaseConnection>& _db)
+ : db(_db), sqlTrans(_db)
+{
+}
+
+AmqpTransaction::~AmqpTransaction()
+{
+}
+
+void
+AmqpTransaction::sqlBegin()
+{
+ sqlTrans.begin();
+}
+
+void
+AmqpTransaction::sqlCommit()
+{
+ sqlTrans.commit();
+}
+
+void
+AmqpTransaction::sqlAbort()
+{
+ sqlTrans.abort();
+}
+
+
+AmqpTPCTransaction::AmqpTPCTransaction(const boost::shared_ptr<DatabaseConnection>& db,
+ const std::string& _xid)
+ : AmqpTransaction(db), prepared(false), xid(_xid)
+{
+}
+
+AmqpTPCTransaction::~AmqpTPCTransaction()
+{
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h b/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h
new file mode 100644
index 0000000000..625fab5595
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h
@@ -0,0 +1,85 @@
+#ifndef QPID_STORE_MSSQL_AMQPTRANSACTION_H
+#define QPID_STORE_MSSQL_AMQPTRANSACTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/broker/TransactionalStore.h>
+#include <boost/shared_ptr.hpp>
+#include <string>
+
+#include "SqlTransaction.h"
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+class DatabaseConnection;
+
+/**
+ * @class AmqpTransaction
+ *
+ * Class representing an AMQP transaction. This is used around a set of
+ * enqueue and dequeue operations that occur when the broker is acting
+ * on a transaction commit/abort from the client.
+ */
+class AmqpTransaction : public qpid::broker::TransactionContext {
+
+ boost::shared_ptr<DatabaseConnection> db;
+ SqlTransaction sqlTrans;
+
+public:
+ AmqpTransaction(const boost::shared_ptr<DatabaseConnection>& _db);
+ virtual ~AmqpTransaction();
+
+ DatabaseConnection *dbConn() { return db.get(); }
+
+ void sqlBegin();
+ void sqlCommit();
+ void sqlAbort();
+};
+
+/**
+ * @class AmqpTPCTransaction
+ *
+ * Class representing a Two-Phase-Commit (TPC) AMQP transaction. This is
+ * used around a set of enqueue and dequeue operations that occur when the
+ * broker is acting on a transaction prepare/commit/abort from the client.
+ */
+class AmqpTPCTransaction : public AmqpTransaction,
+ public qpid::broker::TPCTransactionContext {
+ bool prepared;
+ std::string xid;
+
+public:
+ AmqpTPCTransaction(const boost::shared_ptr<DatabaseConnection>& db,
+ const std::string& _xid);
+ virtual ~AmqpTPCTransaction();
+
+ void setPrepared(void) { prepared = true; }
+ bool isPrepared(void) const { return prepared; }
+
+ const std::string& getXid(void) const { return xid; }
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_AMQPTRANSACTION_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp b/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp
new file mode 100644
index 0000000000..1dc4370312
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp
@@ -0,0 +1,165 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/Exception.h>
+#include <qpid/log/Statement.h>
+
+#include "BindingRecordset.h"
+#include "BlobAdapter.h"
+#include "BlobEncoder.h"
+#include "VariantHelper.h"
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+void
+BindingRecordset::removeFilter(const std::string& filter)
+{
+ rs->PutFilter (VariantHelper<std::string>(filter));
+ long recs = rs->GetRecordCount();
+ if (recs == 0)
+ return; // Nothing to do
+ while (recs > 0) {
+ // Deleting adAffectAll doesn't work as documented; go one by one.
+ rs->Delete(adAffectCurrent);
+ if (--recs > 0)
+ rs->MoveNext();
+ }
+ rs->Update();
+}
+
+void
+BindingRecordset::add(uint64_t exchangeId,
+ uint64_t queueId,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable& args)
+{
+ VariantHelper<std::string> routingKeyStr(routingKey);
+ BlobEncoder blob (args); // Marshall field table to a blob
+ rs->AddNew();
+ rs->Fields->GetItem("exchangeId")->Value = exchangeId;
+ rs->Fields->GetItem("queueId")->Value = queueId;
+ rs->Fields->GetItem("routingKey")->Value = routingKeyStr;
+ rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
+ rs->Update();
+}
+
+void
+BindingRecordset::remove(uint64_t exchangeId,
+ uint64_t queueId,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable& /*args*/)
+{
+ // Look up the affected binding.
+ std::ostringstream filter;
+ filter << "exchangeId = " << exchangeId
+ << " AND queueId = " << queueId
+ << " AND routingKey = '" << routingKey << "'" << std::ends;
+ removeFilter(filter.str());
+}
+
+void
+BindingRecordset::removeForExchange(uint64_t exchangeId)
+{
+ // Look up the affected bindings by the exchange ID
+ std::ostringstream filter;
+ filter << "exchangeId = " << exchangeId << std::ends;
+ removeFilter(filter.str());
+}
+
+void
+BindingRecordset::removeForQueue(uint64_t queueId)
+{
+ // Look up the affected bindings by the queue ID
+ std::ostringstream filter;
+ filter << "queueId = " << queueId << std::ends;
+ removeFilter(filter.str());
+}
+
+void
+BindingRecordset::recover(broker::RecoveryManager& recoverer,
+ const store::ExchangeMap& exchMap,
+ const store::QueueMap& queueMap)
+{
+ if (rs->BOF && rs->EndOfFile)
+ return; // Nothing to do
+ rs->MoveFirst();
+ Binding b;
+ IADORecordBinding *piAdoRecordBinding;
+ rs->QueryInterface(__uuidof(IADORecordBinding),
+ (LPVOID *)&piAdoRecordBinding);
+ piAdoRecordBinding->BindToRecordset(&b);
+ while (!rs->EndOfFile) {
+ long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize;
+ BlobAdapter blob(blobSize);
+ blob = rs->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+ store::ExchangeMap::const_iterator exch = exchMap.find(b.exchangeId);
+ if (exch == exchMap.end()) {
+ std::ostringstream msg;
+ msg << "Error recovering bindings; exchange ID " << b.exchangeId
+ << " not found in exchange map";
+ throw qpid::Exception(msg.str());
+ }
+ broker::RecoverableExchange::shared_ptr exchPtr = exch->second;
+ store::QueueMap::const_iterator q = queueMap.find(b.queueId);
+ if (q == queueMap.end()) {
+ std::ostringstream msg;
+ msg << "Error recovering bindings; queue ID " << b.queueId
+ << " not found in queue map";
+ throw qpid::Exception(msg.str());
+ }
+ broker::RecoverableQueue::shared_ptr qPtr = q->second;
+ // The recovery manager wants the queue name, so get it from the
+ // RecoverableQueue.
+ std::string key(b.routingKey);
+ exchPtr->bind(qPtr->getName(), key, blob);
+ rs->MoveNext();
+ }
+
+ piAdoRecordBinding->Release();
+}
+
+void
+BindingRecordset::dump()
+{
+ Recordset::dump();
+ if (rs->EndOfFile && rs->BOF) // No records
+ return;
+ rs->MoveFirst();
+
+ Binding b;
+ IADORecordBinding *piAdoRecordBinding;
+ rs->QueryInterface(__uuidof(IADORecordBinding),
+ (LPVOID *)&piAdoRecordBinding);
+ piAdoRecordBinding->BindToRecordset(&b);
+
+ while (VARIANT_FALSE == rs->EndOfFile) {
+ QPID_LOG(notice, "exch Id " << b.exchangeId
+ << ", q Id " << b.queueId
+ << ", k: " << b.routingKey);
+ rs->MoveNext();
+ }
+
+ piAdoRecordBinding->Release();
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h b/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h
new file mode 100644
index 0000000000..3cb732de75
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h
@@ -0,0 +1,88 @@
+#ifndef QPID_STORE_MSSQL_BINDINGRECORDSET_H
+#define QPID_STORE_MSSQL_BINDINGRECORDSET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <icrsint.h>
+#include "Recordset.h"
+#include <qpid/store/StorageProvider.h>
+#include <qpid/broker/RecoveryManager.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class BindingRecordset
+ *
+ * Class for the binding records.
+ */
+class BindingRecordset : public Recordset {
+
+ class Binding : public CADORecordBinding {
+ BEGIN_ADO_BINDING(Binding)
+ ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, exchangeId, FALSE)
+ ADO_FIXED_LENGTH_ENTRY2(2, adBigInt, queueId, FALSE)
+ ADO_VARIABLE_LENGTH_ENTRY4(3, adVarChar, routingKey,
+ sizeof(routingKey), FALSE)
+ END_ADO_BINDING()
+
+ public:
+ uint64_t exchangeId;
+ uint64_t queueId;
+ char routingKey[256];
+ };
+
+ // Remove all records matching the specified filter/query.
+ void removeFilter(const std::string& filter);
+
+public:
+ // Add a new binding
+ void add(uint64_t exchangeId,
+ uint64_t queueId,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable& args);
+
+ // Remove a specific binding
+ void remove(uint64_t exchangeId,
+ uint64_t queueId,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable& args);
+
+ // Remove all bindings for the specified exchange
+ void removeForExchange(uint64_t exchangeId);
+
+ // Remove all bindings for the specified queue
+ void removeForQueue(uint64_t queueId);
+
+ // Recover bindings set using exchMap to get from Id to RecoverableExchange.
+ void recover(qpid::broker::RecoveryManager& recoverer,
+ const qpid::store::ExchangeMap& exchMap,
+ const qpid::store::QueueMap& queueMap);
+
+ // Dump table contents; useful for debugging.
+ void dump();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_BINDINGRECORDSET_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.cpp b/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.cpp
new file mode 100644
index 0000000000..1889f34e41
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.cpp
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BlobAdapter.h"
+#include <qpid/Exception.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+void
+BlobAdapter::extractBuff()
+{
+ // To give a valid Buffer back, lock the safearray, obtaining a pointer to
+ // the actual data. Record the pointer in the Buffer so the destructor
+ // knows to unlock the safearray.
+ if (buff.getPointer() == 0) {
+ char *blob;
+ SafeArrayAccessData(this->parray, (void **)&blob);
+ qpid::framing::Buffer lockedBuff(blob, buff.getSize());
+ buff = lockedBuff;
+ }
+}
+
+
+BlobAdapter::~BlobAdapter()
+{
+ // If buff's pointer is set, the safearray is locked, so unlock it
+ if (buff.getPointer() != 0)
+ SafeArrayUnaccessData(this->parray);
+}
+
+BlobAdapter::operator qpid::framing::Buffer& ()
+{
+ extractBuff();
+ return buff;
+}
+
+BlobAdapter::operator qpid::framing::FieldTable& ()
+{
+ extractBuff();
+ fields.decode(buff);
+ return fields;
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.h b/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.h
new file mode 100644
index 0000000000..1c666392bc
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.h
@@ -0,0 +1,62 @@
+#ifndef QPID_STORE_MSSQL_BLOBADAPTER_H
+#define QPID_STORE_MSSQL_BLOBADAPTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <comutil.h>
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/FieldTable.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class BlobAdapter
+ *
+ * Adapter for accessing a blob (varbinary SQL field) as a qpid::framing::Buffer
+ * in an exception-safe way.
+ */
+class BlobAdapter : public _variant_t {
+private:
+ // This Buffer's pointer indicates whether or not a safearray has
+ // been locked; if it's 0, no locking was done.
+ qpid::framing::Buffer buff;
+ qpid::framing::FieldTable fields;
+
+ void extractBuff();
+
+public:
+ // Initialize with the known length of the data that will come.
+ // Assigning a _variant_t to this object will set up the array to be
+ // accessed with the operator Buffer&()
+ BlobAdapter(long blobSize) : _variant_t(), buff(0, blobSize) {}
+ ~BlobAdapter();
+ BlobAdapter& operator=(_variant_t& var_t_Src)
+ { _variant_t::operator=(var_t_Src); return *this; }
+ operator qpid::framing::Buffer& ();
+ operator qpid::framing::FieldTable& ();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_BLOBADAPTER_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.cpp b/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.cpp
new file mode 100644
index 0000000000..75d3dc2d86
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.cpp
@@ -0,0 +1,133 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BlobEncoder.h"
+#include <qpid/Exception.h>
+#include <qpid/broker/Persistable.h>
+#include <qpid/broker/PersistableMessage.h>
+#include <boost/intrusive_ptr.hpp>
+#include <memory.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+template <class ITEM> void
+BlobEncoder::encode(const ITEM &item)
+{
+ SAFEARRAYBOUND bound[1] = {0, 0};
+ bound[0].cElements = item.encodedSize();
+ blob = SafeArrayCreate(VT_UI1, 1, bound);
+ if (S_OK != SafeArrayLock(blob)) {
+ SafeArrayDestroy(blob);
+ blob = 0;
+ throw qpid::Exception("Error locking blob area for persistable item");
+ }
+ try {
+ qpid::framing::Buffer buff((char *)blob->pvData, bound[0].cElements);
+ item.encode(buff);
+ }
+ catch(...) {
+ SafeArrayUnlock(blob);
+ SafeArrayDestroy(blob);
+ blob = 0;
+ throw;
+ }
+ this->vt = VT_ARRAY | VT_UI1;
+ this->parray = blob;
+ SafeArrayUnlock(blob);
+}
+
+template <> void
+BlobEncoder::encode(const boost::intrusive_ptr<qpid::broker::PersistableMessage> &item)
+{
+ // NOTE! If this code changes, verify the recovery code in MessageRecordset
+ SAFEARRAYBOUND bound[1] = {0, 0};
+ bound[0].cElements = item->encodedSize() + sizeof(uint32_t);
+ blob = SafeArrayCreate(VT_UI1, 1, bound);
+ if (S_OK != SafeArrayLock(blob)) {
+ SafeArrayDestroy(blob);
+ blob = 0;
+ throw qpid::Exception("Error locking blob area for message");
+ }
+ try {
+ uint32_t headerSize = item->encodedHeaderSize();
+ qpid::framing::Buffer buff((char *)blob->pvData, bound[0].cElements);
+ buff.putLong(headerSize);
+ item->encode(buff);
+ }
+ catch(...) {
+ SafeArrayUnlock(blob);
+ SafeArrayDestroy(blob);
+ blob = 0;
+ throw;
+ }
+ this->vt = VT_ARRAY | VT_UI1;
+ this->parray = blob;
+ SafeArrayUnlock(blob);
+}
+
+template <> void
+BlobEncoder::encode(const std::string &item)
+{
+ SAFEARRAYBOUND bound[1] = {0, 0};
+ bound[0].cElements = item.size();
+ blob = SafeArrayCreate(VT_UI1, 1, bound);
+ if (S_OK != SafeArrayLock(blob)) {
+ SafeArrayDestroy(blob);
+ blob = 0;
+ throw qpid::Exception("Error locking blob area for string");
+ }
+ memcpy_s(blob->pvData, item.size(), item.data(), item.size());
+ this->vt = VT_ARRAY | VT_UI1;
+ this->parray = blob;
+ SafeArrayUnlock(blob);
+}
+
+BlobEncoder::BlobEncoder(const qpid::broker::Persistable &item) : blob(0)
+{
+ encode(item);
+}
+
+BlobEncoder::BlobEncoder(const boost::intrusive_ptr<qpid::broker::PersistableMessage> &msg) : blob(0)
+{
+ encode(msg);
+}
+
+BlobEncoder::BlobEncoder(const qpid::framing::FieldTable &fields) : blob(0)
+{
+ encode(fields);
+}
+
+BlobEncoder::BlobEncoder(const std::string &data) : blob(0)
+{
+ encode(data);
+}
+
+BlobEncoder::~BlobEncoder()
+{
+ if (blob)
+ SafeArrayDestroy(blob);
+ blob = 0;
+ this->parray = 0;
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.h b/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.h
new file mode 100644
index 0000000000..d2b56223c1
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.h
@@ -0,0 +1,61 @@
+#ifndef QPID_STORE_MSSQL_BLOBENCODER_H
+#define QPID_STORE_MSSQL_BLOBENCODER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <comutil.h>
+#include <string>
+#include <boost/intrusive_ptr.hpp>
+#include <qpid/broker/Persistable.h>
+#include <qpid/broker/PersistableMessage.h>
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/FieldTable.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class BlobEncoder
+ *
+ * Encodes a blob (varbinary) field from a qpid::broker::Persistable or a
+ * qpid::framing::FieldTable (both of which can be encoded to
+ * qpid::framing::Buffer) so it can be passed to ADO methods for writing
+ * to the database.
+ */
+class BlobEncoder : public _variant_t {
+private:
+ SAFEARRAY *blob;
+
+ template <class ITEM> void encode(const ITEM &item);
+
+public:
+ BlobEncoder(const qpid::broker::Persistable &item);
+ BlobEncoder(const boost::intrusive_ptr<qpid::broker::PersistableMessage> &msg);
+ BlobEncoder(const qpid::framing::FieldTable &fields);
+ BlobEncoder(const std::string& data);
+ ~BlobEncoder();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_BLOBENCODER_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp b/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp
new file mode 100644
index 0000000000..ef1757dbad
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/Exception.h>
+#include <qpid/log/Statement.h>
+
+#include "BlobRecordset.h"
+#include "BlobEncoder.h"
+#include "VariantHelper.h"
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+void
+BlobRecordset::add(const qpid::broker::Persistable& item)
+{
+ BlobEncoder blob (item); // Marshall item info to a blob
+ rs->AddNew();
+ rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
+ rs->Update();
+ uint64_t id = rs->Fields->Item["persistenceId"]->Value;
+ item.setPersistenceId(id);
+}
+
+void
+BlobRecordset::remove(uint64_t id)
+{
+ // Look up the item by its persistenceId
+ std::ostringstream filter;
+ filter << "persistenceId = " << id << std::ends;
+ rs->PutFilter (VariantHelper<std::string>(filter.str()));
+ if (!rs->EndOfFile) {
+ // Delete the record
+ rs->Delete(adAffectCurrent);
+ rs->Update();
+ }
+}
+
+void
+BlobRecordset::remove(const qpid::broker::Persistable& item)
+{
+ remove(item.getPersistenceId());
+}
+
+void
+BlobRecordset::dump()
+{
+ Recordset::dump();
+#if 1
+ if (rs->EndOfFile && rs->BOF) // No records
+ return;
+
+ rs->MoveFirst();
+ while (!rs->EndOfFile) {
+ uint64_t id = rs->Fields->Item["persistenceId"]->Value;
+ QPID_LOG(notice, " -> " << id);
+ rs->MoveNext();
+ }
+#else
+ for (Iterator iter = begin(); iter != end(); ++iter) {
+ uint64_t id = *iter.first;
+ QPID_LOG(notice, " -> " << id);
+ }
+#endif
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h b/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h
new file mode 100644
index 0000000000..4d1c338746
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h
@@ -0,0 +1,54 @@
+#ifndef QPID_STORE_MSSQL_BLOBRECORDSET_H
+#define QPID_STORE_MSSQL_BLOBRECORDSET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Recordset.h"
+#include <qpid/broker/Persistable.h>
+#include <string>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class BlobRecordset
+ *
+ * Class for the "blob" records that record an id, varbinary(max) pair.
+ */
+class BlobRecordset : public Recordset {
+protected:
+
+public:
+ void add(const qpid::broker::Persistable& item);
+
+ // Remove a record given its Id.
+ void remove(uint64_t id);
+ void remove(const qpid::broker::Persistable& item);
+
+ // Dump table contents; useful for debugging.
+ void dump();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_BLOBRECORDSET_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp b/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp
new file mode 100644
index 0000000000..3219ea526a
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "DatabaseConnection.h"
+#include "Exception.h"
+#include <comdef.h>
+namespace {
+inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};
+}
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+DatabaseConnection::DatabaseConnection() : conn(0)
+{
+}
+
+DatabaseConnection::~DatabaseConnection()
+{
+ close();
+}
+
+void
+DatabaseConnection::open(const std::string& connectString,
+ const std::string& dbName)
+{
+ if (conn && conn->State == adStateOpen)
+ return;
+ std::string adoConnect = "Provider=SQLOLEDB;" + connectString;
+ try {
+ TESTHR(conn.CreateInstance(__uuidof(Connection)));
+ conn->ConnectionString = adoConnect.c_str();
+ conn->Open("", "", "", adConnectUnspecified);
+ if (dbName.length() > 0)
+ conn->DefaultDatabase = dbName.c_str();
+ }
+ catch(_com_error &e) {
+ close();
+ throw ADOException("MSSQL can't open " + dbName + " at " + adoConnect, e);
+ }
+}
+
+void
+DatabaseConnection::close()
+{
+ if (conn && conn->State == adStateOpen)
+ conn->Close();
+ conn = 0;
+}
+
+std::string
+DatabaseConnection::getErrors()
+{
+ long errCount = conn->Errors->Count;
+ if (errCount <= 0)
+ return "";
+ // Collection ranges from 0 to nCount -1.
+ std::ostringstream messages;
+ ErrorPtr pErr = NULL;
+ for (long i = 0 ; i < errCount ; i++ ) {
+ if (i > 0)
+ messages << "\n";
+ messages << "[" << i << "] ";
+ pErr = conn->Errors->GetItem(i);
+ messages << "Error " << pErr->Number << ": "
+ << (LPCSTR)pErr->Description;
+ }
+ messages << std::ends;
+ return messages.str();
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h b/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h
new file mode 100644
index 0000000000..785d1587c5
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h
@@ -0,0 +1,64 @@
+#ifndef QPID_STORE_MSSQL_DATABASECONNECTION_H
+#define QPID_STORE_MSSQL_DATABASECONNECTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// Bring in ADO 2.8 (yes, I know it says "15", but that's it...)
+#import "C:\Program Files\Common Files\System\ado\msado15.dll" \
+ no_namespace rename("EOF", "EndOfFile")
+
+#include <string>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class DatabaseConnection
+ *
+ * Represents a connection to the SQL database. This class wraps the
+ * needed _ConnectionPtr for ADO as well as the needed COM initialization
+ * and cleanup that each thread requires. It is expected that this class
+ * will be maintained in thread-specific storage so it has no locks.
+ */
+class DatabaseConnection {
+protected:
+ _ConnectionPtr conn;
+
+public:
+ DatabaseConnection();
+ ~DatabaseConnection();
+ void open(const std::string& connectString,
+ const std::string& dbName = "");
+ void close();
+ operator _ConnectionPtr () { return conn; }
+
+ void beginTransaction() { conn->BeginTrans(); }
+ void commitTransaction() {conn->CommitTrans(); }
+ void rollbackTransaction() { conn->RollbackTrans(); }
+
+ std::string getErrors();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_DATABASECONNECTION_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/Exception.h b/qpid/cpp/src/qpid/store/ms-sql/Exception.h
new file mode 100644
index 0000000000..65ec3388ff
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/Exception.h
@@ -0,0 +1,66 @@
+#ifndef QPID_STORE_MSSQL_EXCEPTION_H
+#define QPID_STORE_MSSQL_EXCEPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <comdef.h>
+#include <qpid/store/StorageProvider.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+class Exception : public qpid::store::StorageProvider::Exception
+{
+protected:
+ std::string text;
+public:
+ Exception(const std::string& _text) : text(_text) {}
+ virtual ~Exception() {}
+ virtual const char* what() const throw() { return text.c_str(); }
+};
+
+class ADOException : public Exception
+{
+public:
+ ADOException(const std::string& _text,
+ _com_error &e,
+ const std::string& providerErrors = "")
+ : Exception(_text) {
+ text += ": ";
+ text += e.ErrorMessage();
+ IErrorInfo *i = e.ErrorInfo();
+ if (i != 0) {
+ text += ": ";
+ _bstr_t wmsg = e.Description();
+ text += (const char *)wmsg;
+ i->Release();
+ }
+ if (providerErrors.length() > 0)
+ text += providerErrors;
+ }
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_EXCEPTION_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
new file mode 100644
index 0000000000..1432cc8fca
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
@@ -0,0 +1,1286 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdlib.h>
+#include <string>
+#include <windows.h>
+#include <qpid/broker/RecoverableQueue.h>
+#include <qpid/log/Statement.h>
+#include <qpid/store/MessageStorePlugin.h>
+#include <qpid/store/StorageProvider.h>
+#include "AmqpTransaction.h"
+#include "BlobAdapter.h"
+#include "BlobRecordset.h"
+#include "BindingRecordset.h"
+#include "MessageMapRecordset.h"
+#include "MessageRecordset.h"
+#include "TplRecordset.h"
+#include "DatabaseConnection.h"
+#include "Exception.h"
+#include "State.h"
+#include "VariantHelper.h"
+
+// Bring in ADO 2.8 (yes, I know it says "15", but that's it...)
+#import "C:\Program Files\Common Files\System\ado\msado15.dll" \
+ no_namespace rename("EOF", "EndOfFile")
+#include <comdef.h>
+namespace {
+inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};
+
+// Table names
+const std::string TblBinding("tblBinding");
+const std::string TblConfig("tblConfig");
+const std::string TblExchange("tblExchange");
+const std::string TblMessage("tblMessage");
+const std::string TblMessageMap("tblMessageMap");
+const std::string TblQueue("tblQueue");
+const std::string TblTpl("tblTPL");
+}
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class MSSqlProvider
+ *
+ * Implements a qpid::store::StorageProvider that uses Microsoft SQL Server as
+ * the backend data store for Qpid.
+ */
+class MSSqlProvider : public qpid::store::StorageProvider
+{
+protected:
+ void finalizeMe();
+
+ void dump();
+
+public:
+ MSSqlProvider();
+ ~MSSqlProvider();
+
+ virtual qpid::Options* getOptions() { return &options; }
+
+ virtual void earlyInitialize (Plugin::Target& target);
+ virtual void initialize(Plugin::Target& target);
+
+ /**
+ * Receive notification that this provider is the one that will actively
+ * handle provider storage for the target. If the provider is to be used,
+ * this method will be called after earlyInitialize() and before any
+ * recovery operations (recovery, in turn, precedes call to initialize()).
+ */
+ virtual void activate(MessageStorePlugin &store);
+
+ /**
+ * @name Methods inherited from qpid::broker::MessageStore
+ */
+
+ /**
+ * Record the existence of a durable queue
+ */
+ virtual void create(PersistableQueue& queue,
+ const qpid::framing::FieldTable& args);
+ /**
+ * Destroy a durable queue
+ */
+ virtual void destroy(PersistableQueue& queue);
+
+ /**
+ * Record the existence of a durable exchange
+ */
+ virtual void create(const PersistableExchange& exchange,
+ const qpid::framing::FieldTable& args);
+ /**
+ * Destroy a durable exchange
+ */
+ virtual void destroy(const PersistableExchange& exchange);
+
+ /**
+ * Record a binding
+ */
+ virtual void bind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args);
+
+ /**
+ * Forget a binding
+ */
+ virtual void unbind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args);
+
+ /**
+ * Record generic durable configuration
+ */
+ virtual void create(const PersistableConfig& config);
+
+ /**
+ * Destroy generic durable configuration
+ */
+ virtual void destroy(const PersistableConfig& config);
+
+ /**
+ * Stores a messages before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point). If the message has not yet been stored it will
+ * store the headers as well as any content passed in. A
+ * persistence id will be set on the message which can be
+ * used to load the content or to append to it.
+ */
+ virtual void stage(const boost::intrusive_ptr<PersistableMessage>& msg);
+
+ /**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+ virtual void destroy(PersistableMessage& msg);
+
+ /**
+ * Appends content to a previously staged message
+ */
+ virtual void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg,
+ const std::string& data);
+
+ /**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+ virtual void loadContent(const qpid::broker::PersistableQueue& queue,
+ const boost::intrusive_ptr<const PersistableMessage>& msg,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length);
+
+ /**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * Note: that this is async so the return of the function does
+ * not mean the opperation is complete.
+ *
+ * @param msg the message to enqueue
+ * @param queue the name of the queue onto which it is to be enqueued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void enqueue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue);
+
+ /**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * Note: that this is async so the return of the function does
+ * not mean the opperation is complete.
+ *
+ * @param msg the message to dequeue
+ * @param queue the name of the queue from which it is to be dequeued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void dequeue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue);
+
+ /**
+ * Flushes all async messages to disk for the specified queue
+ *
+ * Note: this is a no-op for this provider.
+ *
+ * @param queue the name of the queue from which it is to be dequeued
+ */
+ virtual void flush(const PersistableQueue& queue) {};
+
+ /**
+ * Returns the number of outstanding AIO's for a given queue
+ *
+ * If 0, than all the enqueue / dequeues have been stored
+ * to disk
+ *
+ * @param queue the name of the queue to check for outstanding AIO
+ */
+ virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue)
+ {return 0;}
+ //@}
+
+ /**
+ * @name Methods inherited from qpid::broker::TransactionalStore
+ */
+ //@{
+ virtual std::auto_ptr<qpid::broker::TransactionContext> begin();
+ virtual std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
+ virtual void prepare(qpid::broker::TPCTransactionContext& txn);
+ virtual void commit(qpid::broker::TransactionContext& txn);
+ virtual void abort(qpid::broker::TransactionContext& txn);
+ virtual void collectPreparedXids(std::set<std::string>& xids);
+ //@}
+
+ virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer);
+ virtual void recoverExchanges(qpid::broker::RecoveryManager& recoverer,
+ ExchangeMap& exchangeMap);
+ virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer,
+ QueueMap& queueMap);
+ virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer,
+ const ExchangeMap& exchangeMap,
+ const QueueMap& queueMap);
+ virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer,
+ MessageMap& messageMap,
+ MessageQueueMap& messageQueueMap);
+ virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer,
+ PreparedTransactionMap& dtxMap);
+
+private:
+ struct ProviderOptions : public qpid::Options
+ {
+ std::string connectString;
+ std::string catalogName;
+
+ ProviderOptions(const std::string &name)
+ : qpid::Options(name),
+ catalogName("QpidStore")
+ {
+ const enum { NAMELEN = MAX_COMPUTERNAME_LENGTH + 1 };
+ TCHAR myName[NAMELEN];
+ DWORD myNameLen = NAMELEN;
+ GetComputerName(myName, &myNameLen);
+ connectString = "Data Source=";
+ connectString += myName;
+ connectString += "\\SQLEXPRESS;Integrated Security=SSPI";
+ addOptions()
+ ("connect",
+ qpid::optValue(connectString, "STRING"),
+ "Connection string for the database to use. Will prepend "
+ "Provider=SQLOLEDB;")
+ ("catalog",
+ qpid::optValue(catalogName, "DB NAME"),
+ "Catalog (database) name")
+ ;
+ }
+ };
+ ProviderOptions options;
+
+ // Each thread has a separate connection to the database and also needs
+ // to manage its COM initialize/finalize individually. This is done by
+ // keeping a thread-specific State.
+ boost::thread_specific_ptr<State> dbState;
+
+ State *initState();
+ DatabaseConnection *initConnection(void);
+ void createDb(DatabaseConnection *db, const std::string &name);
+};
+
+static MSSqlProvider static_instance_registers_plugin;
+
+void
+MSSqlProvider::finalizeMe()
+{
+ dbState.reset();
+}
+
+MSSqlProvider::MSSqlProvider()
+ : options("MS SQL Provider options")
+{
+}
+
+MSSqlProvider::~MSSqlProvider()
+{
+}
+
+void
+MSSqlProvider::earlyInitialize(Plugin::Target &target)
+{
+ MessageStorePlugin *store = dynamic_cast<MessageStorePlugin *>(&target);
+ if (store) {
+ // If the database init fails, report it and don't register; give
+ // the rest of the broker a chance to run.
+ //
+ // Don't try to initConnection() since that will fail if the
+ // database doesn't exist. Instead, try to open a connection without
+ // a database name, then search for the database. There's still a
+ // chance this provider won't be selected for the store too, so be
+ // be sure to close the database connection before return to avoid
+ // leaving a connection up that will not be used.
+ try {
+ initState(); // This initializes COM
+ std::auto_ptr<DatabaseConnection> db(new DatabaseConnection());
+ db->open(options.connectString, "");
+ _ConnectionPtr conn(*db);
+ _RecordsetPtr pCatalogs = NULL;
+ VariantHelper<std::string> catalogName(options.catalogName);
+ pCatalogs = conn->OpenSchema(adSchemaCatalogs, catalogName);
+ if (pCatalogs->EndOfFile) {
+ // Database doesn't exist; create it
+ QPID_LOG(notice,
+ "MSSQL: Creating database " + options.catalogName);
+ createDb(db.get(), options.catalogName);
+ }
+ else {
+ QPID_LOG(notice,
+ "MSSQL: Database located: " + options.catalogName);
+ }
+ if (pCatalogs) {
+ if (pCatalogs->State == adStateOpen)
+ pCatalogs->Close();
+ pCatalogs = 0;
+ }
+ db->close();
+ store->providerAvailable("MSSQL", this);
+ }
+ catch (qpid::Exception &e) {
+ QPID_LOG(error, e.what());
+ return;
+ }
+ store->addFinalizer(boost::bind(&MSSqlProvider::finalizeMe, this));
+ }
+}
+
+void
+MSSqlProvider::initialize(Plugin::Target& target)
+{
+}
+
+void
+MSSqlProvider::activate(MessageStorePlugin &store)
+{
+ QPID_LOG(info, "MS SQL Provider is up");
+}
+
+void
+MSSqlProvider::create(PersistableQueue& queue,
+ const qpid::framing::FieldTable& /*args needed for jrnl*/)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsQueues;
+ try {
+ db->beginTransaction();
+ rsQueues.open(db, TblQueue);
+ rsQueues.add(queue);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error creating queue " + queue.getName(), e, errs);
+ }
+}
+
+/**
+ * Destroy a durable queue
+ */
+void
+MSSqlProvider::destroy(PersistableQueue& queue)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsQueues;
+ BindingRecordset rsBindings;
+ MessageRecordset rsMessages;
+ MessageMapRecordset rsMessageMaps;
+ try {
+ db->beginTransaction();
+ rsQueues.open(db, TblQueue);
+ rsBindings.open(db, TblBinding);
+ rsMessages.open(db, TblMessage);
+ rsMessageMaps.open(db, TblMessageMap);
+ // Remove bindings first; the queue IDs can't be ripped out from
+ // under the references in the bindings table. Then remove the
+ // message->queue entries for the queue, also because the queue can't
+ // be deleted while there are references to it. If there are messages
+ // orphaned by removing the queue references, they're deleted by
+ // a trigger on the tblMessageMap table. Lastly, the queue record
+ // can be removed.
+ rsBindings.removeForQueue(queue.getPersistenceId());
+ rsMessageMaps.removeForQueue(queue.getPersistenceId());
+ rsQueues.remove(queue);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error deleting queue " + queue.getName(), e, errs);
+ }
+}
+
+/**
+ * Record the existence of a durable exchange
+ */
+void
+MSSqlProvider::create(const PersistableExchange& exchange,
+ const qpid::framing::FieldTable& args)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsExchanges;
+ try {
+ db->beginTransaction();
+ rsExchanges.open(db, TblExchange);
+ rsExchanges.add(exchange);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error creating exchange " + exchange.getName(),
+ e,
+ errs);
+ }
+}
+
+/**
+ * Destroy a durable exchange
+ */
+void
+MSSqlProvider::destroy(const PersistableExchange& exchange)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsExchanges;
+ BindingRecordset rsBindings;
+ try {
+ db->beginTransaction();
+ rsExchanges.open(db, TblExchange);
+ rsBindings.open(db, TblBinding);
+ // Remove bindings first; the exchange IDs can't be ripped out from
+ // under the references in the bindings table.
+ rsBindings.removeForExchange(exchange.getPersistenceId());
+ rsExchanges.remove(exchange);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error deleting exchange " + exchange.getName(),
+ e,
+ errs);
+ }
+}
+
+/**
+ * Record a binding
+ */
+void
+MSSqlProvider::bind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args)
+{
+ DatabaseConnection *db = initConnection();
+ BindingRecordset rsBindings;
+ try {
+ db->beginTransaction();
+ rsBindings.open(db, TblBinding);
+ rsBindings.add(exchange.getPersistenceId(),
+ queue.getPersistenceId(),
+ key,
+ args);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error binding exchange " + exchange.getName() +
+ " to queue " + queue.getName(),
+ e,
+ errs);
+ }
+}
+
+/**
+ * Forget a binding
+ */
+void
+MSSqlProvider::unbind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args)
+{
+ DatabaseConnection *db = initConnection();
+ BindingRecordset rsBindings;
+ try {
+ db->beginTransaction();
+ rsBindings.open(db, TblBinding);
+ rsBindings.remove(exchange.getPersistenceId(),
+ queue.getPersistenceId(),
+ key,
+ args);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error unbinding exchange " + exchange.getName() +
+ " from queue " + queue.getName(),
+ e,
+ errs);
+ }
+}
+
+/**
+ * Record generic durable configuration
+ */
+void
+MSSqlProvider::create(const PersistableConfig& config)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsConfigs;
+ try {
+ db->beginTransaction();
+ rsConfigs.open(db, TblConfig);
+ rsConfigs.add(config);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error creating config " + config.getName(), e, errs);
+ }
+}
+
+/**
+ * Destroy generic durable configuration
+ */
+void
+MSSqlProvider::destroy(const PersistableConfig& config)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsConfigs;
+ try {
+ db->beginTransaction();
+ rsConfigs.open(db, TblConfig);
+ rsConfigs.remove(config);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error deleting config " + config.getName(), e, errs);
+ }
+}
+
+/**
+ * Stores a messages before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point). If the message has not yet been stored it will
+ * store the headers as well as any content passed in. A
+ * persistence id will be set on the message which can be
+ * used to load the content or to append to it.
+ */
+void
+MSSqlProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg)
+{
+ DatabaseConnection *db = initConnection();
+ MessageRecordset rsMessages;
+ try {
+ db->beginTransaction();
+ rsMessages.open(db, TblMessage);
+ rsMessages.add(msg);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error staging message", e, errs);
+ }
+}
+
+/**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+void
+MSSqlProvider::destroy(PersistableMessage& msg)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsMessages;
+ try {
+ db->beginTransaction();
+ rsMessages.open(db, TblMessage);
+ rsMessages.remove(msg);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error deleting message", e, errs);
+ }
+}
+
+/**
+ * Appends content to a previously staged message
+ */
+void
+MSSqlProvider::appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg,
+ const std::string& data)
+{
+ DatabaseConnection *db = initConnection();
+ MessageRecordset rsMessages;
+ try {
+ db->beginTransaction();
+ rsMessages.open(db, TblMessage);
+ rsMessages.append(msg, data);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error appending to message", e, errs);
+ }
+}
+
+/**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+void
+MSSqlProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/,
+ const boost::intrusive_ptr<const PersistableMessage>& msg,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length)
+{
+ // SQL store keeps all messages in one table, so we don't need the
+ // queue reference.
+ DatabaseConnection *db = initConnection();
+ MessageRecordset rsMessages;
+ try {
+ rsMessages.open(db, TblMessage);
+ rsMessages.loadContent(msg, data, offset, length);
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ throw ADOException("Error loading message content", e, errs);
+ }
+}
+
+/**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * @param ctxt The transaction context under which this enqueue happens.
+ * @param msg The message to enqueue
+ * @param queue the name of the queue onto which it is to be enqueued
+ */
+void
+MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue)
+{
+ // If this enqueue is in the context of a transaction, use the specified
+ // transaction to nest a new transaction for this operation. However, if
+ // this is not in the context of a transaction, then just use the thread's
+ // DatabaseConnection with a ADO transaction.
+ DatabaseConnection *db = 0;
+ std::string xid;
+ AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt);
+ if (atxn == 0) {
+ db = initConnection();
+ db->beginTransaction();
+ }
+ else {
+ (void)initState(); // Ensure this thread is initialized
+ // It's a transactional enqueue; if it's TPC, grab the xid.
+ AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt);
+ if (tpcTxn)
+ xid = tpcTxn->getXid();
+ db = atxn->dbConn();
+ try {
+ atxn->sqlBegin();
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error queuing message", e, db->getErrors());
+ }
+ }
+
+ MessageRecordset rsMessages;
+ MessageMapRecordset rsMap;
+ try {
+ if (msg->getPersistenceId() == 0) { // Message itself not yet saved
+ rsMessages.open(db, TblMessage);
+ rsMessages.add(msg);
+ }
+ rsMap.open(db, TblMessageMap);
+ rsMap.add(msg->getPersistenceId(), queue.getPersistenceId(), xid);
+ if (atxn)
+ atxn->sqlCommit();
+ else
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ if (atxn)
+ atxn->sqlAbort();
+ else
+ db->rollbackTransaction();
+ throw ADOException("Error queuing message", e, errs);
+ }
+ msg->enqueueComplete();
+}
+
+/**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * @param ctxt The transaction context under which this dequeue happens.
+ * @param msg The message to dequeue
+ * @param queue The queue from which it is to be dequeued
+ */
+void
+MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue)
+{
+ // If this dequeue is in the context of a transaction, use the specified
+ // transaction to nest a new transaction for this operation. However, if
+ // this is not in the context of a transaction, then just use the thread's
+ // DatabaseConnection with a ADO transaction.
+ DatabaseConnection *db = 0;
+ std::string xid;
+ AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt);
+ if (atxn == 0) {
+ db = initConnection();
+ db->beginTransaction();
+ }
+ else {
+ (void)initState(); // Ensure this thread is initialized
+ // It's a transactional dequeue; if it's TPC, grab the xid.
+ AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt);
+ if (tpcTxn)
+ xid = tpcTxn->getXid();
+ db = atxn->dbConn();
+ try {
+ atxn->sqlBegin();
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error queuing message", e, db->getErrors());
+ }
+ }
+
+ MessageMapRecordset rsMap;
+ try {
+ rsMap.open(db, TblMessageMap);
+ // TPC dequeues are just marked pending and will actually be removed
+ // when the transaction commits; Single-phase dequeues are removed
+ // now, relying on the SQL transaction to put it back if the
+ // transaction doesn't commit.
+ if (!xid.empty()) {
+ rsMap.pendingRemove(msg->getPersistenceId(),
+ queue.getPersistenceId(),
+ xid);
+ }
+ else {
+ rsMap.remove(msg->getPersistenceId(),
+ queue.getPersistenceId());
+ }
+ if (atxn)
+ atxn->sqlCommit();
+ else
+ db->commitTransaction();
+ }
+ catch(ms_sql::Exception&) {
+ if (atxn)
+ atxn->sqlAbort();
+ else
+ db->rollbackTransaction();
+ throw;
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ if (atxn)
+ atxn->sqlAbort();
+ else
+ db->rollbackTransaction();
+ throw ADOException("Error dequeuing message", e, errs);
+ }
+ msg->dequeueComplete();
+}
+
+std::auto_ptr<qpid::broker::TransactionContext>
+MSSqlProvider::begin()
+{
+ (void)initState(); // Ensure this thread is initialized
+
+ // Transactions are associated with the Connection, so this transaction
+ // context needs its own connection. At the time of writing, single-phase
+ // transactions are dealt with completely on one thread, so we really
+ // could just use the thread-specific DatabaseConnection for this.
+ // However, that would introduce an ugly, hidden coupling, so play
+ // it safe and handle this just like a TPC transaction, which actually
+ // can be prepared and committed/aborted from different threads,
+ // making it a bad idea to try using the thread-local DatabaseConnection.
+ boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection);
+ db->open(options.connectString, options.catalogName);
+ std::auto_ptr<AmqpTransaction> tx(new AmqpTransaction(db));
+ tx->sqlBegin();
+ std::auto_ptr<qpid::broker::TransactionContext> tc(tx);
+ return tc;
+}
+
+std::auto_ptr<qpid::broker::TPCTransactionContext>
+MSSqlProvider::begin(const std::string& xid)
+{
+ (void)initState(); // Ensure this thread is initialized
+ boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection);
+ db->open(options.connectString, options.catalogName);
+ std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(db, xid));
+ tx->sqlBegin();
+
+ TplRecordset rsTpl;
+ try {
+ tx->sqlBegin();
+ rsTpl.open(db.get(), TblTpl);
+ rsTpl.add(xid);
+ tx->sqlCommit();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ tx->sqlAbort();
+ throw ADOException("Error adding TPL record", e, errs);
+ }
+
+ std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx);
+ return tc;
+}
+
+void
+MSSqlProvider::prepare(qpid::broker::TPCTransactionContext& txn)
+{
+ // Commit all the marked-up enqueue/dequeue ops and the TPL record.
+ // On commit/rollback the TPL will be removed and the TPL markups
+ // on the message map will be cleaned up as well.
+ (void)initState(); // Ensure this thread is initialized
+ AmqpTPCTransaction *atxn = dynamic_cast<AmqpTPCTransaction*> (&txn);
+ if (atxn == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ try {
+ atxn->sqlCommit();
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error preparing", e, atxn->dbConn()->getErrors());
+ }
+ atxn->setPrepared();
+}
+
+void
+MSSqlProvider::commit(qpid::broker::TransactionContext& txn)
+{
+ (void)initState(); // Ensure this thread is initialized
+ /*
+ * One-phase transactions simply commit the outer SQL transaction
+ * that was begun on begin(). Two-phase transactions are different -
+ * the SQL transaction started on begin() was committed on prepare()
+ * so all the SQL records reflecting the enqueue/dequeue actions for
+ * the transaction are recorded but with xid markups on them to reflect
+ * that they are prepared but not committed. Now go back and remove
+ * the markups, deleting those marked for removal.
+ */
+ AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn);
+ if (p2txn == 0) {
+ AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn);
+ if (p1txn == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ p1txn->sqlCommit();
+ return;
+ }
+
+ DatabaseConnection *db(p2txn->dbConn());
+ TplRecordset rsTpl;
+ MessageMapRecordset rsMessageMap;
+ try {
+ db->beginTransaction();
+ rsTpl.open(db, TblTpl);
+ rsMessageMap.open(db, TblMessageMap);
+ rsMessageMap.commitPrepared(p2txn->getXid());
+ rsTpl.remove(p2txn->getXid());
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error committing transaction", e, errs);
+ }
+}
+
+void
+MSSqlProvider::abort(qpid::broker::TransactionContext& txn)
+{
+ (void)initState(); // Ensure this thread is initialized
+ /*
+ * One-phase and non-prepared two-phase transactions simply abort
+ * the outer SQL transaction that was begun on begin(). However, prepared
+ * two-phase transactions are different - the SQL transaction started
+ * on begin() was committed on prepare() so all the SQL records
+ * reflecting the enqueue/dequeue actions for the transaction are
+ * recorded but with xid markups on them to reflect that they are
+ * prepared but not committed. Now go back and remove the markups,
+ * deleting those marked for addition.
+ */
+ AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn);
+ if (p2txn == 0 || !p2txn->isPrepared()) {
+ AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn);
+ if (p1txn == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ p1txn->sqlAbort();
+ return;
+ }
+
+ DatabaseConnection *db(p2txn->dbConn());
+ TplRecordset rsTpl;
+ MessageMapRecordset rsMessageMap;
+ try {
+ db->beginTransaction();
+ rsTpl.open(db, TblTpl);
+ rsMessageMap.open(db, TblMessageMap);
+ rsMessageMap.abortPrepared(p2txn->getXid());
+ rsTpl.remove(p2txn->getXid());
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error committing transaction", e, errs);
+ }
+
+
+ (void)initState(); // Ensure this thread is initialized
+ AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn);
+ if (atxn == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ atxn->sqlAbort();
+}
+
+void
+MSSqlProvider::collectPreparedXids(std::set<std::string>& xids)
+{
+ DatabaseConnection *db = initConnection();
+ try {
+ TplRecordset rsTpl;
+ rsTpl.open(db, TblTpl);
+ rsTpl.recover(xids);
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error reading TPL", e, db->getErrors());
+ }
+}
+
+// @TODO Much of this recovery code is way too similar... refactor to
+// a recover template method on BlobRecordset.
+
+void
+MSSqlProvider::recoverConfigs(qpid::broker::RecoveryManager& recoverer)
+{
+ DatabaseConnection *db = 0;
+ try {
+ db = initConnection();
+ BlobRecordset rsConfigs;
+ rsConfigs.open(db, TblConfig);
+ _RecordsetPtr p = (_RecordsetPtr)rsConfigs;
+ if (p->BOF && p->EndOfFile)
+ return; // Nothing to do
+ p->MoveFirst();
+ while (!p->EndOfFile) {
+ uint64_t id = p->Fields->Item["persistenceId"]->Value;
+ long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize;
+ BlobAdapter blob(blobSize);
+ blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+ // Recreate the Config instance and reset its ID.
+ broker::RecoverableConfig::shared_ptr config =
+ recoverer.recoverConfig(blob);
+ config->setPersistenceId(id);
+ p->MoveNext();
+ }
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error recovering configs",
+ e,
+ db ? db->getErrors() : "");
+ }
+}
+
+void
+MSSqlProvider::recoverExchanges(qpid::broker::RecoveryManager& recoverer,
+ ExchangeMap& exchangeMap)
+{
+ DatabaseConnection *db = 0;
+ try {
+ db = initConnection();
+ BlobRecordset rsExchanges;
+ rsExchanges.open(db, TblExchange);
+ _RecordsetPtr p = (_RecordsetPtr)rsExchanges;
+ if (p->BOF && p->EndOfFile)
+ return; // Nothing to do
+ p->MoveFirst();
+ while (!p->EndOfFile) {
+ uint64_t id = p->Fields->Item["persistenceId"]->Value;
+ long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize;
+ BlobAdapter blob(blobSize);
+ blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+ // Recreate the Exchange instance, reset its ID, and remember the
+ // ones restored for matching up when recovering bindings.
+ broker::RecoverableExchange::shared_ptr exchange =
+ recoverer.recoverExchange(blob);
+ exchange->setPersistenceId(id);
+ exchangeMap[id] = exchange;
+ p->MoveNext();
+ }
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error recovering exchanges",
+ e,
+ db ? db->getErrors() : "");
+ }
+}
+
+void
+MSSqlProvider::recoverQueues(qpid::broker::RecoveryManager& recoverer,
+ QueueMap& queueMap)
+{
+ DatabaseConnection *db = 0;
+ try {
+ db = initConnection();
+ BlobRecordset rsQueues;
+ rsQueues.open(db, TblQueue);
+ _RecordsetPtr p = (_RecordsetPtr)rsQueues;
+ if (p->BOF && p->EndOfFile)
+ return; // Nothing to do
+ p->MoveFirst();
+ while (!p->EndOfFile) {
+ uint64_t id = p->Fields->Item["persistenceId"]->Value;
+ long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize;
+ BlobAdapter blob(blobSize);
+ blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+ // Recreate the Queue instance and reset its ID.
+ broker::RecoverableQueue::shared_ptr queue =
+ recoverer.recoverQueue(blob);
+ queue->setPersistenceId(id);
+ queueMap[id] = queue;
+ p->MoveNext();
+ }
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error recovering queues",
+ e,
+ db ? db->getErrors() : "");
+ }
+}
+
+void
+MSSqlProvider::recoverBindings(qpid::broker::RecoveryManager& recoverer,
+ const ExchangeMap& exchangeMap,
+ const QueueMap& queueMap)
+{
+ DatabaseConnection *db = 0;
+ try {
+ db = initConnection();
+ BindingRecordset rsBindings;
+ rsBindings.open(db, TblBinding);
+ rsBindings.recover(recoverer, exchangeMap, queueMap);
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error recovering bindings",
+ e,
+ db ? db->getErrors() : "");
+ }
+}
+
+void
+MSSqlProvider::recoverMessages(qpid::broker::RecoveryManager& recoverer,
+ MessageMap& messageMap,
+ MessageQueueMap& messageQueueMap)
+{
+ DatabaseConnection *db = 0;
+ try {
+ db = initConnection();
+ MessageRecordset rsMessages;
+ rsMessages.open(db, TblMessage);
+ rsMessages.recover(recoverer, messageMap);
+
+ MessageMapRecordset rsMessageMaps;
+ rsMessageMaps.open(db, TblMessageMap);
+ rsMessageMaps.recover(messageQueueMap);
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error recovering messages",
+ e,
+ db ? db->getErrors() : "");
+ }
+}
+
+void
+MSSqlProvider::recoverTransactions(qpid::broker::RecoveryManager& recoverer,
+ PreparedTransactionMap& dtxMap)
+{
+ DatabaseConnection *db = initConnection();
+ std::set<std::string> xids;
+ try {
+ TplRecordset rsTpl;
+ rsTpl.open(db, TblTpl);
+ rsTpl.recover(xids);
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error recovering TPL records", e, db->getErrors());
+ }
+
+ try {
+ // Rebuild the needed RecoverableTransactions.
+ for (std::set<std::string>::const_iterator iXid = xids.begin();
+ iXid != xids.end();
+ ++iXid) {
+ boost::shared_ptr<DatabaseConnection> dbX(new DatabaseConnection);
+ dbX->open(options.connectString, options.catalogName);
+ std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(dbX,
+ *iXid));
+ tx->setPrepared();
+ std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx);
+ dtxMap[*iXid] = recoverer.recoverTransaction(*iXid, tc);
+ }
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error recreating dtx connection", e);
+ }
+}
+
+////////////// Internal Methods
+
+State *
+MSSqlProvider::initState()
+{
+ State *state = dbState.get(); // See if thread has initialized
+ if (!state) {
+ state = new State;
+ dbState.reset(state);
+ }
+ return state;
+}
+
+DatabaseConnection *
+MSSqlProvider::initConnection(void)
+{
+ State *state = initState();
+ if (state->dbConn != 0)
+ return state->dbConn; // And the DatabaseConnection is set up too
+ std::auto_ptr<DatabaseConnection> db(new DatabaseConnection);
+ db->open(options.connectString, options.catalogName);
+ state->dbConn = db.release();
+ return state->dbConn;
+}
+
+void
+MSSqlProvider::createDb(DatabaseConnection *db, const std::string &name)
+{
+ const std::string dbCmd = "CREATE DATABASE " + name;
+ const std::string useCmd = "USE " + name;
+ const std::string tableCmd = "CREATE TABLE ";
+ const std::string colSpecs =
+ " (persistenceId bigint PRIMARY KEY NOT NULL IDENTITY(1,1),"
+ " fieldTableBlob varbinary(MAX) NOT NULL)";
+ const std::string bindingSpecs =
+ " (exchangeId bigint REFERENCES tblExchange(persistenceId) NOT NULL,"
+ " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL,"
+ " routingKey varchar(255),"
+ " fieldTableBlob varbinary(MAX))";
+ const std::string messageMapSpecs =
+ " (messageId bigint REFERENCES tblMessage(persistenceId) NOT NULL,"
+ " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL,"
+ " prepareStatus tinyint CHECK (prepareStatus IS NULL OR "
+ " prepareStatus IN (1, 2)),"
+ " xid varbinary(512) REFERENCES tblTPL(xid)"
+ " CONSTRAINT CK_NoDups UNIQUE NONCLUSTERED (messageId, queueId) )";
+ const std::string tplSpecs = " (xid varbinary(512) PRIMARY KEY NOT NULL)";
+ // SET NOCOUNT ON added to prevent extra result sets from
+ // interfering with SELECT statements. (Added by SQL Management)
+ const std::string removeUnrefMsgsTrigger =
+ "CREATE TRIGGER dbo.RemoveUnreferencedMessages "
+ "ON tblMessageMap AFTER DELETE AS BEGIN "
+ "SET NOCOUNT ON; "
+ "DELETE FROM tblMessage "
+ "WHERE tblMessage.persistenceId IN "
+ " (SELECT messageId FROM deleted) AND"
+ " NOT EXISTS(SELECT * FROM tblMessageMap"
+ " WHERE tblMessageMap.messageId IN"
+ " (SELECT messageId FROM deleted)) "
+ "END";
+
+ _variant_t unused;
+ _bstr_t dbStr = dbCmd.c_str();
+ _ConnectionPtr conn(*db);
+ try {
+ conn->Execute(dbStr, &unused, adExecuteNoRecords);
+ _bstr_t useStr = useCmd.c_str();
+ conn->Execute(useStr, &unused, adExecuteNoRecords);
+ std::string makeTable = tableCmd + TblQueue + colSpecs;
+ _bstr_t makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblExchange + colSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblConfig + colSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblMessage + colSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblBinding + bindingSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblTpl + tplSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblMessageMap + messageMapSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ _bstr_t addTriggerStr = removeUnrefMsgsTrigger.c_str();
+ conn->Execute(addTriggerStr, &unused, adExecuteNoRecords);
+ }
+ catch(_com_error &e) {
+ throw ADOException("MSSQL can't create " + name, e, db->getErrors());
+ }
+}
+
+void
+MSSqlProvider::dump()
+{
+ // dump all db records to qpid_log
+ QPID_LOG(notice, "DB Dump: (not dumping anything)");
+ // rsQueues.dump();
+}
+
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp b/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp
new file mode 100644
index 0000000000..ce9fa61010
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp
@@ -0,0 +1,267 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/Exception.h>
+#include <qpid/log/Statement.h>
+#include <qpid/store/StorageProvider.h>
+
+#include "MessageMapRecordset.h"
+#include "BlobEncoder.h"
+#include "DatabaseConnection.h"
+#include "Exception.h"
+#include "VariantHelper.h"
+
+namespace {
+inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};
+}
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+void
+MessageMapRecordset::open(DatabaseConnection* conn, const std::string& table)
+{
+ init(conn, table);
+}
+
+void
+MessageMapRecordset::add(uint64_t messageId,
+ uint64_t queueId,
+ const std::string& xid)
+{
+ std::ostringstream command;
+ command << "INSERT INTO " << tableName
+ << " (messageId, queueId";
+ if (!xid.empty())
+ command << ", prepareStatus, xid";
+ command << ") VALUES (" << messageId << "," << queueId;
+ if (!xid.empty())
+ command << "," << PREPARE_ADD << ",?";
+ command << ")" << std::ends;
+
+ _CommandPtr cmd = NULL;
+ _ParameterPtr xidVal = NULL;
+ TESTHR(cmd.CreateInstance(__uuidof(Command)));
+ _ConnectionPtr p = *dbConn;
+ cmd->ActiveConnection = p;
+ cmd->CommandText = command.str().c_str();
+ cmd->CommandType = adCmdText;
+ if (!xid.empty()) {
+ TESTHR(xidVal.CreateInstance(__uuidof(Parameter)));
+ xidVal->Name = "@xid";
+ xidVal->Type = adVarBinary;
+ xidVal->Size = xid.length();
+ xidVal->Direction = adParamInput;
+ xidVal->Value = BlobEncoder(xid);
+ cmd->Parameters->Append(xidVal);
+ }
+ cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords);
+}
+
+void
+MessageMapRecordset::remove(uint64_t messageId, uint64_t queueId)
+{
+ std::ostringstream command;
+ command << "DELETE FROM " << tableName
+ << " WHERE queueId = " << queueId
+ << " AND messageId = " << messageId << std::ends;
+ _CommandPtr cmd = NULL;
+ TESTHR(cmd.CreateInstance(__uuidof(Command)));
+ _ConnectionPtr p = *dbConn;
+ cmd->ActiveConnection = p;
+ cmd->CommandText = command.str().c_str();
+ cmd->CommandType = adCmdText;
+ _variant_t deletedRecords;
+ cmd->Execute(&deletedRecords, NULL, adCmdText | adExecuteNoRecords);
+ if ((long)deletedRecords == 0)
+ throw ms_sql::Exception("Message does not exist in queue mapping");
+ // Trigger on deleting the mapping takes care of deleting orphaned
+ // message record from tblMessage.
+}
+
+void
+MessageMapRecordset::pendingRemove(uint64_t messageId,
+ uint64_t queueId,
+ const std::string& xid)
+{
+ // Look up the mapping for the specified message and queue. There
+ // should be only one because of the uniqueness constraint in the
+ // SQL table. Update it to reflect it's pending delete with
+ // the specified xid.
+ std::ostringstream command;
+ command << "UPDATE " << tableName
+ << " SET prepareStatus=" << PREPARE_REMOVE
+ << " , xid=?"
+ << " WHERE queueId = " << queueId
+ << " AND messageId = " << messageId << std::ends;
+
+ _CommandPtr cmd = NULL;
+ _ParameterPtr xidVal = NULL;
+ TESTHR(cmd.CreateInstance(__uuidof(Command)));
+ TESTHR(xidVal.CreateInstance(__uuidof(Parameter)));
+ _ConnectionPtr p = *dbConn;
+ cmd->ActiveConnection = p;
+ cmd->CommandText = command.str().c_str();
+ cmd->CommandType = adCmdText;
+ xidVal->Name = "@xid";
+ xidVal->Type = adVarBinary;
+ xidVal->Size = xid.length();
+ xidVal->Direction = adParamInput;
+ xidVal->Value = BlobEncoder(xid);
+ cmd->Parameters->Append(xidVal);
+ cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords);
+}
+
+void
+MessageMapRecordset::removeForQueue(uint64_t queueId)
+{
+ std::ostringstream command;
+ command << "DELETE FROM " << tableName
+ << " WHERE queueId = " << queueId << std::ends;
+ _CommandPtr cmd = NULL;
+
+ TESTHR(cmd.CreateInstance(__uuidof(Command)));
+ _ConnectionPtr p = *dbConn;
+ cmd->ActiveConnection = p;
+ cmd->CommandText = command.str().c_str();
+ cmd->CommandType = adCmdText;
+ cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords);
+}
+
+void
+MessageMapRecordset::commitPrepared(const std::string& xid)
+{
+ // Find all the records for the specified xid. Records marked as adding
+ // are now permanent so remove the xid and prepareStatus. Records marked
+ // as removing are removed entirely.
+ openRs();
+ MessageMap m;
+ IADORecordBinding *piAdoRecordBinding;
+ rs->QueryInterface(__uuidof(IADORecordBinding),
+ (LPVOID *)&piAdoRecordBinding);
+ piAdoRecordBinding->BindToRecordset(&m);
+ for (; !rs->EndOfFile; rs->MoveNext()) {
+ if (m.xidStatus != adFldOK)
+ continue;
+ const std::string x(m.xid, m.xidLength);
+ if (x != xid)
+ continue;
+ if (m.prepareStatus == PREPARE_REMOVE) {
+ rs->Delete(adAffectCurrent);
+ }
+ else {
+ _variant_t dbNull;
+ dbNull.ChangeType(VT_NULL);
+ rs->Fields->GetItem("prepareStatus")->Value = dbNull;
+ rs->Fields->GetItem("xid")->Value = dbNull;
+ }
+ rs->Update();
+ }
+ piAdoRecordBinding->Release();
+}
+
+void
+MessageMapRecordset::abortPrepared(const std::string& xid)
+{
+ // Find all the records for the specified xid. Records marked as adding
+ // need to be removed while records marked as removing are put back to
+ // no xid and no prepareStatus.
+ openRs();
+ MessageMap m;
+ IADORecordBinding *piAdoRecordBinding;
+ rs->QueryInterface(__uuidof(IADORecordBinding),
+ (LPVOID *)&piAdoRecordBinding);
+ piAdoRecordBinding->BindToRecordset(&m);
+ for (; !rs->EndOfFile; rs->MoveNext()) {
+ if (m.xidStatus != adFldOK)
+ continue;
+ const std::string x(m.xid, m.xidLength);
+ if (x != xid)
+ continue;
+ if (m.prepareStatus == PREPARE_ADD) {
+ rs->Delete(adAffectCurrent);
+ }
+ else {
+ _variant_t dbNull;
+ dbNull.ChangeType(VT_NULL);
+ rs->Fields->GetItem("prepareStatus")->Value = dbNull;
+ rs->Fields->GetItem("xid")->Value = dbNull;
+ }
+ rs->Update();
+ }
+ piAdoRecordBinding->Release();
+}
+
+void
+MessageMapRecordset::recover(MessageQueueMap& msgMap)
+{
+ openRs();
+ if (rs->BOF && rs->EndOfFile)
+ return; // Nothing to do
+ rs->MoveFirst();
+ MessageMap b;
+ IADORecordBinding *piAdoRecordBinding;
+ rs->QueryInterface(__uuidof(IADORecordBinding),
+ (LPVOID *)&piAdoRecordBinding);
+ piAdoRecordBinding->BindToRecordset(&b);
+ while (!rs->EndOfFile) {
+ qpid::store::QueueEntry entry(b.queueId);
+ if (b.xidStatus == adFldOK && b.xidLength > 0) {
+ entry.xid.assign(b.xid, b.xidLength);
+ entry.tplStatus =
+ b.prepareStatus == PREPARE_ADD ? QueueEntry::ADDING
+ : QueueEntry::REMOVING;
+ }
+ else {
+ entry.tplStatus = QueueEntry::NONE;
+ }
+ msgMap[b.messageId].push_back(entry);
+ rs->MoveNext();
+ }
+
+ piAdoRecordBinding->Release();
+}
+
+void
+MessageMapRecordset::dump()
+{
+ openRs();
+ Recordset::dump();
+ if (rs->EndOfFile && rs->BOF) // No records
+ return;
+ rs->MoveFirst();
+
+ MessageMap m;
+ IADORecordBinding *piAdoRecordBinding;
+ rs->QueryInterface(__uuidof(IADORecordBinding),
+ (LPVOID *)&piAdoRecordBinding);
+ piAdoRecordBinding->BindToRecordset(&m);
+
+ while (!rs->EndOfFile) {
+ QPID_LOG(notice, "msg " << m.messageId << " on queue " << m.queueId);
+ rs->MoveNext();
+ }
+
+ piAdoRecordBinding->Release();
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h b/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h
new file mode 100644
index 0000000000..1b0c2f073e
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h
@@ -0,0 +1,100 @@
+#ifndef QPID_STORE_MSSQL_MESSAGEMAPRECORDSET_H
+#define QPID_STORE_MSSQL_MESSAGEMAPRECORDSET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <icrsint.h>
+#include "Recordset.h"
+#include <qpid/broker/RecoveryManager.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class MessageMapRecordset
+ *
+ * Class for the message map (message -> queue) records.
+ */
+class MessageMapRecordset : public Recordset {
+
+ // These values are defined in a constraint on the tblMessageMap table.
+ // the prepareStatus column can only be null, 1, or 2.
+ enum { PREPARE_ADD=1, PREPARE_REMOVE=2 };
+
+ class MessageMap : public CADORecordBinding {
+ BEGIN_ADO_BINDING(MessageMap)
+ ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, messageId, FALSE)
+ ADO_FIXED_LENGTH_ENTRY2(2, adBigInt, queueId, FALSE)
+ ADO_FIXED_LENGTH_ENTRY2(3, adTinyInt, prepareStatus, FALSE)
+ ADO_VARIABLE_LENGTH_ENTRY(4, adVarBinary, xid, sizeof(xid),
+ xidStatus, xidLength, FALSE)
+ END_ADO_BINDING()
+
+ public:
+ uint64_t messageId;
+ uint64_t queueId;
+ uint8_t prepareStatus;
+ char xid[512];
+ int xidStatus;
+ uint32_t xidLength;
+ };
+
+ void selectOnXid(const std::string& xid);
+
+public:
+ virtual void open(DatabaseConnection* conn, const std::string& table);
+
+ // Add a new mapping
+ void add(uint64_t messageId,
+ uint64_t queueId,
+ const std::string& xid = "");
+
+ // Remove a specific mapping.
+ void remove(uint64_t messageId, uint64_t queueId);
+
+ // Mark the indicated message->queue entry pending removal. The entry
+ // for the mapping is updated to indicate pending removal with the
+ // specified xid.
+ void pendingRemove(uint64_t messageId,
+ uint64_t queueId,
+ const std::string& xid);
+
+ // Remove mappings for all messages on a specified queue.
+ void removeForQueue(uint64_t queueId);
+
+ // Commit records recorded as prepared.
+ void commitPrepared(const std::string& xid);
+
+ // Abort prepared changes.
+ void abortPrepared(const std::string& xid);
+
+ // Recover the mappings of message ID -> vector<queue ID>.
+ void recover(MessageQueueMap& msgMap);
+
+ // Dump table contents; useful for debugging.
+ void dump();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_MESSAGEMAPRECORDSET_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp b/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp
new file mode 100644
index 0000000000..495f1a08c2
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp
@@ -0,0 +1,184 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/Exception.h>
+#include <qpid/log/Statement.h>
+
+#include "MessageRecordset.h"
+#include "BlobAdapter.h"
+#include "BlobEncoder.h"
+#include "VariantHelper.h"
+
+#include <boost/intrusive_ptr.hpp>
+
+class qpid::broker::PersistableMessage;
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+void
+MessageRecordset::add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
+{
+ BlobEncoder blob (msg); // Marshall headers and content to a blob
+ rs->AddNew();
+ rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
+ rs->Update();
+ uint64_t id = rs->Fields->Item["persistenceId"]->Value;
+ msg->setPersistenceId(id);
+}
+
+void
+MessageRecordset::append(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+ const std::string& data)
+{
+ // Look up the message by its Id
+ std::ostringstream filter;
+ filter << "persistenceId = " << msg->getPersistenceId() << std::ends;
+ rs->PutFilter (VariantHelper<std::string>(filter.str()));
+ if (rs->RecordCount == 0) {
+ throw Exception("Can't append to message not stored in database");
+ }
+ BlobEncoder blob (data); // Marshall string data to a blob
+ rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
+ rs->Update();
+}
+
+void
+MessageRecordset::remove(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg)
+{
+ BlobRecordset::remove(msg->getPersistenceId());
+}
+
+void
+MessageRecordset::loadContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length)
+{
+ // Look up the message by its Id
+ std::ostringstream filter;
+ filter << "persistenceId = " << msg->getPersistenceId() << std::ends;
+ rs->PutFilter (VariantHelper<std::string>(filter.str()));
+ if (rs->RecordCount == 0) {
+ throw Exception("Can't load message not stored in database");
+ }
+
+ // NOTE! If this code needs to change, please verify the encoding
+ // code in BlobEncoder.
+ long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize;
+ uint32_t headerSize;
+ const size_t headerFieldLength = sizeof(headerSize);
+ BlobAdapter blob(headerFieldLength);
+ blob =
+ rs->Fields->Item["fieldTableBlob"]->GetChunk((long)headerFieldLength);
+ headerSize = ((qpid::framing::Buffer&)blob).getLong();
+
+ // GetChunk always begins reading where the previous GetChunk left off,
+ // so we can't just tell it to ignore the header and read the data.
+ // So, read the header plus the offset, plus the desired data, then
+ // copy the desired data to the supplied string. If this ends up asking
+ // for more than is available in the field, reduce it to what's there.
+ long getSize = headerSize + offset + length;
+ if (getSize + (long)headerFieldLength > blobSize) {
+ size_t reduce = (getSize + headerFieldLength) - blobSize;
+ getSize -= reduce;
+ length -= reduce;
+ }
+ BlobAdapter header_plus(getSize);
+ header_plus = rs->Fields->Item["fieldTableBlob"]->GetChunk(getSize);
+ uint8_t *throw_away = new uint8_t[headerSize + offset];
+ ((qpid::framing::Buffer&)header_plus).getRawData(throw_away, headerSize + offset);
+ delete throw_away;
+ ((qpid::framing::Buffer&)header_plus).getRawData(data, length);
+}
+
+void
+MessageRecordset::recover(qpid::broker::RecoveryManager& recoverer,
+ std::map<uint64_t, broker::RecoverableMessage::shared_ptr>& messageMap)
+{
+ if (rs->BOF && rs->EndOfFile)
+ return; // Nothing to do
+ rs->MoveFirst();
+ Binding b;
+ IADORecordBinding *piAdoRecordBinding;
+ rs->QueryInterface(__uuidof(IADORecordBinding),
+ (LPVOID *)&piAdoRecordBinding);
+ piAdoRecordBinding->BindToRecordset(&b);
+ while (!rs->EndOfFile) {
+ // The blob was written as normal, but with the header length
+ // prepended in a uint32_t. Due to message staging threshold
+ // limits, the header may be all that's read in; get it first,
+ // recover that message header, then see if the rest is needed.
+ //
+ // NOTE! If this code needs to change, please verify the encoding
+ // code in BlobEncoder.
+ long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize;
+ uint32_t headerSize;
+ const size_t headerFieldLength = sizeof(headerSize);
+ BlobAdapter blob(headerFieldLength);
+ blob =
+ rs->Fields->Item["fieldTableBlob"]->GetChunk((long)headerFieldLength);
+ headerSize = ((qpid::framing::Buffer&)blob).getLong();
+ BlobAdapter header(headerSize);
+ header = rs->Fields->Item["fieldTableBlob"]->GetChunk(headerSize);
+ broker::RecoverableMessage::shared_ptr msg;
+ msg = recoverer.recoverMessage(header);
+ msg->setPersistenceId(b.messageId);
+ messageMap[b.messageId] = msg;
+
+ // Now, do we need the rest of the content?
+ long contentLength = blobSize - headerFieldLength - headerSize;
+ if (contentLength > 0 && msg->loadContent(contentLength)) {
+ BlobAdapter content(contentLength);
+ content =
+ rs->Fields->Item["fieldTableBlob"]->GetChunk(contentLength);
+ msg->decodeContent(content);
+ }
+ rs->MoveNext();
+ }
+
+ piAdoRecordBinding->Release();
+}
+
+void
+MessageRecordset::dump()
+{
+ Recordset::dump();
+ if (rs->EndOfFile && rs->BOF) // No records
+ return;
+ rs->MoveFirst();
+
+ Binding b;
+ IADORecordBinding *piAdoRecordBinding;
+ rs->QueryInterface(__uuidof(IADORecordBinding),
+ (LPVOID *)&piAdoRecordBinding);
+ piAdoRecordBinding->BindToRecordset(&b);
+
+ while (VARIANT_FALSE == rs->EndOfFile) {
+ QPID_LOG(notice, "Msg " << b.messageId);
+ rs->MoveNext();
+ }
+
+ piAdoRecordBinding->Release();
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h b/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h
new file mode 100644
index 0000000000..698b2561fe
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h
@@ -0,0 +1,85 @@
+#ifndef QPID_STORE_MSSQL_MESSAGERECORDSET_H
+#define QPID_STORE_MSSQL_MESSAGERECORDSET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <icrsint.h>
+#include "BlobRecordset.h"
+#include <qpid/broker/PersistableMessage.h>
+#include <qpid/broker/RecoveryManager.h>
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class MessageRecordset
+ *
+ * Class for storing and recovering messages. Messages are primarily blobs
+ * and handled similarly. However, messages larger than the staging threshold
+ * are not contained completely in memory; they're left mostly in the store
+ * and the header is held in memory. So when the message "blob" is saved,
+ * an additional size-of-the-header field is prepended to the blob.
+ * On recovery, the size-of-the-header is used to get only what's needed
+ * until it's determined if the entire message is to be recovered to memory.
+ */
+class MessageRecordset : public BlobRecordset {
+ class Binding : public CADORecordBinding {
+ BEGIN_ADO_BINDING(Binding)
+ ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, messageId, FALSE)
+ END_ADO_BINDING()
+
+ public:
+ uint64_t messageId;
+ };
+
+public:
+ // Store a message. Store the header size (4 bytes) then the regular
+ // blob comprising the message.
+ void add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
+
+ // Append additional content to an existing message.
+ void append(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+ const std::string& data);
+
+ // Remove an existing message
+ void remove(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg);
+
+ // Load all or part of a stored message. This skips the header parts and
+ // loads content.
+ void loadContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length);
+
+ // Recover messages and save a map of those recovered.
+ void recover(qpid::broker::RecoveryManager& recoverer,
+ std::map<uint64_t, broker::RecoverableMessage::shared_ptr>& messageMap);
+
+ // Dump table contents; useful for debugging.
+ void dump();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_MESSAGERECORDSET_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp b/qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp
new file mode 100644
index 0000000000..e706799951
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/Exception.h>
+#include <qpid/log/Statement.h>
+
+#include "Recordset.h"
+#include "BlobEncoder.h"
+#include "DatabaseConnection.h"
+#include "VariantHelper.h"
+
+namespace {
+inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};
+}
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+
+void
+Recordset::init(DatabaseConnection* conn, const std::string& table)
+{
+ dbConn = conn;
+ TESTHR(rs.CreateInstance(__uuidof(::Recordset)));
+ tableName = table;
+}
+
+void
+Recordset::openRs()
+{
+ // Client-side cursors needed to get access to newly added
+ // identity column immediately. Recordsets need this to get the
+ // persistence ID for the broker objects.
+ rs->CursorLocation = adUseClient;
+ _ConnectionPtr p = *dbConn;
+ rs->Open(tableName.c_str(),
+ _variant_t((IDispatch *)p, true),
+ adOpenStatic,
+ adLockOptimistic,
+ adCmdTable);
+}
+
+void
+Recordset::open(DatabaseConnection* conn, const std::string& table)
+{
+ init(conn, table);
+ openRs();
+}
+
+void
+Recordset::close()
+{
+ if (rs && rs->State == adStateOpen)
+ rs->Close();
+}
+
+void
+Recordset::requery()
+{
+ // Restore the recordset to reflect all current records.
+ rs->Filter = "";
+ rs->Requery(-1);
+}
+
+void
+Recordset::dump()
+{
+ long count = rs->RecordCount;
+ QPID_LOG(notice, "DB Dump: " + tableName <<
+ ": " << count << " records");
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/Recordset.h b/qpid/cpp/src/qpid/store/ms-sql/Recordset.h
new file mode 100644
index 0000000000..032b2bd434
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/Recordset.h
@@ -0,0 +1,75 @@
+#ifndef QPID_STORE_MSSQL_RECORDSET_H
+#define QPID_STORE_MSSQL_RECORDSET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+// Bring in ADO 2.8 (yes, I know it says "15", but that's it...)
+#import "C:\Program Files\Common Files\System\ado\msado15.dll" \
+ no_namespace rename("EOF", "EndOfFile")
+#include <comdef.h>
+#include <comutil.h>
+#include <string>
+#if 0
+#include <utility>
+#endif
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+class DatabaseConnection;
+
+/**
+ * @class Recordset
+ *
+ * Represents an ADO Recordset, abstracting out the common operations needed
+ * on the common tables used that have 2 fields, persistence ID and blob.
+ */
+class Recordset {
+protected:
+ _RecordsetPtr rs;
+ DatabaseConnection* dbConn;
+ std::string tableName;
+
+ void init(DatabaseConnection* conn, const std::string& table);
+ void openRs();
+
+public:
+ Recordset() : rs(0), dbConn(0) {}
+ virtual ~Recordset() { close(); rs = 0; dbConn = 0; }
+
+ /**
+ * Default open() reads all records into the recordset.
+ */
+ virtual void open(DatabaseConnection* conn, const std::string& table);
+ void close();
+ void requery();
+ operator _RecordsetPtr () { return rs; }
+
+ // Dump table contents; useful for debugging.
+ void dump();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_RECORDSET_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.cpp b/qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.cpp
new file mode 100644
index 0000000000..6ad7725570
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.cpp
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SqlTransaction.h"
+#include "DatabaseConnection.h"
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+SqlTransaction::SqlTransaction(const boost::shared_ptr<DatabaseConnection>& _db)
+ : db(_db), transDepth(0)
+{
+}
+
+SqlTransaction::~SqlTransaction()
+{
+ if (transDepth > 0)
+ this->abort();
+}
+
+void
+SqlTransaction::begin()
+{
+ _bstr_t beginCmd("BEGIN TRANSACTION");
+ _ConnectionPtr c = *db;
+ c->Execute(beginCmd, NULL, adExecuteNoRecords);
+ ++transDepth;
+}
+
+void
+SqlTransaction::commit()
+{
+ if (transDepth > 0) {
+ _bstr_t commitCmd("COMMIT TRANSACTION");
+ _ConnectionPtr c = *db;
+ c->Execute(commitCmd, NULL, adExecuteNoRecords);
+ --transDepth;
+ }
+}
+
+void
+SqlTransaction::abort()
+{
+ if (transDepth > 0) {
+ _bstr_t rollbackCmd("ROLLBACK TRANSACTION");
+ _ConnectionPtr c = *db;
+ c->Execute(rollbackCmd, NULL, adExecuteNoRecords);
+ transDepth = 0;
+ }
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.h b/qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.h
new file mode 100644
index 0000000000..8b5239b786
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.h
@@ -0,0 +1,67 @@
+#ifndef QPID_STORE_MSSQL_SQLTRANSACTION_H
+#define QPID_STORE_MSSQL_SQLTRANSACTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+#include <string>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+class DatabaseConnection;
+
+/**
+ * @class SqlTransaction
+ *
+ * Class representing an SQL transaction.
+ * Since ADO w/ SQLOLEDB can't do nested transaction via its BeginTrans(),
+ * et al, nested transactions are carried out with direct SQL commands.
+ * To ensure the state of this is known, keep track of how deeply the
+ * transactions are nested. This is more of a safety/sanity check since
+ * AMQP doesn't provide nested transactions.
+ */
+class SqlTransaction {
+
+ boost::shared_ptr<DatabaseConnection> db;
+
+ // Since ADO w/ SQLOLEDB can't do nested transaction via its BeginTrans(),
+ // et al, nested transactions are carried out with direct SQL commands.
+ // To ensure the state of this is known, keep track of how deeply the
+ // transactions are nested.
+ unsigned int transDepth;
+
+public:
+ SqlTransaction(const boost::shared_ptr<DatabaseConnection>& _db);
+ ~SqlTransaction();
+
+ DatabaseConnection *dbConn() { return db.get(); }
+
+ void begin();
+ void commit();
+ void abort();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_SQLTRANSACTION_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/State.cpp b/qpid/cpp/src/qpid/store/ms-sql/State.cpp
new file mode 100644
index 0000000000..720603dd11
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/State.cpp
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "State.h"
+#include "DatabaseConnection.h"
+#include "Exception.h"
+#include <comdef.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+State::State() : dbConn(0)
+{
+ HRESULT hr = ::CoInitializeEx(NULL, COINIT_MULTITHREADED);
+ if (hr != S_OK && hr != S_FALSE)
+ throw Exception("Error initializing COM");
+}
+
+State::~State()
+{
+ if (dbConn)
+ delete dbConn;
+ ::CoUninitialize();
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/State.h b/qpid/cpp/src/qpid/store/ms-sql/State.h
new file mode 100644
index 0000000000..6350bc5bd2
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/State.h
@@ -0,0 +1,52 @@
+#ifndef QPID_STORE_MSSQL_STATE_H
+#define QPID_STORE_MSSQL_STATE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+class DatabaseConnection;
+
+/**
+ * @struct State
+ *
+ * Represents a thread's state for accessing ADO and the database.
+ * Creating an instance of State initializes COM for this thread, and
+ * destroying it uninitializes COM. There's also a DatabaseConnection
+ * for this thread's default access to the database. More DatabaseConnections
+ * can always be created, but State has one that can always be used by
+ * the thread whose state is represented.
+ *
+ * This class is intended to be one-per-thread, so it should be accessed
+ * via thread-specific storage.
+ */
+struct State {
+ State();
+ ~State();
+ DatabaseConnection *dbConn;
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_STATE_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/TplRecordset.cpp b/qpid/cpp/src/qpid/store/ms-sql/TplRecordset.cpp
new file mode 100644
index 0000000000..1309d921a9
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/TplRecordset.cpp
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <qpid/Exception.h>
+#include <qpid/log/Statement.h>
+
+#include "TplRecordset.h"
+#include "BlobEncoder.h"
+#include "DatabaseConnection.h"
+#include "VariantHelper.h"
+
+namespace {
+inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};
+}
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+void
+TplRecordset::open(DatabaseConnection* conn, const std::string& table)
+{
+ init(conn, table);
+ // Don't actually open until we know what to do. It's far easier and more
+ // efficient to simply do most of these TPL/xid ops in a single statement.
+}
+
+void
+TplRecordset::add(const std::string& xid)
+{
+ const std::string command =
+ "INSERT INTO " + tableName + " ( xid ) VALUES ( ? )";
+ _CommandPtr cmd = NULL;
+ _ParameterPtr xidVal = NULL;
+
+ TESTHR(cmd.CreateInstance(__uuidof(Command)));
+ TESTHR(xidVal.CreateInstance(__uuidof(Parameter)));
+ _ConnectionPtr p = *dbConn;
+ cmd->ActiveConnection = p;
+ cmd->CommandText = command.c_str();
+ cmd->CommandType = adCmdText;
+ xidVal->Name = "@xid";
+ xidVal->Type = adVarBinary;
+ xidVal->Size = xid.length();
+ xidVal->Direction = adParamInput;
+ xidVal->Value = BlobEncoder(xid);
+ cmd->Parameters->Append(xidVal);
+ cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords);
+}
+
+void
+TplRecordset::remove(const std::string& xid)
+{
+ // Look up the item by its xid
+ const std::string command =
+ "DELETE FROM " + tableName + " WHERE xid = ?";
+ _CommandPtr cmd = NULL;
+ _ParameterPtr xidVal = NULL;
+
+ TESTHR(cmd.CreateInstance(__uuidof(Command)));
+ TESTHR(xidVal.CreateInstance(__uuidof(Parameter)));
+ _ConnectionPtr p = *dbConn;
+ cmd->ActiveConnection = p;
+ cmd->CommandText = command.c_str();
+ cmd->CommandType = adCmdText;
+ xidVal->Name = "@xid";
+ xidVal->Type = adVarBinary;
+ xidVal->Size = xid.length();
+ xidVal->Direction = adParamInput;
+ xidVal->Value = BlobEncoder(xid);
+ cmd->Parameters->Append(xidVal);
+ _variant_t deletedRecords;
+ cmd->Execute(&deletedRecords, NULL, adCmdText | adExecuteNoRecords);
+}
+
+void
+TplRecordset::recover(std::set<std::string>& xids)
+{
+ openRs();
+ if (rs->BOF && rs->EndOfFile)
+ return; // Nothing to do
+ rs->MoveFirst();
+ while (!rs->EndOfFile) {
+ _variant_t wxid = rs->Fields->Item["xid"]->Value;
+ char *xidBytes;
+ SafeArrayAccessData(wxid.parray, (void **)&xidBytes);
+ std::string xid(xidBytes, rs->Fields->Item["xid"]->ActualSize);
+ xids.insert(xid);
+ SafeArrayUnaccessData(wxid.parray);
+ rs->MoveNext();
+ }
+}
+
+void
+TplRecordset::dump()
+{
+ Recordset::dump();
+ if (rs->EndOfFile && rs->BOF) // No records
+ return;
+
+ rs->MoveFirst();
+ while (!rs->EndOfFile) {
+ _bstr_t wxid = rs->Fields->Item["xid"]->Value;
+ QPID_LOG(notice, " -> " << (const char *)wxid);
+ rs->MoveNext();
+ }
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/TplRecordset.h b/qpid/cpp/src/qpid/store/ms-sql/TplRecordset.h
new file mode 100644
index 0000000000..fbde51738c
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/TplRecordset.h
@@ -0,0 +1,58 @@
+#ifndef QPID_STORE_MSSQL_TPLRECORDSET_H
+#define QPID_STORE_MSSQL_TPLRECORDSET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Recordset.h"
+#include <string>
+#include <set>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class TplRecordset
+ *
+ * Class for the TPL (Transaction Prepared List) records.
+ */
+class TplRecordset : public Recordset {
+protected:
+
+public:
+ virtual void open(DatabaseConnection* conn, const std::string& table);
+
+ void add(const std::string& xid);
+
+ // Remove a record given its xid.
+ void remove(const std::string& xid);
+
+ // Recover prepared transaction XIDs.
+ void recover(std::set<std::string>& xids);
+
+ // Dump table contents; useful for debugging.
+ void dump();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_TPLRECORDSET_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp b/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp
new file mode 100644
index 0000000000..acec95c1f9
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include "VariantHelper.h"
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+template <class Wrapped>
+VariantHelper<Wrapped>::VariantHelper()
+{
+ var.vt = VT_EMPTY;
+}
+
+template <class Wrapped>
+VariantHelper<Wrapped>::operator const _variant_t& () const
+{
+ return var;
+}
+
+// Specialization for using _variant_t to wrap a std::string
+VariantHelper<std::string>::VariantHelper(const std::string &init)
+{
+ if (init.empty() || init.length() == 0) {
+ var.vt = VT_BSTR;
+ var.bstrVal = NULL;
+ }
+ else {
+ var.SetString(init.c_str());
+ }
+}
+
+VariantHelper<std::string>&
+VariantHelper<std::string>::operator=(const std::string &rhs)
+{
+ if (rhs.empty() || rhs.length() == 0) {
+ var.vt = VT_BSTR;
+ var.bstrVal = NULL;
+ }
+ else {
+ var.SetString(rhs.c_str());
+ }
+ return *this;
+}
+
+VariantHelper<std::string>::operator const _variant_t& () const
+{
+ return var;
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.h b/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.h
new file mode 100644
index 0000000000..723dbc3b76
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.h
@@ -0,0 +1,61 @@
+#ifndef QPID_STORE_MSSQL_VARIANTHELPER_H
+#define QPID_STORE_MSSQL_VARIANTHELPER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <comutil.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class VariantHelper
+ *
+ * Class template to wrap the details of working with _variant_t objects.
+ */
+template <class Wrapped> class VariantHelper {
+private:
+ _variant_t var;
+
+public:
+ VariantHelper();
+ VariantHelper(const Wrapped &init);
+
+ VariantHelper& operator =(const Wrapped& rhs);
+ operator const _variant_t& () const;
+};
+
+// Specialization for using _variant_t to wrap a std::string
+template<> class VariantHelper<std::string> {
+private:
+ _variant_t var;
+
+public:
+ VariantHelper(const std::string &init);
+ VariantHelper& operator =(const std::string& rhs);
+ operator const _variant_t& () const;
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_VARIANTHELPER_H */