diff options
Diffstat (limited to 'qpid/cpp/src/qpid/store')
45 files changed, 8258 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/store/CMakeLists.txt b/qpid/cpp/src/qpid/store/CMakeLists.txt new file mode 100644 index 0000000000..ee7894730a --- /dev/null +++ b/qpid/cpp/src/qpid/store/CMakeLists.txt @@ -0,0 +1,120 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +project(qpidc_store) + +#set (CMAKE_VERBOSE_MAKEFILE ON) # for debugging + +include_directories( ${Boost_INCLUDE_DIR} ) + +include_directories( ${CMAKE_CURRENT_SOURCE_DIR} ) +include_directories( ${CMAKE_HOME_DIRECTORY}/include ) + +set (store_SOURCES + MessageStorePlugin.cpp + ) +add_library (store MODULE ${store_SOURCES}) +target_link_libraries (store qpidbroker qpidcommon) +if (CMAKE_COMPILER_IS_GNUCXX) + set (GCC_CATCH_UNDEFINED "-Wl,--no-undefined") + # gcc on SunOS uses native linker whose "-z defs" is too fussy + if (CMAKE_SYSTEM_NAME STREQUAL SunOS) + set (GCC_CATCH_UNDEFINED "") + endif (CMAKE_SYSTEM_NAME STREQUAL SunOS) + + set_target_properties (store PROPERTIES + PREFIX "" + COMPILE_DEFINITIONS _IN_QPID_BROKER + LINK_FLAGS "${GCC_CATCH_UNDEFINED}") +endif (CMAKE_COMPILER_IS_GNUCXX) + +if (CMAKE_SYSTEM_NAME STREQUAL Windows) + if (MSVC) + add_definitions( + /D "NOMINMAX" + /D "WIN32_LEAN_AND_MEAN" + ) + endif (MSVC) +endif (CMAKE_SYSTEM_NAME STREQUAL Windows) + +set_target_properties (store PROPERTIES + COMPILE_DEFINITIONS _IN_QPID_BROKER + VERSION ${qpidc_version}) +install (TARGETS store # RUNTIME + DESTINATION ${QPIDD_MODULE_DIR} + COMPONENT ${QPID_COMPONENT_BROKER}) + +# Build the MS SQL Storage Provider plugin +set (mssql_default ON) +if (NOT CMAKE_SYSTEM_NAME STREQUAL Windows) + set(mssql_default OFF) +endif (NOT CMAKE_SYSTEM_NAME STREQUAL Windows) +option(BUILD_MSSQL "Build MS SQL Store provider plugin" ${mssql_default}) +if (BUILD_MSSQL) + add_library (mssql_store MODULE + ms-sql/MsSqlProvider.cpp + ms-sql/AmqpTransaction.cpp + ms-sql/BindingRecordset.cpp + ms-sql/BlobAdapter.cpp + ms-sql/BlobEncoder.cpp + ms-sql/BlobRecordset.cpp + ms-sql/DatabaseConnection.cpp + ms-sql/MessageMapRecordset.cpp + ms-sql/MessageRecordset.cpp + ms-sql/Recordset.cpp + ms-sql/SqlTransaction.cpp + ms-sql/State.cpp + ms-sql/TplRecordset.cpp + ms-sql/VariantHelper.cpp) + set_target_properties (mssql_store PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER) + target_link_libraries (mssql_store qpidbroker qpidcommon) + install (TARGETS mssql_store # RUNTIME + DESTINATION ${QPIDD_MODULE_DIR} + COMPONENT ${QPID_COMPONENT_BROKER}) +endif (BUILD_MSSQL) + +# Build the MS SQL-CLFS Storage Provider plugin +set (msclfs_default ON) +if (NOT CMAKE_SYSTEM_NAME STREQUAL Windows) + set(msclfs_default OFF) +endif (NOT CMAKE_SYSTEM_NAME STREQUAL Windows) +option(BUILD_MSCLFS "Build MS hybrid SQL-CLFS Store provider plugin" ${msclfs_default}) +if (BUILD_MSCLFS) + add_library (msclfs_store MODULE + ms-clfs/MsSqlClfsProvider.cpp + ms-clfs/Log.cpp + ms-clfs/MessageLog.cpp + ms-clfs/Messages.cpp + ms-clfs/Transaction.cpp + ms-clfs/TransactionLog.cpp + ms-sql/BindingRecordset.cpp + ms-sql/BlobAdapter.cpp + ms-sql/BlobEncoder.cpp + ms-sql/BlobRecordset.cpp + ms-sql/DatabaseConnection.cpp + ms-sql/Recordset.cpp + ms-sql/State.cpp + ms-sql/VariantHelper.cpp) + include_directories(ms-sql) + set_target_properties (msclfs_store PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER) + target_link_libraries (msclfs_store qpidbroker qpidcommon clfsw32.lib) + install (TARGETS msclfs_store # RUNTIME + DESTINATION ${QPIDD_MODULE_DIR} + COMPONENT ${QPID_COMPONENT_BROKER}) +endif (BUILD_MSCLFS) diff --git a/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp b/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp new file mode 100644 index 0000000000..b876bd6b6d --- /dev/null +++ b/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp @@ -0,0 +1,463 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "MessageStorePlugin.h" +#include "StorageProvider.h" +#include "StoreException.h" +#include "qpid/broker/Broker.h" +#include "qpid/Plugin.h" +#include "qpid/Options.h" +#include "qpid/DataDir.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace store { + +/* + * The MessageStore pointer given to the Broker points to static storage. + * Thus, it cannot be deleted, especially by the broker. To prevent deletion, + * this no-op deleter is used with the boost::shared_ptr. When the last + * shared_ptr is destroyed, the deleter is called rather than delete(). + */ +namespace { + class NoopDeleter { + public: + NoopDeleter() {} + void operator()(qpid::broker::MessageStore * /*p*/) {} + }; +} + +static MessageStorePlugin static_instance_registers_plugin; + + +MessageStorePlugin::StoreOptions::StoreOptions(const std::string& name) : + qpid::Options(name) +{ + addOptions() + ("storage-provider", qpid::optValue(providerName, "PROVIDER"), + "Name of the storage provider to use.") + ; +} + + +void +MessageStorePlugin::earlyInitialize (qpid::Plugin::Target& target) +{ + qpid::broker::Broker* b = + dynamic_cast<qpid::broker::Broker*>(&target); + if (0 == b) + return; // Only listen to Broker targets + + broker = b; + + // See if there are any storage provider plugins ready. If not, we can't + // do a message store. + qpid::Plugin::earlyInitAll(*this); + + if (providers.empty()) { + QPID_LOG(warning, + "Message store plugin: No storage providers available."); + provider = providers.end(); + return; + } + if (!options.providerName.empty()) { + // If specific one was chosen, locate it in loaded set of providers. + provider = providers.find(options.providerName); + if (provider == providers.end()) + throw Exception("Message store plugin: storage provider '" + + options.providerName + + "' does not exist."); + } + else { + // No specific provider chosen; if there's only one, use it. Else + // report the need to pick one. + if (providers.size() > 1) { + provider = providers.end(); + throw Exception("Message store plugin: multiple provider plugins " + "loaded; must either load only one or select one " + "using --storage-provider"); + } + provider = providers.begin(); + } + + provider->second->activate(*this); + NoopDeleter d; + boost::shared_ptr<qpid::broker::MessageStore> sp(this, d); + broker->setStore(sp); + target.addFinalizer(boost::bind(&MessageStorePlugin::finalizeMe, this)); +} + +void +MessageStorePlugin::initialize(qpid::Plugin::Target& target) +{ + qpid::broker::Broker* broker = + dynamic_cast<qpid::broker::Broker*>(&target); + if (0 == broker) + return; // Only listen to Broker targets + + // Pass along the initialize step to the provider that's activated. + if (provider != providers.end()) { + provider->second->initialize(*this); + } + // qpid::Plugin::initializeAll(*this); +} + +void +MessageStorePlugin::finalizeMe() +{ + finalize(); // Call finalizers on any Provider plugins +} + +void +MessageStorePlugin::providerAvailable(const std::string name, + StorageProvider *be) +{ + ProviderMap::value_type newSp(name, be); + std::pair<ProviderMap::iterator, bool> inserted = providers.insert(newSp); + if (inserted.second == false) + QPID_LOG(warning, "Storage provider " << name << " duplicate; ignored."); +} + + +/** + * Record the existence of a durable queue + */ +void +MessageStorePlugin::create(broker::PersistableQueue& queue, + const framing::FieldTable& args) +{ + if (queue.getName().size() == 0) + { + QPID_LOG(error, + "Cannot create store for empty (null) queue name - " + "ignoring and attempting to continue."); + return; + } + if (queue.getPersistenceId()) { + THROW_STORE_EXCEPTION("Queue already created: " + queue.getName()); + } + provider->second->create(queue, args); +} + +/** + * Destroy a durable queue + */ +void +MessageStorePlugin::destroy(broker::PersistableQueue& queue) +{ + provider->second->destroy(queue); +} + +/** + * Record the existence of a durable exchange + */ +void +MessageStorePlugin::create(const broker::PersistableExchange& exchange, + const framing::FieldTable& args) +{ + if (exchange.getPersistenceId()) { + THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName()); + } + provider->second->create(exchange, args); +} + +/** + * Destroy a durable exchange + */ +void +MessageStorePlugin::destroy(const broker::PersistableExchange& exchange) +{ + provider->second->destroy(exchange); +} + +/** + * Record a binding + */ +void +MessageStorePlugin::bind(const broker::PersistableExchange& exchange, + const broker::PersistableQueue& queue, + const std::string& key, + const framing::FieldTable& args) +{ + provider->second->bind(exchange, queue, key, args); +} + +/** + * Forget a binding + */ +void +MessageStorePlugin::unbind(const broker::PersistableExchange& exchange, + const broker::PersistableQueue& queue, + const std::string& key, + const framing::FieldTable& args) +{ + provider->second->unbind(exchange, queue, key, args); +} + +/** + * Record generic durable configuration + */ +void +MessageStorePlugin::create(const broker::PersistableConfig& config) +{ + if (config.getPersistenceId()) { + THROW_STORE_EXCEPTION("Config item already created: " + + config.getName()); + } + provider->second->create(config); +} + +/** + * Destroy generic durable configuration + */ +void +MessageStorePlugin::destroy(const broker::PersistableConfig& config) +{ + provider->second->destroy(config); +} + +/** + * Stores a message before it has been enqueued + * (enqueueing automatically stores the message so this is + * only required if storage is required prior to that + * point). + */ +void +MessageStorePlugin::stage(const boost::intrusive_ptr<broker::PersistableMessage>& msg) +{ + if (msg->getPersistenceId() == 0) { + provider->second->stage(msg); + } +} + +/** + * Destroys a previously staged message. This only needs + * to be called if the message is never enqueued. (Once + * enqueued, deletion will be automatic when the message + * is dequeued from all queues it was enqueued onto). + */ +void +MessageStorePlugin::destroy(broker::PersistableMessage& msg) +{ + if (msg.getPersistenceId()) + provider->second->destroy(msg); +} + +/** + * Appends content to a previously staged message + */ +void +MessageStorePlugin::appendContent + (const boost::intrusive_ptr<const broker::PersistableMessage>& msg, + const std::string& data) +{ + if (msg->getPersistenceId()) + provider->second->appendContent(msg, data); + else + THROW_STORE_EXCEPTION("Cannot append content. Message not known to store!"); +} + +/** + * Loads (a section) of content data for the specified + * message (previously stored through a call to stage or + * enqueue) into data. The offset refers to the content + * only (i.e. an offset of 0 implies that the start of the + * content should be loaded, not the headers or related + * meta-data). + */ +void +MessageStorePlugin::loadContent(const broker::PersistableQueue& queue, + const boost::intrusive_ptr<const broker::PersistableMessage>& msg, + std::string& data, + uint64_t offset, + uint32_t length) +{ + if (msg->getPersistenceId()) + provider->second->loadContent(queue, msg, data, offset, length); + else + THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!"); +} + +/** + * Enqueues a message, storing the message if it has not + * been previously stored and recording that the given + * message is on the given queue. + * + * Note: The operation is asynchronous so the return of this function does + * not mean the operation is complete. + */ +void +MessageStorePlugin::enqueue(broker::TransactionContext* ctxt, + const boost::intrusive_ptr<broker::PersistableMessage>& msg, + const broker::PersistableQueue& queue) +{ + if (queue.getPersistenceId() == 0) { + THROW_STORE_EXCEPTION("Queue not created: " + queue.getName()); + } + provider->second->enqueue(ctxt, msg, queue); +} + +/** + * Dequeues a message, recording that the given message is + * no longer on the given queue and deleting the message + * if it is no longer on any other queue. + * + * Note: The operation is asynchronous so the return of this function does + * not mean the operation is complete. + */ +void +MessageStorePlugin::dequeue(broker::TransactionContext* ctxt, + const boost::intrusive_ptr<broker::PersistableMessage>& msg, + const broker::PersistableQueue& queue) +{ + provider->second->dequeue(ctxt, msg, queue); +} + +/** + * Flushes all async messages to disk for the specified queue + * + * Note: The operation is asynchronous so the return of this function does + * not mean the operation is complete. + */ +void +MessageStorePlugin::flush(const broker::PersistableQueue& queue) +{ + provider->second->flush(queue); +} + +/** + * Returns the number of outstanding AIO's for a given queue + * + * If 0, than all the enqueue / dequeues have been stored + * to disk. + */ +uint32_t +MessageStorePlugin::outstandingQueueAIO(const broker::PersistableQueue& queue) +{ + return provider->second->outstandingQueueAIO(queue); +} + +std::auto_ptr<broker::TransactionContext> +MessageStorePlugin::begin() +{ + return provider->second->begin(); +} + +std::auto_ptr<broker::TPCTransactionContext> +MessageStorePlugin::begin(const std::string& xid) +{ + return provider->second->begin(xid); +} + +void +MessageStorePlugin::prepare(broker::TPCTransactionContext& ctxt) +{ + provider->second->prepare(ctxt); +} + +void +MessageStorePlugin::commit(broker::TransactionContext& ctxt) +{ + provider->second->commit(ctxt); +} + +void +MessageStorePlugin::abort(broker::TransactionContext& ctxt) +{ + provider->second->abort(ctxt); +} + +void +MessageStorePlugin::collectPreparedXids(std::set<std::string>& xids) +{ + provider->second->collectPreparedXids(xids); +} + +/** + * Request recovery of queue and message state; inherited from Recoverable + */ +void +MessageStorePlugin::recover(broker::RecoveryManager& recoverer) +{ + ExchangeMap exchanges; + QueueMap queues; + MessageMap messages; + MessageQueueMap messageQueueMap; + std::vector<std::string> xids; + PreparedTransactionMap dtxMap; + + provider->second->recoverConfigs(recoverer); + provider->second->recoverExchanges(recoverer, exchanges); + provider->second->recoverQueues(recoverer, queues); + provider->second->recoverBindings(recoverer, exchanges, queues); + // Important to recover messages before transactions in the SQL-CLFS + // case. If this becomes a problem, it may be possible to resolve it. + // If in doubt please raise a jira and notify Steve Huston + // <shuston@riverace.com>. + provider->second->recoverMessages(recoverer, messages, messageQueueMap); + provider->second->recoverTransactions(recoverer, dtxMap); + // Enqueue msgs where needed. + for (MessageQueueMap::const_iterator i = messageQueueMap.begin(); + i != messageQueueMap.end(); + ++i) { + // Locate the message corresponding to the current message Id + MessageMap::const_iterator iMsg = messages.find(i->first); + if (iMsg == messages.end()) { + std::ostringstream oss; + oss << "No matching message trying to re-enqueue message " + << i->first; + THROW_STORE_EXCEPTION(oss.str()); + } + broker::RecoverableMessage::shared_ptr msg = iMsg->second; + // Now for each queue referenced in the queue map, locate it + // and re-enqueue the message. + for (std::vector<QueueEntry>::const_iterator j = i->second.begin(); + j != i->second.end(); + ++j) { + // Locate the queue corresponding to the current queue Id + QueueMap::const_iterator iQ = queues.find(j->queueId); + if (iQ == queues.end()) { + std::ostringstream oss; + oss << "No matching queue trying to re-enqueue message " + << " on queue Id " << j->queueId; + THROW_STORE_EXCEPTION(oss.str()); + } + // Messages involved in prepared transactions have their status + // updated accordingly. First, though, restore a message that + // is expected to be on a queue, including non-transacted + // messages and those pending dequeue in a dtx. + if (j->tplStatus != QueueEntry::ADDING) + iQ->second->recover(msg); + switch(j->tplStatus) { + case QueueEntry::ADDING: + dtxMap[j->xid]->enqueue(iQ->second, msg); + break; + case QueueEntry::REMOVING: + dtxMap[j->xid]->dequeue(iQ->second, msg); + break; + default: + break; + } + } + } +} + +}} // namespace qpid::store diff --git a/qpid/cpp/src/qpid/store/MessageStorePlugin.h b/qpid/cpp/src/qpid/store/MessageStorePlugin.h new file mode 100644 index 0000000000..5290fc16db --- /dev/null +++ b/qpid/cpp/src/qpid/store/MessageStorePlugin.h @@ -0,0 +1,280 @@ +#ifndef QPID_STORE_MESSAGESTOREPLUGIN_H +#define QPID_STORE_MESSAGESTOREPLUGIN_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/Plugin.h" +#include "qpid/Options.h" +#include "qpid/broker/MessageStore.h" +//#include "qpid/management/Manageable.h" + +#include <string> + +using namespace qpid; + +namespace qpid { + +namespace broker { +class Broker; +class PersistableExchange; +class PersistableMessage; +class PersistableQueue; +} + +namespace store { + +class StorageProvider; + +/** + * @class MessageStorePlugin + * + * MessageStorePlugin is the front end of the persistent message store + * plugin. It is responsible for coordinating recovery, initialization, + * transactions (both local and distributed), flow-to-disk loading and + * unloading and persisting broker state (queues, bindings etc.). + * Actual storage operations are carried out by a message store storage + * provider that implements the qpid::store::StorageProvider interface. + */ +class MessageStorePlugin : + public qpid::Plugin, + public qpid::broker::MessageStore, // Frontend classes + public qpid::Plugin::Target // Provider target + // @TODO Need a mgmt story for this. Maybe allow r/o access to provider store info? public qpid::management::Manageable +{ + public: + MessageStorePlugin() : broker(0) {} + + /** + * @name Methods inherited from qpid::Plugin + */ + //@{ + virtual Options* getOptions() { return &options; } + virtual void earlyInitialize (Plugin::Target& target); + virtual void initialize(Plugin::Target& target); + //@} + + /// Finalizer; calls Target::finalize() to run finalizers on + /// StorageProviders. + void finalizeMe(); + + /** + * Called by StorageProvider instances during the earlyInitialize sequence. + * Each StorageProvider must supply a unique name by which it is known and a + * pointer to itself. + */ + virtual void providerAvailable(const std::string name, StorageProvider *be); + + /** + * @name Methods inherited from qpid::broker::MessageStore + */ + + /** + * Record the existence of a durable queue + */ + virtual void create(broker::PersistableQueue& queue, + const framing::FieldTable& args); + /** + * Destroy a durable queue + */ + virtual void destroy(broker::PersistableQueue& queue); + + /** + * Record the existence of a durable exchange + */ + virtual void create(const broker::PersistableExchange& exchange, + const framing::FieldTable& args); + /** + * Destroy a durable exchange + */ + virtual void destroy(const broker::PersistableExchange& exchange); + + /** + * Record a binding + */ + virtual void bind(const broker::PersistableExchange& exchange, + const broker::PersistableQueue& queue, + const std::string& key, + const framing::FieldTable& args); + + /** + * Forget a binding + */ + virtual void unbind(const broker::PersistableExchange& exchange, + const broker::PersistableQueue& queue, + const std::string& key, + const framing::FieldTable& args); + + /** + * Record generic durable configuration + */ + virtual void create(const broker::PersistableConfig& config); + + /** + * Destroy generic durable configuration + */ + virtual void destroy(const broker::PersistableConfig& config); + + /** + * Stores a message before it has been enqueued + * (enqueueing automatically stores the message so this is + * only required if storage is required prior to that + * point). If the message has not yet been stored it will + * store the headers as well as any content passed in. A + * persistence id will be set on the message which can be + * used to load the content or to append to it. + */ + virtual void stage(const boost::intrusive_ptr<broker::PersistableMessage>& msg); + + /** + * Destroys a previously staged message. This only needs + * to be called if the message is never enqueued. (Once + * enqueued, deletion will be automatic when the message + * is dequeued from all queues it was enqueued onto). + */ + virtual void destroy(broker::PersistableMessage& msg); + + /** + * Appends content to a previously staged message + */ + virtual void appendContent(const boost::intrusive_ptr<const broker::PersistableMessage>& msg, + const std::string& data); + + /** + * Loads (a section) of content data for the specified + * message (previously stored through a call to stage or + * enqueue) into data. The offset refers to the content + * only (i.e. an offset of 0 implies that the start of the + * content should be loaded, not the headers or related + * meta-data). + */ + virtual void loadContent(const broker::PersistableQueue& queue, + const boost::intrusive_ptr<const broker::PersistableMessage>& msg, + std::string& data, + uint64_t offset, + uint32_t length); + + /** + * Enqueues a message, storing the message if it has not + * been previously stored and recording that the given + * message is on the given queue. + * + * Note: The operation is asynchronous so the return of this function does + * not mean the operation is complete. + * + * @param msg the message to enqueue + * @param queue the name of the queue onto which it is to be enqueued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void enqueue(broker::TransactionContext* ctxt, + const boost::intrusive_ptr<broker::PersistableMessage>& msg, + const broker::PersistableQueue& queue); + + /** + * Dequeues a message, recording that the given message is + * no longer on the given queue and deleting the message + * if it is no longer on any other queue. + * + * + * Note: The operation is asynchronous so the return of this function does + * not mean the operation is complete. + * + * @param msg the message to dequeue + * @param queue the name of the queue from which it is to be dequeued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void dequeue(broker::TransactionContext* ctxt, + const boost::intrusive_ptr<broker::PersistableMessage>& msg, + const broker::PersistableQueue& queue); + + /** + * Flushes all async messages to disk for the specified queue + * + * + * Note: The operation is asynchronous so the return of this function does + * not mean the operation is complete. + * + * @param queue the name of the queue from which it is to be dequeued + */ + virtual void flush(const broker::PersistableQueue& queue); + + /** + * Returns the number of outstanding AIO's for a given queue + * + * If 0, than all the enqueue / dequeues have been stored + * to disk + * + * @param queue the name of the queue to check for outstanding AIO + */ + virtual uint32_t outstandingQueueAIO(const broker::PersistableQueue& queue); + //@} + + /** + * @name Methods inherited from qpid::broker::TransactionalStore + */ + //@{ + std::auto_ptr<broker::TransactionContext> begin(); + + std::auto_ptr<broker::TPCTransactionContext> begin(const std::string& xid); + + void prepare(broker::TPCTransactionContext& ctxt); + + void commit(broker::TransactionContext& ctxt); + + void abort(broker::TransactionContext& ctxt); + + void collectPreparedXids(std::set<std::string>& xids); + //@} + + /** + * Request recovery of queue and message state; inherited from Recoverable + */ + virtual void recover(broker::RecoveryManager& recoverer); + + // inline management::Manageable::status_t ManagementMethod (uint32_t, management::Args&, std::string&) + // { return management::Manageable::STATUS_OK; } + + // So storage provider can get the broker info. + broker::Broker *getBroker() { return broker; } + + protected: + + struct StoreOptions : public qpid::Options { + StoreOptions(const std::string& name="Store Options"); + std::string providerName; + }; + StoreOptions options; + + typedef std::map<std::string, StorageProvider*> ProviderMap; + ProviderMap providers; + ProviderMap::const_iterator provider; + + broker::Broker *broker; + +}; // class MessageStoreImpl + +}} // namespace qpid::store + +#endif /* QPID_SERIALIZER_H */ diff --git a/qpid/cpp/src/qpid/store/StorageProvider.h b/qpid/cpp/src/qpid/store/StorageProvider.h new file mode 100644 index 0000000000..de12ffb869 --- /dev/null +++ b/qpid/cpp/src/qpid/store/StorageProvider.h @@ -0,0 +1,329 @@ +#ifndef QPID_STORE_STORAGEPROVIDER_H +#define QPID_STORE_STORAGEPROVIDER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <map> +#include <stdexcept> +#include <vector> +#include "qpid/Exception.h" +#include "qpid/Plugin.h" +#include "qpid/Options.h" +#include "qpid/broker/MessageStore.h" + +using qpid::broker::PersistableConfig; +using qpid::broker::PersistableExchange; +using qpid::broker::PersistableMessage; +using qpid::broker::PersistableQueue; + +namespace qpid { +namespace store { + +typedef std::map<uint64_t, qpid::broker::RecoverableExchange::shared_ptr> + ExchangeMap; +typedef std::map<uint64_t, qpid::broker::RecoverableQueue::shared_ptr> + QueueMap; +typedef std::map<uint64_t, qpid::broker::RecoverableMessage::shared_ptr> + MessageMap; +// Msg Id -> vector of queue entries where message is queued +struct QueueEntry { + enum TplStatus { NONE = 0, ADDING = 1, REMOVING = 2 }; + uint64_t queueId; + TplStatus tplStatus; + std::string xid; + + QueueEntry(uint64_t id, TplStatus tpl = NONE, const std::string& x = "") + : queueId(id), tplStatus(tpl), xid(x) {} + + bool operator==(const QueueEntry& rhs) const { + if (queueId != rhs.queueId) return false; + if (tplStatus == NONE && rhs.tplStatus == NONE) return true; + return xid == rhs.xid; + } +}; +typedef std::map<uint64_t, std::vector<QueueEntry> > MessageQueueMap; +typedef std::map<std::string, qpid::broker::RecoverableTransaction::shared_ptr> + PreparedTransactionMap; + +class MessageStorePlugin; + +/** + * @class StorageProvider + * + * StorageProvider defines the interface for the storage provider plugin to the + * Qpid broker persistence store plugin. + * + * @TODO Should StorageProvider also inherit from MessageStore? If so, then + * maybe remove Recoverable from MessageStore's inheritance and move it + * to MessageStorePlugin? In any event, somehow the discardInit() feature + * needs to get added here. + */ +class StorageProvider : public qpid::Plugin, public qpid::broker::MessageStore +{ +public: + + class Exception : public qpid::Exception + { + public: + virtual ~Exception() throw() {} + virtual const char *what() const throw() = 0; + }; + + /** + * @name Methods inherited from qpid::Plugin + */ + //@{ + /** + * Return a pointer to the provider's options. The options will be + * updated during option parsing by the host program; therefore, the + * referenced Options object must remain valid past this function's return. + * + * @return An options group or 0 for no options. Default returns 0. + * Plugin retains ownership of return value. + */ + virtual qpid::Options* getOptions() = 0; + + /** + * Initialize Plugin functionality on a Target, called before + * initializing the target. + * + * StorageProviders should respond only to Targets of class + * qpid::store::MessageStorePlugin and ignore all others. + * + * When called, the provider should invoke the method + * qpid::store::MessageStorePlugin::providerAvailable() to alert the + * message store of StorageProvider's availability. + * + * Called before the target itself is initialized. + */ + virtual void earlyInitialize (Plugin::Target& target) = 0; + + /** + * Initialize StorageProvider functionality. Called after initializing + * the target. + * + * StorageProviders should respond only to Targets of class + * qpid::store::MessageStorePlugin and ignore all others. + * + * Called after the target is fully initialized. + */ + virtual void initialize(Plugin::Target& target) = 0; + //@} + + /** + * Receive notification that this provider is the one that will actively + * handle storage for the target. If the provider is to be used, this + * method will be called after earlyInitialize() and before any + * recovery operations (recovery, in turn, precedes call to initialize()). + * Thus, it is wise to not actually do any database ops from within + * earlyInitialize() - they can wait until activate() is called because + * at that point it is certain the database will be needed. + */ + virtual void activate(MessageStorePlugin &store) = 0; + + /** + * @name Methods inherited from qpid::broker::MessageStore + */ + + /** + * Record the existence of a durable queue + */ + virtual void create(PersistableQueue& queue, + const qpid::framing::FieldTable& args) = 0; + /** + * Destroy a durable queue + */ + virtual void destroy(PersistableQueue& queue) = 0; + + /** + * Record the existence of a durable exchange + */ + virtual void create(const PersistableExchange& exchange, + const qpid::framing::FieldTable& args) = 0; + /** + * Destroy a durable exchange + */ + virtual void destroy(const PersistableExchange& exchange) = 0; + + /** + * Record a binding + */ + virtual void bind(const PersistableExchange& exchange, + const PersistableQueue& queue, + const std::string& key, + const qpid::framing::FieldTable& args) = 0; + + /** + * Forget a binding + */ + virtual void unbind(const PersistableExchange& exchange, + const PersistableQueue& queue, + const std::string& key, + const qpid::framing::FieldTable& args) = 0; + + /** + * Record generic durable configuration + */ + virtual void create(const PersistableConfig& config) = 0; + + /** + * Destroy generic durable configuration + */ + virtual void destroy(const PersistableConfig& config) = 0; + + /** + * Stores a messages before it has been enqueued + * (enqueueing automatically stores the message so this is + * only required if storage is required prior to that + * point). If the message has not yet been stored it will + * store the headers as well as any content passed in. A + * persistence id will be set on the message which can be + * used to load the content or to append to it. + */ + virtual void stage(const boost::intrusive_ptr<PersistableMessage>& msg) = 0; + + /** + * Destroys a previously staged message. This only needs + * to be called if the message is never enqueued. (Once + * enqueued, deletion will be automatic when the message + * is dequeued from all queues it was enqueued onto). + */ + virtual void destroy(PersistableMessage& msg) = 0; + + /** + * Appends content to a previously staged message + */ + virtual void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg, + const std::string& data) = 0; + + /** + * Loads (a section) of content data for the specified + * message (previously stored through a call to stage or + * enqueue) into data. The offset refers to the content + * only (i.e. an offset of 0 implies that the start of the + * content should be loaded, not the headers or related + * meta-data). + */ + virtual void loadContent(const PersistableQueue& queue, + const boost::intrusive_ptr<const PersistableMessage>& msg, + std::string& data, + uint64_t offset, + uint32_t length) = 0; + + /** + * Enqueues a message, storing the message if it has not + * been previously stored and recording that the given + * message is on the given queue. + * + * Note: that this is async so the return of the function does + * not mean the opperation is complete. + * + * @param msg the message to enqueue + * @param queue the name of the queue onto which it is to be enqueued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void enqueue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) = 0; + + /** + * Dequeues a message, recording that the given message is + * no longer on the given queue and deleting the message + * if it is no longer on any other queue. + * + * Note: that this is async so the return of the function does + * not mean the opperation is complete. + * + * @param msg the message to dequeue + * @param queue the name of the queue from which it is to be dequeued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void dequeue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) = 0; + + /** + * Flushes all async messages to disk for the specified queue + * + * Note: that this is async so the return of the function does + * not mean the opperation is complete. + * + * @param queue the name of the queue from which it is to be dequeued + */ + virtual void flush(const qpid::broker::PersistableQueue& queue) = 0; + + /** + * Returns the number of outstanding AIO's for a given queue + * + * If 0, than all the enqueue / dequeues have been stored + * to disk + * + * @param queue the name of the queue to check for outstanding AIO + */ + virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue) = 0; + //@} + + /** + * @TODO This should probably not be here - it's only here because + * MessageStore inherits from Recoverable... maybe move that derivation. + * + * As it is now, we don't use this. Separate recover methods are + * declared below for individual types, which also set up maps of + * messages, queues, transactions for the main store plugin to handle + * properly. + * + * Request recovery of queue and message state. + */ + virtual void recover(qpid::broker::RecoveryManager& /*recoverer*/) {} + + /** + * @name Methods that do the recovery of the various objects that + * were saved. + */ + //@{ + + /** + * Recover bindings. + */ + virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer) = 0; + virtual void recoverExchanges(qpid::broker::RecoveryManager& recoverer, + ExchangeMap& exchangeMap) = 0; + virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer, + QueueMap& queueMap) = 0; + virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer, + const ExchangeMap& exchangeMap, + const QueueMap& queueMap) = 0; + virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer, + MessageMap& messageMap, + MessageQueueMap& messageQueueMap) = 0; + virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer, + PreparedTransactionMap& dtxMap) = 0; + //@} +}; + +}} // namespace qpid::store + +#endif /* QPID_STORE_STORAGEPROVIDER_H */ diff --git a/qpid/cpp/src/qpid/store/StoreException.h b/qpid/cpp/src/qpid/store/StoreException.h new file mode 100644 index 0000000000..1dc7f670ec --- /dev/null +++ b/qpid/cpp/src/qpid/store/StoreException.h @@ -0,0 +1,49 @@ +#ifndef QPID_STORE_STOREEXCEPTION_H +#define QPID_STORE_STOREEXCEPTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <exception> +#include <boost/format.hpp> +#include "StorageProvider.h" + +namespace qpid { +namespace store { + +class StoreException : public std::exception +{ + std::string text; +public: + StoreException(const std::string& _text) : text(_text) {} + StoreException(const std::string& _text, + const StorageProvider::Exception& cause) + : text(_text + ": " + cause.what()) {} + virtual ~StoreException() throw() {} + virtual const char* what() const throw() { return text.c_str(); } +}; + +#define THROW_STORE_EXCEPTION(MESSAGE) throw qpid::store::StoreException(boost::str(boost::format("%s (%s:%d)") % (MESSAGE) % __FILE__ % __LINE__)) +#define THROW_STORE_EXCEPTION_2(MESSAGE, EXCEPTION) throw qpid::store::StoreException(boost::str(boost::format("%s (%s:%d)") % (MESSAGE) % __FILE__ % __LINE__), EXCEPTION) + +}} // namespace qpid::store + +#endif /* QPID_STORE_STOREEXCEPTION_H */ diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp b/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp new file mode 100644 index 0000000000..e6cb10c133 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp @@ -0,0 +1,182 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <windows.h> +#include <clfsw32.h> +#include <clfsmgmtw32.h> +#include <sstream> +#include <string> +#include <vector> +#include <stdlib.h> +#include <qpid/sys/windows/check.h> + +#include "Log.h" + +namespace qpid { +namespace store { +namespace ms_clfs { + +Log::~Log() +{ + if (marshal != 0) + ::DeleteLogMarshallingArea(marshal); + ::CloseHandle(handle); +} + +void +Log::open(const std::string& path, const TuningParameters& params) +{ + this->containerSize = static_cast<ULONGLONG>(params.containerSize); + logPath = path; + std::string logSpec = "log:" + path; + size_t specLength = logSpec.length(); + std::auto_ptr<wchar_t> wLogSpec(new wchar_t[specLength + 1]); + size_t converted; + mbstowcs_s(&converted, + wLogSpec.get(), specLength+1, + logSpec.c_str(), specLength); + handle = ::CreateLogFile(wLogSpec.get(), + GENERIC_WRITE | GENERIC_READ, + 0, + 0, + OPEN_ALWAYS, + 0); + QPID_WINDOWS_CHECK_NOT(handle, INVALID_HANDLE_VALUE); + CLFS_INFORMATION info; + ULONG infoSize = sizeof(info); + BOOL ok = ::GetLogFileInformation(handle, &info, &infoSize); + QPID_WINDOWS_CHECK_NOT(ok, 0); + ok = ::RegisterManageableLogClient(handle, 0); + QPID_WINDOWS_CHECK_NOT(ok, 0); + + // Set up policies for how many containers to initially create and how + // large each container should be. Also, auto-grow the log when container + // space runs out. + CLFS_MGMT_POLICY logPolicy; + logPolicy.Version = CLFS_MGMT_POLICY_VERSION; + logPolicy.LengthInBytes = sizeof(logPolicy); + logPolicy.PolicyFlags = 0; + + // If this is the first time this log is opened, give an opportunity to + // initialize its content. + bool needInitialize(false); + if (info.TotalContainers == 0) { + // New log; set the configured container size and create the + // initial set of containers. + logPolicy.PolicyType = ClfsMgmtPolicyNewContainerSize; + logPolicy.PolicyParameters.NewContainerSize.SizeInBytes = containerSize; + ok = ::InstallLogPolicy(handle, &logPolicy); + QPID_WINDOWS_CHECK_NOT(ok, 0); + + ULONGLONG desired(params.containers), actual(0); + ok = ::SetLogFileSizeWithPolicy(handle, &desired, &actual); + QPID_WINDOWS_CHECK_NOT(ok, 0); + + needInitialize = true; + } + // Ensure that the log is extended as needed and will shrink when 50% + // becomes unused. + logPolicy.PolicyType = ClfsMgmtPolicyAutoGrow; + logPolicy.PolicyParameters.AutoGrow.Enabled = 1; + ok = ::InstallLogPolicy(handle, &logPolicy); + QPID_WINDOWS_CHECK_NOT(ok, 0); + logPolicy.PolicyType = ClfsMgmtPolicyAutoShrink; + logPolicy.PolicyParameters.AutoShrink.Percentage = params.shrinkPct; + ok = ::InstallLogPolicy(handle, &logPolicy); + QPID_WINDOWS_CHECK_NOT(ok, 0); + + // Need a marshaling area + ok = ::CreateLogMarshallingArea(handle, + NULL, NULL, NULL, // Alloc, free, context + marshallingBufferSize(), + params.maxWriteBuffers, + 1, // Max read buffers + &marshal); + QPID_WINDOWS_CHECK_NOT(ok, 0); + if (needInitialize) + initialize(); +} + +uint32_t +Log::marshallingBufferSize() +{ + // Default implementation returns the minimum marshalling buffer size; + // derived ones should come up with a more fitting value. + // + // Find the directory name part of the log specification, including the + // trailing '\'. + size_t dirMarker = logPath.rfind('\\'); + if (dirMarker == std::string::npos) + dirMarker = logPath.rfind('/'); + DWORD bytesPerSector; + DWORD dontCare; + ::GetDiskFreeSpace(logPath.substr(0, dirMarker).c_str(), + &dontCare, + &bytesPerSector, + &dontCare, + &dontCare); + return bytesPerSector; +} + +CLFS_LSN +Log::write(void* entry, uint32_t length, CLFS_LSN* prev) +{ + CLFS_WRITE_ENTRY desc; + desc.Buffer = entry; + desc.ByteLength = length; + CLFS_LSN lsn; + BOOL ok = ::ReserveAndAppendLog(marshal, + &desc, 1, // Buffer descriptor + 0, prev, // Undo-Next, Prev + 0, 0, // Reservation + CLFS_FLAG_FORCE_FLUSH, + &lsn, + 0); + QPID_WINDOWS_CHECK_NOT(ok, 0); + return lsn; +} + +// Get the current base LSN of the log. +CLFS_LSN +Log::getBase() +{ + CLFS_INFORMATION info; + ULONG infoSize = sizeof(info); + BOOL ok = ::GetLogFileInformation(handle, &info, &infoSize); + QPID_WINDOWS_CHECK_NOT(ok, 0); + return info.BaseLsn; +} + +void +Log::moveTail(const CLFS_LSN& oldest) +{ + BOOL ok = ::AdvanceLogBase(marshal, + const_cast<PCLFS_LSN>(&oldest), + 0, NULL); + // If multiple threads are manipulating things they may get out of + // order when moving the tail; if someone already moved it further + // than this, it's ok - ignore it. + if (ok || ::GetLastError() == ERROR_LOG_START_OF_LOG) + return; + QPID_WINDOWS_CHECK_NOT(ok, 0); +} + +}}} // namespace qpid::store::ms_clfs diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Log.h b/qpid/cpp/src/qpid/store/ms-clfs/Log.h new file mode 100644 index 0000000000..2f7eb6cada --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-clfs/Log.h @@ -0,0 +1,78 @@ +#ifndef QPID_STORE_MSCLFS_LOG_H +#define QPID_STORE_MSCLFS_LOG_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include <windows.h> +#include <clfsw32.h> +#include <qpid/sys/IntegerTypes.h> + +namespace qpid { +namespace store { +namespace ms_clfs { + +/** + * @class Log + * + * Represents a CLFS-housed log. + */ +class Log { + +protected: + HANDLE handle; + ULONGLONG containerSize; + std::string logPath; + PVOID marshal; + + // Give subclasses a chance to initialize a new log. Called after a new + // log is created, initial set of containers is added, and marshalling + // area is allocated. + virtual void initialize() {} + +public: + struct TuningParameters { + size_t containerSize; + unsigned short containers; + unsigned short shrinkPct; + uint32_t maxWriteBuffers; + }; + + Log() : handle(INVALID_HANDLE_VALUE), containerSize(0), marshal(0) {} + virtual ~Log(); + + void open(const std::string& path, const TuningParameters& params); + + virtual uint32_t marshallingBufferSize(); + + CLFS_LSN write(void* entry, uint32_t length, CLFS_LSN* prev = 0); + + // Get the current base LSN of the log. + CLFS_LSN getBase(); + + // Move the log tail to the indicated LSN. + void moveTail(const CLFS_LSN& oldest); +}; + +}}} // namespace qpid::store::ms_clfs + +#endif /* QPID_STORE_MSCLFS_LOG_H */ diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Lsn.h b/qpid/cpp/src/qpid/store/ms-clfs/Lsn.h new file mode 100644 index 0000000000..7f46c1f266 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-clfs/Lsn.h @@ -0,0 +1,36 @@ +#ifndef QPID_STORE_MSCLFS_LSN_H +#define QPID_STORE_MSCLFS_LSN_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <clfsw32.h> + +namespace { + // Make it easy to assign LSNs + inline CLFS_LSN idToLsn(const uint64_t val) + { CLFS_LSN lsn; lsn.Internal = val; return lsn; } + + inline uint64_t lsnToId(const CLFS_LSN& lsn) + { uint64_t val = lsn.Internal; return val; } +} + +#endif /* QPID_STORE_MSCLFS_LSN_H */ diff --git a/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp b/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp new file mode 100644 index 0000000000..5ff24e7d33 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp @@ -0,0 +1,1102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <list> +#include <map> +#include <set> +#include <stdlib.h> +#include <string> +#include <windows.h> +#include <clfsw32.h> +#include <qpid/broker/Broker.h> +#include <qpid/broker/RecoverableQueue.h> +#include <qpid/log/Statement.h> +#include <qpid/store/MessageStorePlugin.h> +#include <qpid/store/StoreException.h> +#include <qpid/store/StorageProvider.h> +#include <qpid/sys/Mutex.h> +#include <boost/foreach.hpp> + +// From ms-sql... +#include "BlobAdapter.h" +#include "BlobRecordset.h" +#include "BindingRecordset.h" +#include "DatabaseConnection.h" +#include "Exception.h" +#include "State.h" +#include "VariantHelper.h" +using qpid::store::ms_sql::BlobAdapter; +using qpid::store::ms_sql::BlobRecordset; +using qpid::store::ms_sql::BindingRecordset; +using qpid::store::ms_sql::DatabaseConnection; +using qpid::store::ms_sql::ADOException; +using qpid::store::ms_sql::State; +using qpid::store::ms_sql::VariantHelper; + +#include "Log.h" +#include "Messages.h" +#include "Transaction.h" +#include "TransactionLog.h" + +// Bring in ADO 2.8 (yes, I know it says "15", but that's it...) +#import "C:\Program Files\Common Files\System\ado\msado15.dll" \ + no_namespace rename("EOF", "EndOfFile") +#include <comdef.h> +namespace { +inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);}; + +// Table names +const std::string TblBinding("tblBinding"); +const std::string TblConfig("tblConfig"); +const std::string TblExchange("tblExchange"); +const std::string TblQueue("tblQueue"); + +} + +namespace qpid { +namespace store { +namespace ms_clfs { + +/** + * @class MSSqlClfsProvider + * + * Implements a qpid::store::StorageProvider that uses a hybrid Microsoft + * SQL Server and Windows CLFS approach as the backend data store for Qpid. + */ +class MSSqlClfsProvider : public qpid::store::StorageProvider +{ +protected: + void finalizeMe(); + + void dump(); + +public: + MSSqlClfsProvider(); + ~MSSqlClfsProvider(); + + virtual qpid::Options* getOptions() { return &options; } + + virtual void earlyInitialize (Plugin::Target& target); + virtual void initialize(Plugin::Target& target); + + /** + * Receive notification that this provider is the one that will actively + * handle provider storage for the target. If the provider is to be used, + * this method will be called after earlyInitialize() and before any + * recovery operations (recovery, in turn, precedes call to initialize()). + */ + virtual void activate(MessageStorePlugin &store); + + /** + * @name Methods inherited from qpid::broker::MessageStore + */ + + /** + * Record the existence of a durable queue + */ + virtual void create(PersistableQueue& queue, + const qpid::framing::FieldTable& args); + /** + * Destroy a durable queue + */ + virtual void destroy(PersistableQueue& queue); + + /** + * Record the existence of a durable exchange + */ + virtual void create(const PersistableExchange& exchange, + const qpid::framing::FieldTable& args); + /** + * Destroy a durable exchange + */ + virtual void destroy(const PersistableExchange& exchange); + + /** + * Record a binding + */ + virtual void bind(const PersistableExchange& exchange, + const PersistableQueue& queue, + const std::string& key, + const qpid::framing::FieldTable& args); + + /** + * Forget a binding + */ + virtual void unbind(const PersistableExchange& exchange, + const PersistableQueue& queue, + const std::string& key, + const qpid::framing::FieldTable& args); + + /** + * Record generic durable configuration + */ + virtual void create(const PersistableConfig& config); + + /** + * Destroy generic durable configuration + */ + virtual void destroy(const PersistableConfig& config); + + /** + * Stores a messages before it has been enqueued + * (enqueueing automatically stores the message so this is + * only required if storage is required prior to that + * point). If the message has not yet been stored it will + * store the headers as well as any content passed in. A + * persistence id will be set on the message which can be + * used to load the content or to append to it. + */ + virtual void stage(const boost::intrusive_ptr<PersistableMessage>& msg); + + /** + * Destroys a previously staged message. This only needs + * to be called if the message is never enqueued. (Once + * enqueued, deletion will be automatic when the message + * is dequeued from all queues it was enqueued onto). + */ + virtual void destroy(PersistableMessage& msg); + + /** + * Appends content to a previously staged message + */ + virtual void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg, + const std::string& data); + + /** + * Loads (a section) of content data for the specified + * message (previously stored through a call to stage or + * enqueue) into data. The offset refers to the content + * only (i.e. an offset of 0 implies that the start of the + * content should be loaded, not the headers or related + * meta-data). + */ + virtual void loadContent(const qpid::broker::PersistableQueue& queue, + const boost::intrusive_ptr<const PersistableMessage>& msg, + std::string& data, + uint64_t offset, + uint32_t length); + + /** + * Enqueues a message, storing the message if it has not + * been previously stored and recording that the given + * message is on the given queue. + * + * Note: that this is async so the return of the function does + * not mean the opperation is complete. + * + * @param msg the message to enqueue + * @param queue the name of the queue onto which it is to be enqueued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void enqueue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue); + + /** + * Dequeues a message, recording that the given message is + * no longer on the given queue and deleting the message + * if it is no longer on any other queue. + * + * Note: that this is async so the return of the function does + * not mean the opperation is complete. + * + * @param msg the message to dequeue + * @param queue the name of the queue from which it is to be dequeued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void dequeue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue); + + /** + * Flushes all async messages to disk for the specified queue + * + * Note: this is a no-op for this provider. + * + * @param queue the name of the queue from which it is to be dequeued + */ + virtual void flush(const PersistableQueue& queue) {}; + + /** + * Returns the number of outstanding AIO's for a given queue + * + * If 0, than all the enqueue / dequeues have been stored + * to disk + * + * @param queue the name of the queue to check for outstanding AIO + */ + virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue) + {return 0;} + //@} + + /** + * @name Methods inherited from qpid::broker::TransactionalStore + */ + //@{ + virtual std::auto_ptr<qpid::broker::TransactionContext> begin(); + virtual std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid); + virtual void prepare(qpid::broker::TPCTransactionContext& txn); + virtual void commit(qpid::broker::TransactionContext& txn); + virtual void abort(qpid::broker::TransactionContext& txn); + virtual void collectPreparedXids(std::set<std::string>& xids); + //@} + + virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer); + virtual void recoverExchanges(qpid::broker::RecoveryManager& recoverer, + ExchangeMap& exchangeMap); + virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer, + QueueMap& queueMap); + virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer, + const ExchangeMap& exchangeMap, + const QueueMap& queueMap); + virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer, + MessageMap& messageMap, + MessageQueueMap& messageQueueMap); + virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer, + PreparedTransactionMap& dtxMap); + +private: + struct ProviderOptions : public qpid::Options + { + std::string connectString; + std::string catalogName; + std::string storeDir; + size_t containerSize; + unsigned short initialContainers; + uint32_t maxWriteBuffers; + + ProviderOptions(const std::string &name) + : qpid::Options(name), + catalogName("QpidStore"), + containerSize(1024 * 1024), + initialContainers(2), + maxWriteBuffers(10) + { + const enum { NAMELEN = MAX_COMPUTERNAME_LENGTH + 1 }; + TCHAR myName[NAMELEN]; + DWORD myNameLen = NAMELEN; + GetComputerName(myName, &myNameLen); + connectString = "Data Source="; + connectString += myName; + connectString += "\\SQLEXPRESS;Integrated Security=SSPI"; + addOptions() + ("connect", + qpid::optValue(connectString, "STRING"), + "Connection string for the database to use. Will prepend " + "Provider=SQLOLEDB;") + ("catalog", + qpid::optValue(catalogName, "DB NAME"), + "Catalog (database) name") + ("store-dir", + qpid::optValue(storeDir, "DIR"), + "Location to store message and transaction data " + "(default uses data-dir if available)") + ("container-size", + qpid::optValue(containerSize, "VALUE"), + "Bytes per container; min 512K. Only used when creating " + "a new log") + ("initial-containers", + qpid::optValue(initialContainers, "VALUE"), + "Number of containers to add if creating a new log") + ("max-write-buffers", + qpid::optValue(maxWriteBuffers, "VALUE"), + "Maximum write buffers outstanding before log is flushed " + "(0 means no limit)") + ; + } + }; + ProviderOptions options; + std::string brokerDataDir; + Messages messages; + // TransactionLog requires itself to have a shared_ptr reference to start. + TransactionLog::shared_ptr transactions; + + // Each thread has a separate connection to the database and also needs + // to manage its COM initialize/finalize individually. This is done by + // keeping a thread-specific State. + boost::thread_specific_ptr<State> dbState; + + State *initState(); + DatabaseConnection *initConnection(void); + void createDb(DatabaseConnection *db, const std::string &name); + void createLogs(); +}; + +static MSSqlClfsProvider static_instance_registers_plugin; + +void +MSSqlClfsProvider::finalizeMe() +{ + dbState.reset(); +} + +MSSqlClfsProvider::MSSqlClfsProvider() + : options("MS SQL/CLFS Provider options") +{ + transactions.reset(new TransactionLog()); +} + +MSSqlClfsProvider::~MSSqlClfsProvider() +{ +} + +void +MSSqlClfsProvider::earlyInitialize(Plugin::Target &target) +{ + MessageStorePlugin *store = dynamic_cast<MessageStorePlugin *>(&target); + if (store) { + // Check the store dir option; if not specified, need to + // grab the broker's data dir. + if (options.storeDir.empty()) { + const DataDir& dir = store->getBroker()->getDataDir(); + if (dir.isEnabled()) { + options.storeDir = dir.getPath(); + } + else { + QPID_LOG(error, + "MSSQL-CLFS: --store-dir required if --no-data-dir specified"); + return; + } + } + + // If CLFS is not available on this system, give up now. + try { + Log::TuningParameters params; + params.containerSize = options.containerSize; + params.containers = options.initialContainers; + params.shrinkPct = 50; + params.maxWriteBuffers = options.maxWriteBuffers; + std::string msgPath = options.storeDir + "\\" + "messages"; + messages.openLog(msgPath, params); + std::string transPath = options.storeDir + "\\" + "transactions"; + transactions->open(transPath, params); + } + catch (std::exception &e) { + QPID_LOG(error, e.what()); + return; + } + + // If the database init fails, report it and don't register; give + // the rest of the broker a chance to run. + // + // Don't try to initConnection() since that will fail if the + // database doesn't exist. Instead, try to open a connection without + // a database name, then search for the database. There's still a + // chance this provider won't be selected for the store too, so be + // be sure to close the database connection before return to avoid + // leaving a connection up that will not be used. + try { + initState(); // This initializes COM + std::auto_ptr<DatabaseConnection> db(new DatabaseConnection()); + db->open(options.connectString, ""); + _ConnectionPtr conn(*db); + _RecordsetPtr pCatalogs = NULL; + VariantHelper<std::string> catalogName(options.catalogName); + pCatalogs = conn->OpenSchema(adSchemaCatalogs, catalogName); + if (pCatalogs->EndOfFile) { + // Database doesn't exist; create it + QPID_LOG(notice, + "MSSQL-CLFS: Creating database " + options.catalogName); + createDb(db.get(), options.catalogName); + } + else { + QPID_LOG(notice, + "MSSQL-CLFS: Database located: " + options.catalogName); + } + if (pCatalogs) { + if (pCatalogs->State == adStateOpen) + pCatalogs->Close(); + pCatalogs = 0; + } + db->close(); + store->providerAvailable("MSSQL-CLFS", this); + } + catch (qpid::Exception &e) { + QPID_LOG(error, e.what()); + return; + } + store->addFinalizer(boost::bind(&MSSqlClfsProvider::finalizeMe, this)); + } +} + +void +MSSqlClfsProvider::initialize(Plugin::Target& target) +{ +} + +void +MSSqlClfsProvider::activate(MessageStorePlugin &store) +{ + QPID_LOG(info, "MS SQL/CLFS Provider is up"); +} + +void +MSSqlClfsProvider::create(PersistableQueue& queue, + const qpid::framing::FieldTable& /*args needed for jrnl*/) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsQueues; + try { + db->beginTransaction(); + rsQueues.open(db, TblQueue); + rsQueues.add(queue); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error creating queue " + queue.getName(), e, errs); + } + catch(std::exception& e) { + db->rollbackTransaction(); + THROW_STORE_EXCEPTION(e.what()); + } +} + +/** + * Destroy a durable queue + */ +void +MSSqlClfsProvider::destroy(PersistableQueue& queue) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsQueues; + BindingRecordset rsBindings; + try { + db->beginTransaction(); + rsQueues.open(db, TblQueue); + rsBindings.open(db, TblBinding); + // Remove bindings first; the queue IDs can't be ripped out from + // under the references in the bindings table. + rsBindings.removeForQueue(queue.getPersistenceId()); + rsQueues.remove(queue); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error deleting queue " + queue.getName(), e, errs); + } + + /* + * Now that the SQL stuff has recorded the queue deletion, expunge + * all record of the queue from the messages set. Any errors logging + * these removals are swallowed because during a recovery the queue + * Id won't be present (the SQL stuff already committed) so any references + * to it in message operations will be removed. + */ + messages.expunge(queue.getPersistenceId()); +} + +/** + * Record the existence of a durable exchange + */ +void +MSSqlClfsProvider::create(const PersistableExchange& exchange, + const qpid::framing::FieldTable& args) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsExchanges; + try { + db->beginTransaction(); + rsExchanges.open(db, TblExchange); + rsExchanges.add(exchange); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error creating exchange " + exchange.getName(), + e, + errs); + } +} + +/** + * Destroy a durable exchange + */ +void +MSSqlClfsProvider::destroy(const PersistableExchange& exchange) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsExchanges; + BindingRecordset rsBindings; + try { + db->beginTransaction(); + rsExchanges.open(db, TblExchange); + rsBindings.open(db, TblBinding); + // Remove bindings first; the exchange IDs can't be ripped out from + // under the references in the bindings table. + rsBindings.removeForExchange(exchange.getPersistenceId()); + rsExchanges.remove(exchange); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error deleting exchange " + exchange.getName(), + e, + errs); + } +} + +/** + * Record a binding + */ +void +MSSqlClfsProvider::bind(const PersistableExchange& exchange, + const PersistableQueue& queue, + const std::string& key, + const qpid::framing::FieldTable& args) +{ + DatabaseConnection *db = initConnection(); + BindingRecordset rsBindings; + try { + db->beginTransaction(); + rsBindings.open(db, TblBinding); + rsBindings.add(exchange.getPersistenceId(), + queue.getPersistenceId(), + key, + args); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error binding exchange " + exchange.getName() + + " to queue " + queue.getName(), + e, + errs); + } +} + +/** + * Forget a binding + */ +void +MSSqlClfsProvider::unbind(const PersistableExchange& exchange, + const PersistableQueue& queue, + const std::string& key, + const qpid::framing::FieldTable& args) +{ + DatabaseConnection *db = initConnection(); + BindingRecordset rsBindings; + try { + db->beginTransaction(); + rsBindings.open(db, TblBinding); + rsBindings.remove(exchange.getPersistenceId(), + queue.getPersistenceId(), + key, + args); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error unbinding exchange " + exchange.getName() + + " from queue " + queue.getName(), + e, + errs); + } +} + +/** + * Record generic durable configuration + */ +void +MSSqlClfsProvider::create(const PersistableConfig& config) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsConfigs; + try { + db->beginTransaction(); + rsConfigs.open(db, TblConfig); + rsConfigs.add(config); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error creating config " + config.getName(), e, errs); + } +} + +/** + * Destroy generic durable configuration + */ +void +MSSqlClfsProvider::destroy(const PersistableConfig& config) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsConfigs; + try { + db->beginTransaction(); + rsConfigs.open(db, TblConfig); + rsConfigs.remove(config); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error deleting config " + config.getName(), e, errs); + } +} + +/** + * Stores a messages before it has been enqueued + * (enqueueing automatically stores the message so this is + * only required if storage is required prior to that + * point). If the message has not yet been stored it will + * store the headers as well as any content passed in. A + * persistence id will be set on the message which can be + * used to load the content or to append to it. + */ +void +MSSqlClfsProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg) +{ +#if 0 + DatabaseConnection *db = initConnection(); + MessageRecordset rsMessages; + try { + db->beginTransaction(); + rsMessages.open(db, TblMessage); + rsMessages.add(msg); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error staging message", e, errs); + } +#endif +} + +/** + * Destroys a previously staged message. This only needs + * to be called if the message is never enqueued. (Once + * enqueued, deletion will be automatic when the message + * is dequeued from all queues it was enqueued onto). + */ +void +MSSqlClfsProvider::destroy(PersistableMessage& msg) +{ +#if 0 + DatabaseConnection *db = initConnection(); + BlobRecordset rsMessages; + try { + db->beginTransaction(); + rsMessages.open(db, TblMessage); + rsMessages.remove(msg); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error deleting message", e, errs); + } +#endif +} + +/** + * Appends content to a previously staged message + */ +void +MSSqlClfsProvider::appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg, + const std::string& data) +{ +#if 0 + DatabaseConnection *db = initConnection(); + MessageRecordset rsMessages; + try { + db->beginTransaction(); + rsMessages.open(db, TblMessage); + rsMessages.append(msg, data); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error appending to message", e, errs); + } +#endif +} + +/** + * Loads (a section) of content data for the specified + * message (previously stored through a call to stage or + * enqueue) into data. The offset refers to the content + * only (i.e. an offset of 0 implies that the start of the + * content should be loaded, not the headers or related + * meta-data). + */ +void +MSSqlClfsProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/, + const boost::intrusive_ptr<const PersistableMessage>& msg, + std::string& data, + uint64_t offset, + uint32_t length) +{ + // Message log keeps all messages in one log, so we don't need the + // queue reference. + messages.loadContent(msg->getPersistenceId(), data, offset, length); +} + +/** + * Enqueues a message, storing the message if it has not + * been previously stored and recording that the given + * message is on the given queue. + * + * @param ctxt The transaction context under which this enqueue happens. + * @param msg The message to enqueue + * @param queue the name of the queue onto which it is to be enqueued + */ +void +MSSqlClfsProvider::enqueue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) +{ + Transaction::shared_ptr t; + TransactionContext *ctx = dynamic_cast<TransactionContext*>(ctxt); + if (ctx) + t = ctx->getTransaction(); + else { + TPCTransactionContext *tctx; + tctx = dynamic_cast<TPCTransactionContext*>(ctxt); + if (tctx) + t = tctx->getTransaction(); + } + uint64_t msgId = msg->getPersistenceId(); + if (msgId == 0) { + messages.add(msg); + msgId = msg->getPersistenceId(); + } + messages.enqueue(msgId, queue.getPersistenceId(), t); + msg->enqueueComplete(); +} + +/** + * Dequeues a message, recording that the given message is + * no longer on the given queue and deleting the message + * if it is no longer on any other queue. + * + * @param ctxt The transaction context under which this dequeue happens. + * @param msg The message to dequeue + * @param queue The queue from which it is to be dequeued + */ +void +MSSqlClfsProvider::dequeue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) +{ + Transaction::shared_ptr t; + TransactionContext *ctx = dynamic_cast<TransactionContext*>(ctxt); + if (ctx) + t = ctx->getTransaction(); + else { + TPCTransactionContext *tctx; + tctx = dynamic_cast<TPCTransactionContext*>(ctxt); + if (tctx) + t = tctx->getTransaction(); + } + messages.dequeue(msg->getPersistenceId(), queue.getPersistenceId(), t); + msg->dequeueComplete(); +} + +std::auto_ptr<qpid::broker::TransactionContext> +MSSqlClfsProvider::begin() +{ + Transaction::shared_ptr t = transactions->begin(); + std::auto_ptr<qpid::broker::TransactionContext> tc(new TransactionContext(t)); + return tc; +} + +std::auto_ptr<qpid::broker::TPCTransactionContext> +MSSqlClfsProvider::begin(const std::string& xid) +{ + TPCTransaction::shared_ptr t = transactions->begin(xid); + std::auto_ptr<qpid::broker::TPCTransactionContext> tc(new TPCTransactionContext(t)); + return tc; +} + +void +MSSqlClfsProvider::prepare(qpid::broker::TPCTransactionContext& txn) +{ + TPCTransactionContext *ctx = dynamic_cast<TPCTransactionContext*> (&txn); + if (ctx == 0) + throw qpid::broker::InvalidTransactionContextException(); + ctx->getTransaction()->prepare(); +} + +void +MSSqlClfsProvider::commit(qpid::broker::TransactionContext& txn) +{ + Transaction::shared_ptr t; + TransactionContext *ctx = dynamic_cast<TransactionContext*>(&txn); + if (ctx) + t = ctx->getTransaction(); + else { + TPCTransactionContext *tctx; + tctx = dynamic_cast<TPCTransactionContext*>(&txn); + if (tctx == 0) + throw qpid::broker::InvalidTransactionContextException(); + t = tctx->getTransaction(); + } + t->commit(messages); +} + +void +MSSqlClfsProvider::abort(qpid::broker::TransactionContext& txn) +{ + Transaction::shared_ptr t; + TransactionContext *ctx = dynamic_cast<TransactionContext*>(&txn); + if (ctx) + t = ctx->getTransaction(); + else { + TPCTransactionContext *tctx; + tctx = dynamic_cast<TPCTransactionContext*>(&txn); + if (tctx == 0) + throw qpid::broker::InvalidTransactionContextException(); + t = tctx->getTransaction(); + } + t->abort(messages); +} + +void +MSSqlClfsProvider::collectPreparedXids(std::set<std::string>& xids) +{ + std::map<std::string, TPCTransaction::shared_ptr> preparedMap; + transactions->collectPreparedXids(preparedMap); + std::map<std::string, TPCTransaction::shared_ptr>::const_iterator i; + for (i = preparedMap.begin(); i != preparedMap.end(); ++i) { + xids.insert(i->first); + } +} + +// @TODO Much of this recovery code is way too similar... refactor to +// a recover template method on BlobRecordset. + +void +MSSqlClfsProvider::recoverConfigs(qpid::broker::RecoveryManager& recoverer) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsConfigs; + rsConfigs.open(db, TblConfig); + _RecordsetPtr p = (_RecordsetPtr)rsConfigs; + if (p->BOF && p->EndOfFile) + return; // Nothing to do + p->MoveFirst(); + while (!p->EndOfFile) { + uint64_t id = p->Fields->Item["persistenceId"]->Value; + long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; + BlobAdapter blob(blobSize); + blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); + // Recreate the Config instance and reset its ID. + broker::RecoverableConfig::shared_ptr config = + recoverer.recoverConfig(blob); + config->setPersistenceId(id); + p->MoveNext(); + } +} + +void +MSSqlClfsProvider::recoverExchanges(qpid::broker::RecoveryManager& recoverer, + ExchangeMap& exchangeMap) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsExchanges; + rsExchanges.open(db, TblExchange); + _RecordsetPtr p = (_RecordsetPtr)rsExchanges; + if (p->BOF && p->EndOfFile) + return; // Nothing to do + p->MoveFirst(); + while (!p->EndOfFile) { + uint64_t id = p->Fields->Item["persistenceId"]->Value; + long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; + BlobAdapter blob(blobSize); + blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); + // Recreate the Exchange instance, reset its ID, and remember the + // ones restored for matching up when recovering bindings. + broker::RecoverableExchange::shared_ptr exchange = + recoverer.recoverExchange(blob); + exchange->setPersistenceId(id); + exchangeMap[id] = exchange; + p->MoveNext(); + } +} + +void +MSSqlClfsProvider::recoverQueues(qpid::broker::RecoveryManager& recoverer, + QueueMap& queueMap) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsQueues; + rsQueues.open(db, TblQueue); + _RecordsetPtr p = (_RecordsetPtr)rsQueues; + if (p->BOF && p->EndOfFile) + return; // Nothing to do + p->MoveFirst(); + while (!p->EndOfFile) { + uint64_t id = p->Fields->Item["persistenceId"]->Value; + long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; + BlobAdapter blob(blobSize); + blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); + // Recreate the Queue instance and reset its ID. + broker::RecoverableQueue::shared_ptr queue = + recoverer.recoverQueue(blob); + queue->setPersistenceId(id); + queueMap[id] = queue; + p->MoveNext(); + } +} + +void +MSSqlClfsProvider::recoverBindings(qpid::broker::RecoveryManager& recoverer, + const ExchangeMap& exchangeMap, + const QueueMap& queueMap) +{ + DatabaseConnection *db = initConnection(); + BindingRecordset rsBindings; + rsBindings.open(db, TblBinding); + rsBindings.recover(recoverer, exchangeMap, queueMap); +} + +void +MSSqlClfsProvider::recoverMessages(qpid::broker::RecoveryManager& recoverer, + MessageMap& messageMap, + MessageQueueMap& messageQueueMap) +{ + // Read the list of valid queue Ids to ensure that no broken msg->queue + // refs get restored. + DatabaseConnection *db = initConnection(); + BlobRecordset rsQueues; + rsQueues.open(db, TblQueue); + _RecordsetPtr p = (_RecordsetPtr)rsQueues; + std::set<uint64_t> validQueues; + if (!(p->BOF && p->EndOfFile)) { + p->MoveFirst(); + while (!p->EndOfFile) { + uint64_t id = p->Fields->Item["persistenceId"]->Value; + validQueues.insert(id); + p->MoveNext(); + } + } + std::map<uint64_t, Transaction::shared_ptr> transMap; + transactions->recover(transMap); + messages.recover(recoverer, + validQueues, + transMap, + messageMap, + messageQueueMap); +} + +void +MSSqlClfsProvider::recoverTransactions(qpid::broker::RecoveryManager& recoverer, + PreparedTransactionMap& dtxMap) +{ + std::map<std::string, TPCTransaction::shared_ptr> preparedMap; + transactions->collectPreparedXids(preparedMap); + std::map<std::string, TPCTransaction::shared_ptr>::const_iterator i; + for (i = preparedMap.begin(); i != preparedMap.end(); ++i) { + std::auto_ptr<TPCTransactionContext> ctx(new TPCTransactionContext(i->second)); + std::auto_ptr<qpid::broker::TPCTransactionContext> brokerCtx(ctx); + dtxMap[i->first] = recoverer.recoverTransaction(i->first, brokerCtx); + } +} + +////////////// Internal Methods + +State * +MSSqlClfsProvider::initState() +{ + State *state = dbState.get(); // See if thread has initialized + if (!state) { + state = new State; + dbState.reset(state); + } + return state; +} + +DatabaseConnection * +MSSqlClfsProvider::initConnection(void) +{ + State *state = initState(); + if (state->dbConn != 0) + return state->dbConn; // And the DatabaseConnection is set up too + std::auto_ptr<DatabaseConnection> db(new DatabaseConnection); + db->open(options.connectString, options.catalogName); + state->dbConn = db.release(); + return state->dbConn; +} + +void +MSSqlClfsProvider::createDb(DatabaseConnection *db, const std::string &name) +{ + const std::string dbCmd = "CREATE DATABASE " + name; + const std::string useCmd = "USE " + name; + const std::string tableCmd = "CREATE TABLE "; + const std::string colSpecs = + " (persistenceId bigint PRIMARY KEY NOT NULL IDENTITY(1,1)," + " fieldTableBlob varbinary(MAX) NOT NULL)"; + const std::string bindingSpecs = + " (exchangeId bigint REFERENCES tblExchange(persistenceId) NOT NULL," + " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL," + " routingKey varchar(255)," + " fieldTableBlob varbinary(MAX))"; + + _variant_t unused; + _bstr_t dbStr = dbCmd.c_str(); + _ConnectionPtr conn(*db); + try { + conn->Execute(dbStr, &unused, adExecuteNoRecords); + _bstr_t useStr = useCmd.c_str(); + conn->Execute(useStr, &unused, adExecuteNoRecords); + std::string makeTable = tableCmd + TblQueue + colSpecs; + _bstr_t makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblExchange + colSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblConfig + colSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblBinding + bindingSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + } + catch(_com_error &e) { + throw ADOException("MSSQL can't create " + name, e, db->getErrors()); + } +} + +void +MSSqlClfsProvider::dump() +{ + // dump all db records to qpid_log + QPID_LOG(notice, "DB Dump: (not dumping anything)"); + // rsQueues.dump(); +} + + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp new file mode 100644 index 0000000000..849a0a44e8 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp @@ -0,0 +1,406 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <windows.h> +#include <clfsw32.h> +#include <exception> +#include <malloc.h> +#include <memory.h> +#include <qpid/framing/Buffer.h> +#include <qpid/log/Statement.h> +#include <qpid/sys/IntegerTypes.h> +#include <qpid/sys/windows/check.h> + +#include "MessageLog.h" +#include "Lsn.h" + +namespace qpid { +namespace store { +namespace ms_clfs { + +namespace { + +// Structures that hold log records. Each has a type field at the start. +enum MessageEntryType { + MessageStartEntry = 1, + MessageChunkEntry = 2, + MessageDeleteEntry = 3, + MessageEnqueueEntry = 4, + MessageDequeueEntry = 5 +}; +static const uint32_t MaxMessageContentLength = 64 * 1024; + +// Message-Start +struct MessageStart { + MessageEntryType type; + // If the complete message encoding doesn't fit, remainder is in + // MessageChunk records to follow. + // headerLength is the size of the message's header in content. It is + // part of the totalLength and the segmentLength. + uint32_t headerLength; + uint32_t totalLength; + uint32_t segmentLength; + char content[MaxMessageContentLength]; + + MessageStart() + : type(MessageStartEntry), + headerLength(0), + totalLength(0), + segmentLength(0) {} +}; +// Message-Chunk +struct MessageChunk { + MessageEntryType type; + uint32_t segmentLength; + char content[MaxMessageContentLength]; + + MessageChunk() : type(MessageChunkEntry), segmentLength(0) {} +}; +// Message-Delete +struct MessageDelete { + MessageEntryType type; + + MessageDelete() : type(MessageDeleteEntry) {} +}; +// Message-Enqueue +struct MessageEnqueue { + MessageEntryType type; + uint64_t queueId; + uint64_t transId; + + MessageEnqueue(uint64_t qId = 0, uint64_t tId = 0) + : type(MessageEnqueueEntry), queueId(qId), transId(tId) {} +}; +// Message-Dequeue +struct MessageDequeue { + MessageEntryType type; + uint64_t queueId; + uint64_t transId; + + MessageDequeue(uint64_t qId = 0, uint64_t tId = 0) + : type(MessageDequeueEntry), queueId(qId), transId(tId) {} +}; + +} // namespace + +void +MessageLog::initialize() +{ + // Write something to occupy the first record, preventing a real message + // from being lsn/id 0. Delete of a non-existant id is easily tossed + // during recovery if no other messages have caused the tail to be moved + // up past this dummy record by then. + deleteMessage(0, 0); +} + +uint32_t +MessageLog::marshallingBufferSize() +{ + size_t biggestNeed = std::max(sizeof(MessageStart), sizeof(MessageEnqueue)); + uint32_t defSize = static_cast<uint32_t>(biggestNeed); + uint32_t minSize = Log::marshallingBufferSize(); + if (defSize <= minSize) + return minSize; + // Round up to multiple of minSize + return (defSize + minSize) / minSize * minSize; +} + +uint64_t +MessageLog::add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg) +{ + // The message may be too long to fit in one record; if so, write + // Message-Chunk records to contain the rest. If it does all fit in one + // record, though, optimize the encoding by going straight to the + // Message-Start record rather than encoding then copying to the record. + // In all case + MessageStart entry; + uint32_t encodedMessageLength = msg->encodedSize(); + entry.headerLength = msg->encodedHeaderSize(); + entry.totalLength = encodedMessageLength; + CLFS_LSN location, lastChunkLsn; + std::auto_ptr<char> encodeStage; + char *encodeBuff = 0; + bool oneRecord = encodedMessageLength <= MaxMessageContentLength; + if (oneRecord) { + encodeBuff = entry.content; + entry.segmentLength = encodedMessageLength; + } + else { + encodeStage.reset(new char[encodedMessageLength]); + encodeBuff = encodeStage.get(); + entry.segmentLength = MaxMessageContentLength; + } + qpid::framing::Buffer buff(encodeBuff, encodedMessageLength); + msg->encode(buff); + if (!oneRecord) + memcpy_s(entry.content, sizeof(entry.content), + encodeBuff, entry.segmentLength); + uint32_t entryLength = static_cast<uint32_t>(sizeof(entry)); + entryLength -= (MaxMessageContentLength - entry.segmentLength); + location = write(&entry, entryLength); + // Write any Message-Chunk records before setting the message's id. + uint32_t sent = entry.segmentLength; + uint32_t remaining = encodedMessageLength - entry.segmentLength; + while (remaining > 0) { + MessageChunk chunk; + chunk.segmentLength = std::max(MaxMessageContentLength, remaining); + memcpy_s(chunk.content, sizeof(chunk.content), + encodeStage.get() + sent, chunk.segmentLength); + entryLength = static_cast<uint32_t>(sizeof(chunk)); + entryLength -= (MaxMessageContentLength - chunk.segmentLength); + lastChunkLsn = write(&chunk, entryLength, &location); + sent += chunk.segmentLength; + remaining -= chunk.segmentLength; + } + return lsnToId(location); +} + +void +MessageLog::deleteMessage(uint64_t messageId, uint64_t newFirstId) +{ + MessageDelete deleteEntry; + CLFS_LSN msgLsn = idToLsn(messageId); + write(&deleteEntry, sizeof(deleteEntry), &msgLsn); + if (newFirstId != 0) + moveTail(idToLsn(newFirstId)); +} + +// Load part or all of a message's content from previously stored +// log record(s). +void +MessageLog::loadContent(uint64_t messageId, + std::string& data, + uint64_t offset, + uint32_t length) +{ +} + +void +MessageLog::recordEnqueue (uint64_t messageId, + uint64_t queueId, + uint64_t transactionId) +{ + MessageEnqueue entry(queueId, transactionId); + CLFS_LSN msgLsn = idToLsn(messageId); + write(&entry, sizeof(entry), &msgLsn); +} + +void +MessageLog::recordDequeue (uint64_t messageId, + uint64_t queueId, + uint64_t transactionId) +{ + MessageDequeue entry(queueId, transactionId); + CLFS_LSN msgLsn = idToLsn(messageId); + write(&entry, sizeof(entry), &msgLsn); +} + +void +MessageLog::recover(qpid::broker::RecoveryManager& recoverer, + qpid::store::MessageMap& messageMap, + std::map<uint64_t, std::vector<RecoveredMsgOp> >& messageOps) +{ + // If context and content needs to be saved while reassembling messages + // split across log records, save the info and reassembly buffer. + struct MessageBlocks { + uint32_t totalLength; + uint32_t soFarLength; + boost::shared_ptr<char> content; + + MessageBlocks() : totalLength(0), soFarLength(0), content((char*)0) {} + }; + std::map<uint64_t, MessageBlocks> reassemblies; + std::map<uint64_t, MessageBlocks>::iterator at; + + QPID_LOG(debug, "Recovering message log"); + + // Note that there may be message refs in the log which are deleted, so + // be sure to only add msgs at message-start record, and ignore those + // that don't have an existing message record. + // Get the base LSN - that's how to say "start reading at the beginning" + CLFS_INFORMATION info; + ULONG infoLength = sizeof (info); + BOOL ok = ::GetLogFileInformation(handle, &info, &infoLength); + QPID_WINDOWS_CHECK_NOT(ok, 0); + + // Pointers for the various record types that can be assigned in the + // reading loop below. + MessageStart *start; + MessageChunk *chunk; + MessageEnqueue *enqueue; + MessageDequeue *dequeue; + + qpid::store::MessageMap::iterator messageMapSpot; + qpid::store::MessageQueueMap::iterator queueMapSpot; + PVOID recordPointer; + ULONG recordLength; + CLFS_RECORD_TYPE recordType = ClfsDataRecord; + CLFS_LSN messageLsn, current, undoNext; + PVOID readContext; + uint64_t msgId; + // Note 'current' in case it's needed below; ReadNextLogRecord returns it + // via a parameter. + current = info.BaseLsn; + ok = ::ReadLogRecord(marshal, + &info.BaseLsn, + ClfsContextForward, + &recordPointer, + &recordLength, + &recordType, + &undoNext, + &messageLsn, + &readContext, + 0); + while (ok) { + // All the record types this class writes have a MessageEntryType in the + // beginning. Based on that, do what's needed. + MessageEntryType *t = + reinterpret_cast<MessageEntryType *>(recordPointer); + switch(*t) { + case MessageStartEntry: + start = reinterpret_cast<MessageStart *>(recordPointer); + msgId = lsnToId(current); + QPID_LOG(debug, "Message Start, id " << msgId); + // If the message content is split across multiple log records, save + // this content off to the side until the remaining record(s) are + // located. + if (start->totalLength == start->segmentLength) { // Whole thing + // Start by recovering the header then see if the rest of + // the content is desired. + qpid::framing::Buffer buff(start->content, start->headerLength); + qpid::broker::RecoverableMessage::shared_ptr m = + recoverer.recoverMessage(buff); + m->setPersistenceId(msgId); + messageMap[msgId] = m; + uint32_t contentLength = + start->totalLength - start->headerLength; + if (m->loadContent(contentLength)) { + qpid::framing::Buffer content(&(start->content[start->headerLength]), + contentLength); + m->decodeContent(content); + } + } + else { + // Save it in a block big enough. + MessageBlocks b; + b.totalLength = start->totalLength; + b.soFarLength = start->segmentLength; + b.content.reset(new char[b.totalLength]); + memcpy_s(b.content.get(), b.totalLength, + start->content, start->segmentLength); + reassemblies[msgId] = b; + } + break; + case MessageChunkEntry: + chunk = reinterpret_cast<MessageChunk *>(recordPointer); + // Remember, all entries chained to MessageStart via previous. + msgId = lsnToId(messageLsn); + QPID_LOG(debug, "Message Chunk for id " << msgId); + at = reassemblies.find(msgId); + if (at == reassemblies.end()) { + QPID_LOG(debug, "Message frag for " << msgId << + " but no start; discarded"); + } + else { + MessageBlocks *b = &(at->second); + if (b->soFarLength + chunk->segmentLength > b->totalLength) + throw std::runtime_error("Invalid message chunk length"); + memcpy_s(b->content.get() + b->soFarLength, + b->totalLength - b->soFarLength, + chunk->content, + chunk->segmentLength); + b->soFarLength += chunk->segmentLength; + if (b->totalLength == b->soFarLength) { + qpid::framing::Buffer buff(b->content.get(), + b->totalLength); + qpid::broker::RecoverableMessage::shared_ptr m = + recoverer.recoverMessage(buff); + m->setPersistenceId(msgId); + messageMap[msgId] = m; + reassemblies.erase(at); + } + } + break; + case MessageDeleteEntry: + msgId = lsnToId(messageLsn); + QPID_LOG(debug, "Message Delete, id " << msgId); + messageMap.erase(msgId); + messageOps.erase(msgId); + break; + case MessageEnqueueEntry: + enqueue = reinterpret_cast<MessageEnqueue *>(recordPointer); + msgId = lsnToId(messageLsn); + QPID_LOG(debug, "Message " << msgId << " Enqueue on queue " << + enqueue->queueId << ", txn " << enqueue->transId); + if (messageMap.find(msgId) == messageMap.end()) { + QPID_LOG(debug, + "Message " << msgId << " doesn't exist; discarded"); + } + else { + std::vector<RecoveredMsgOp>& ops = messageOps[msgId]; + RecoveredMsgOp op(RECOVERED_ENQUEUE, + enqueue->queueId, + enqueue->transId); + ops.push_back(op); + } + break; + case MessageDequeueEntry: + dequeue = reinterpret_cast<MessageDequeue *>(recordPointer); + msgId = lsnToId(messageLsn); + QPID_LOG(debug, "Message " << msgId << " Dequeue from queue " << + dequeue->queueId); + if (messageMap.find(msgId) == messageMap.end()) { + QPID_LOG(debug, + "Message " << msgId << " doesn't exist; discarded"); + } + else { + std::vector<RecoveredMsgOp>& ops = messageOps[msgId]; + RecoveredMsgOp op(RECOVERED_DEQUEUE, + dequeue->queueId, + dequeue->transId); + ops.push_back(op); + } + break; + default: + throw std::runtime_error("Bad message log entry type"); + } + + recordType = ClfsDataRecord; + ok = ::ReadNextLogRecord(readContext, + &recordPointer, + &recordLength, + &recordType, + 0, // No userLsn + &undoNext, + &messageLsn, + ¤t, + 0); + } + DWORD status = ::GetLastError(); + ::TerminateReadLog(readContext); + if (status == ERROR_HANDLE_EOF) { // No more records + QPID_LOG(debug, "Message log recovered"); + return; + } + throw QPID_WINDOWS_ERROR(status); +} + +}}} // namespace qpid::store::ms_clfs diff --git a/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h new file mode 100644 index 0000000000..b3705287a6 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h @@ -0,0 +1,107 @@ +#ifndef QPID_STORE_MSCLFS_MESSAGELOG_H +#define QPID_STORE_MSCLFS_MESSAGELOG_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/intrusive_ptr.hpp> +#include <qpid/broker/PersistableMessage.h> +#include <qpid/broker/RecoveryManager.h> +#include <qpid/sys/IntegerTypes.h> +#include <qpid/store/StorageProvider.h> + +#include "Log.h" + +namespace qpid { +namespace store { +namespace ms_clfs { + +/** + * @class MessageLog + * + * Represents a CLFS-housed message log. + */ +class MessageLog : public Log { + +protected: + // Message log needs to have a no-op first record written in the log + // to ensure that no real message gets an ID 0. + virtual void initialize(); + +public: + // Inherited and reimplemented from Log. Figure the minimum marshalling + // buffer size needed for the records this class writes. + virtual uint32_t marshallingBufferSize(); + + // Add the specified message to the log; Return the persistence Id. + uint64_t add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg); + + // Write a Delete entry for messageId. If newFirstId is not 0, it is now + // the earliest valid message in the log, so move the tail up to it. + void deleteMessage(uint64_t messageId, uint64_t newFirstId); + + // Load part or all of a message's content from previously stored + // log record(s). + void loadContent(uint64_t messageId, + std::string& data, + uint64_t offset, + uint32_t length); + + // Enqueue and dequeue operations track messages' transit across + // queues; each operation may be associated with a transaction. If + // the transactionId is 0 the operation is not associated with a + // transaction. + void recordEnqueue (uint64_t messageId, + uint64_t queueId, + uint64_t transactionId); + void recordDequeue (uint64_t messageId, + uint64_t queueId, + uint64_t transactionId); + + // Recover the messages and their queueing records from the log. + // @param recoverer Recovery manager used to recreate broker objects from + // encoded framing buffers recovered from the log. + // @param messageMap This method fills in the map of id -> ptr of + // recovered messages. + // @param messageOps This method fills in the map of msg id -> + // vector of operations involving the message that were + // recovered from the log. It is the caller's + // responsibility to sort the operations out and + // ascertain which operations should be acted on. The + // order of operations in the vector is as they were + // read in order from the log. + typedef enum { RECOVERED_ENQUEUE = 1, RECOVERED_DEQUEUE } RecoveredOpType; + struct RecoveredMsgOp { + RecoveredOpType op; + uint64_t queueId; + uint64_t txnId; + + RecoveredMsgOp(RecoveredOpType o, const uint64_t& q, const uint64_t& t) + : op(o), queueId(q), txnId(t) {} + }; + void recover(qpid::broker::RecoveryManager& recoverer, + qpid::store::MessageMap& messageMap, + std::map<uint64_t, std::vector<RecoveredMsgOp> >& messageOps); +}; + +}}} // namespace qpid::store::ms_clfs + +#endif /* QPID_STORE_MSCLFS_MESSAGELOG_H */ diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp b/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp new file mode 100644 index 0000000000..db5d2ebf4c --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp @@ -0,0 +1,472 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/log/Statement.h> + +#include "Messages.h" +#include "Lsn.h" +#include "qpid/store/StoreException.h" +#include <boost/foreach.hpp> + +namespace qpid { +namespace store { +namespace ms_clfs { + +void +Messages::openLog(const std::string& path, const Log::TuningParameters& params) +{ + log.open (path, params); +} + +void +Messages::add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg) +{ + uint64_t id = log.add(msg); + msg->setPersistenceId(id); + std::auto_ptr<MessageInfo> autom(new MessageInfo); + MessageInfo::shared_ptr m(autom); + std::pair<uint64_t, MessageInfo::shared_ptr> p(id, m); + { + qpid::sys::ScopedWlock<qpid::sys::RWlock> l(lock); + messages.insert(p); + // If there's only this one message there, move the tail to it. + // This prevents the log from continually growing when messages + // are added and removed one at a time. + if (messages.size() == 1) { + CLFS_LSN newTail = idToLsn(id); + log.moveTail(newTail); + } + } +} + +void +Messages::enqueue(uint64_t msgId, uint64_t queueId, Transaction::shared_ptr& t) +{ + MessageInfo::shared_ptr p; + { + qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock); + MessageMap::const_iterator i = messages.find(msgId); + if (i == messages.end()) + THROW_STORE_EXCEPTION("Message does not exist"); + p = i->second; + } + MessageInfo::Location loc(queueId, t, MessageInfo::TRANSACTION_ENQUEUE); + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock); + p->where.push_back(loc); + uint64_t transactionId = 0; + if (t.get() != 0) { + transactionId = t->getId(); + t->enroll(msgId); + } + try { + log.recordEnqueue(msgId, queueId, transactionId); + } + catch (...) { + // Undo the record-keeping if the log wasn't written correctly. + if (transactionId != 0) + t->unenroll(msgId); + p->where.pop_back(); + throw; + } + } +} + +void +Messages::dequeue(uint64_t msgId, uint64_t queueId, Transaction::shared_ptr& t) +{ + MessageInfo::shared_ptr p; + { + qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock); + MessageMap::const_iterator i = messages.find(msgId); + if (i == messages.end()) + THROW_STORE_EXCEPTION("Message does not exist"); + p = i->second; + } + { + // Locate the 'where' entry for the specified queue. Once this operation + // is recorded in the log, update the 'where' entry to reflect it. + // Note that an existing entry in 'where' that refers to a transaction + // is not eligible for this operation. + qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock); + std::list<MessageInfo::Location>::iterator i; + for (i = p->where.begin(); i != p->where.end(); ++i) { + if (i->queueId == queueId && i->transaction.get() == 0) + break; + } + if (i == p->where.end()) + THROW_STORE_EXCEPTION("Message not on queue"); + uint64_t transactionId = 0; + if (t.get() != 0) { + transactionId = t->getId(); + t->enroll(msgId); + } + try { + log.recordDequeue(msgId, queueId, transactionId); + } + catch (...) { + // Undo the record-keeping if the log wasn't written correctly. + if (transactionId != 0) + t->unenroll(msgId); + throw; + } + // Ok, logged successfully. If this is a transactional op, note + // the transaction. If non-transactional, remove the 'where' entry. + if (transactionId != 0) { + i->transaction = t; + i->disposition = MessageInfo::TRANSACTION_DEQUEUE; + } + else { + p->where.erase(i); + // If the message doesn't exist on any other queues, remove it. + if (p->where.empty()) + remove(msgId); + } + } +} + +// Commit a previous provisional enqueue or dequeue of a particular message +// actions under a specified transaction. If this results in the message's +// being removed from all queues, it is deleted. +void +Messages::commit(uint64_t msgId, Transaction::shared_ptr& t) +{ + MessageInfo::shared_ptr p; + { + qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock); + MessageMap::const_iterator i = messages.find(msgId); + if (i == messages.end()) + THROW_STORE_EXCEPTION("Message does not exist"); + p = i->second; + } + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock); + std::list<MessageInfo::Location>::iterator i; + for (i = p->where.begin(); i != p->where.end(); ++i) { + if (i->transaction != t) + continue; + // Transactional dequeues can now remove the item from the + // where list; enqueues just clear the transaction reference. + if (i->disposition == MessageInfo::TRANSACTION_DEQUEUE) + i = p->where.erase(i); + else + i->transaction.reset(); + } + } + // If committing results in this message having no further enqueue + // references, delete it. If the delete fails, swallow the exception + // and let recovery take care of removing it later. + if (p->where.empty()) { + try { + remove(msgId); + } + catch(...) {} + } +} + +// Abort a previous provisional enqueue or dequeue of a particular message +// actions under a specified transaction. If this results in the message's +// being removed from all queues, it is deleted. +void +Messages::abort(uint64_t msgId, Transaction::shared_ptr& t) +{ + MessageInfo::shared_ptr p; + { + qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock); + MessageMap::const_iterator i = messages.find(msgId); + if (i == messages.end()) + THROW_STORE_EXCEPTION("Message does not exist"); + p = i->second; + } + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock); + std::list<MessageInfo::Location>::iterator i = p->where.begin(); + while (i != p->where.end()) { + if (i->transaction != t) { + ++i; + continue; + } + // Aborted transactional dequeues result in the message remaining + // enqueued like before the operation; enqueues clear the + // message from the where list - like the enqueue never happened. + if (i->disposition == MessageInfo::TRANSACTION_ENQUEUE) + i = p->where.erase(i); + else { + i->transaction.reset(); + ++i; + } + } + } + // If aborting results in this message having no further enqueue + // references, delete it. If the delete fails, swallow the exception + // and let recovery take care of removing it later. + if (p->where.empty()) { + try { + remove(msgId); + } + catch(...) {} + } +} + +// Load part or all of a message's content from previously stored +// log record(s). +void +Messages::loadContent(uint64_t msgId, + std::string& data, + uint64_t offset, + uint32_t length) +{ + log.loadContent(msgId, data, offset, length); +} + +// Recover the current set of messages and where they're queued from +// the log. +void +Messages::recover(qpid::broker::RecoveryManager& recoverer, + const std::set<uint64_t> &validQueues, + const std::map<uint64_t, Transaction::shared_ptr>& transMap, + qpid::store::MessageMap& messageMap, + qpid::store::MessageQueueMap& messageQueueMap) +{ + std::map<uint64_t, std::vector<MessageLog::RecoveredMsgOp> > messageOps; + log.recover(recoverer, messageMap, messageOps); + // Now read through the messageOps replaying the operations with the + // knowledge of which transactions committed, aborted, etc. A transaction + // should not be deleted until there are no messages referencing it so + // a message operation with a transaction id not found in transMap is + // a serious problem. + QPID_LOG(debug, "Beginning CLFS-recovered message operation replay"); + // Keep track of any messages that are recovered from the log but don't + // have any place to be. This can happen, for example, if the broker + // crashes while logging a message deletion. After all the recovery is + // done, delete all the homeless messages. + std::vector<uint64_t> homeless; + std::map<uint64_t, std::vector<MessageLog::RecoveredMsgOp> >::const_iterator msg; + for (msg = messageOps.begin(); msg != messageOps.end(); ++msg) { + uint64_t msgId = msg->first; + const std::vector<MessageLog::RecoveredMsgOp>& ops = msg->second; + QPID_LOG(debug, "Message " << msgId << "; " << ops.size() << " op(s)"); + MessageInfo::shared_ptr m(new MessageInfo); + std::vector<QueueEntry>& entries = messageQueueMap[msgId]; + std::vector<MessageLog::RecoveredMsgOp>::const_iterator op; + for (op = ops.begin(); op != ops.end(); ++op) { + QueueEntry entry(op->queueId); + MessageInfo::Location loc(op->queueId); + std::string dir = + op->op == MessageLog::RECOVERED_ENQUEUE ? "enqueue" + : "dequeue"; + if (validQueues.find(op->queueId) == validQueues.end()) { + QPID_LOG(info, + "Message " << msgId << dir << " on non-existant queue " + << op->queueId << "; dropped"); + continue; + } + if (op->txnId != 0) { + // Be sure to enroll this message in the transaction even if + // it has committed or aborted. This ensures that the + // transaction isn't removed from the log while finalizing the + // recovery. If it were to be removed and the broker failed + // again before removing this message during normal operation, + // it couldn't be recovered again. + // + // Recall what is being reconstructed; 2 things: + // 1. This class's 'messages' list which keeps track + // of the queues each message is on and the transactions + // each message is enrolled in. For this, aborted + // transactions cause the result of the operation to be + // ignored, but the message does need to be enrolled in + // the transaction to properly maintain the transaction + // references until the message is deleted. + // 2. The StorageProvider's MessageQueueMap, which also + // has an entry for each queue each message is on and + // its TPL status and associated xid. + const Transaction::shared_ptr &t = + transMap.find(op->txnId)->second; + // Prepared transactions cause the operation to be + // provisionally acted on, and the message to be enrolled in + // the transaction for when it commits/aborts. This is + // noted in the QueueEntry for the StorageProvider's map. + if (t->getState() == Transaction::TRANS_PREPARED) { + QPID_LOG(debug, dir << " for queue " << op->queueId << + ", prepared txn " << op->txnId); + TPCTransaction::shared_ptr tpct(boost::dynamic_pointer_cast<TPCTransaction>(t)); + if (tpct.get() == 0) + THROW_STORE_EXCEPTION("Invalid transaction state"); + t->enroll(msgId); + entry.xid = tpct->getXid(); + loc.transaction = t; + if (op->op == MessageLog::RECOVERED_ENQUEUE) { + entry.tplStatus = QueueEntry::ADDING; + loc.disposition = MessageInfo::TRANSACTION_ENQUEUE; + } + else { + entry.tplStatus = QueueEntry::REMOVING; + loc.disposition = MessageInfo::TRANSACTION_DEQUEUE; + } + } + else if (t->getState() != Transaction::TRANS_COMMITTED) { + QPID_LOG(debug, dir << " for queue " << op->queueId << + ", txn " << op->txnId << ", rolling back"); + continue; + } + } + // Here for non-transactional and prepared transactional operations + // to set up the messageQueueMap entries. Note that at this point + // a committed transactional operation looks like a + // non-transactional one as far as the QueueEntry is + // concerned - just do it. If this is an entry enqueuing a + // message, just add it to the entries list. If it's a dequeue + // operation, locate the matching entry for the queue and delete + // it if the current op is non-transactional; if it's a prepared + // transaction then replace the existing entry with the current + // one that notes the message is enqueued but being removed under + // a prepared transaction. + QPID_LOG(debug, dir + " at queue " << entry.queueId); + if (op->op == MessageLog::RECOVERED_ENQUEUE) { + entries.push_back(entry); + m->where.push_back(loc); + } + else { + std::vector<QueueEntry>::iterator i = entries.begin(); + while (i != entries.end()) { + if (i->queueId == entry.queueId) { + if (entry.tplStatus != QueueEntry::NONE) + *i = entry; + else + entries.erase(i); + break; + } + ++i; + } + std::list<MessageInfo::Location>::iterator w = m->where.begin(); + while (w != m->where.end()) { + if (w->queueId == loc.queueId) { + if (loc.transaction.get() != 0) { + *w = loc; + ++w; + } + else { + w = m->where.erase(w); + } + } + } + } + } + // Now that all the queue entries have been set correctly, see if + // there are any entries; they may have all been removed during + // recovery. If there are none, add this message to the homeless + // list to be deleted from the log after the recovery is done. + if (m->where.size() == 0) { + homeless.push_back(msgId); + messageMap.erase(msgId); + messageQueueMap.erase(msgId); + } + else { + std::pair<uint64_t, MessageInfo::shared_ptr> p(msgId, m); + messages.insert(p); + } + } + + QPID_LOG(debug, "Message log recovery done."); + // Done! Ok, go back and delete all the homeless messages. + BOOST_FOREACH(uint64_t msg, homeless) { + QPID_LOG(debug, "Deleting homeless message " << msg); + remove(msg); + } +} + +// Expunge is called when a queue is deleted. All references to that +// queue must be expunged from all messages. 'Dequeue' log records are +// written for each queue entry removed, but any errors are swallowed. +// On recovery there's a list of valid queues passed in. The deleted +// queue will not be on that list so if any references to it are +// recovered they'll get weeded out then. +void +Messages::expunge(uint64_t queueId) +{ + std::vector<uint64_t> toBeDeleted; // Messages to be deleted later. + + { + // Lock everybody out since all messages are possibly in play. + // There also may be other threads already working on particular + // messages so individual message mutex still must be acquired. + qpid::sys::ScopedWlock<qpid::sys::RWlock> l(lock); + MessageMap::iterator m; + for (m = messages.begin(); m != messages.end(); ++m) { + MessageInfo::shared_ptr p = m->second; + { + qpid::sys::ScopedLock<qpid::sys::Mutex> ml(p->whereLock); + std::list<MessageInfo::Location>::iterator i = p->where.begin(); + while (i != p->where.end()) { + if (i->queueId != queueId) { + ++i; + continue; + } + // If this entry is involved in a transaction, unenroll it. + // Then remove the entry. + if (i->transaction.get() != 0) + i->transaction->unenroll(m->first); + i = p->where.erase(i); + try { + log.recordDequeue(m->first, queueId, 0); + } + catch(...) { + } + } + if (p->where.size() == 0) + toBeDeleted.push_back(m->first); + } + } + } + // Swallow any exceptions during this; don't care. Recover it later + // if needed. + try { + BOOST_FOREACH(uint64_t msg, toBeDeleted) + remove(msg); + } + catch(...) { + } +} + +// Remove a specified message from those controlled by this object. +void +Messages::remove(uint64_t messageId) +{ + uint64_t newFirstId = 0; + { + qpid::sys::ScopedWlock<qpid::sys::RWlock> l(lock); + messages.erase(messageId); + // May have deleted the first entry; if so the log can release that. + // If this message being deleted results in an empty list of + // messages, move the tail up to this message's LSN. This may + // result in one or more messages being stranded in the log + // until there's more activity. If a restart happens while these + // unneeded log records are there, the presence of the MessageDelete + // entry will cause the message(s) to be ignored anyway. + if (messages.empty()) + newFirstId = messageId; + else if (messages.begin()->first > messageId) + newFirstId = messages.begin()->first; + } + log.deleteMessage(messageId, newFirstId); +} + +}}} // namespace qpid::store::ms_clfs diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Messages.h b/qpid/cpp/src/qpid/store/ms-clfs/Messages.h new file mode 100644 index 0000000000..93cc8bfe62 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-clfs/Messages.h @@ -0,0 +1,144 @@ +#ifndef QPID_STORE_MSCLFS_MESSAGES_H +#define QPID_STORE_MSCLFS_MESSAGES_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <windows.h> +#include <map> +#include <set> +#include <vector> +#include <boost/intrusive_ptr.hpp> +#include <qpid/broker/PersistableMessage.h> +#include <qpid/sys/Mutex.h> + +#include "MessageLog.h" +#include "Transaction.h" + +namespace qpid { +namespace store { +namespace ms_clfs { + +class Messages { + + struct MessageInfo { + // How many queues this message is on, whether actually (non-transacted) + // or provisionally (included in a non-yet-committed transaction). + volatile LONG enqueuedCount; + + // Keep a list of transactional operations this message is + // referenced in. When the transaction changes/finalizes these all + // need to be acted on. + typedef enum { TRANSACTION_NONE = 0, + TRANSACTION_ENQUEUE, + TRANSACTION_DEQUEUE } TransType; +#if 0 + std::map<Transaction::shared_ptr, std::vector<TransType> > transOps; + qpid::sys::Mutex transOpsLock; +#endif + // Think what I need is a list of "where is this message" - queue id, + // transaction ref, what kind of trans op (enq/deq). Then "remove all + // queue refs" can search through all messages looking for queue ids + // and undo them. Write "remove from queue" record to log. Also need to + // add "remove from queue" to recovery. + struct Location { + uint64_t queueId; + Transaction::shared_ptr transaction; + TransType disposition; + + Location(uint64_t q) + : queueId(q), transaction(), disposition(TRANSACTION_NONE) {} + Location(uint64_t q, Transaction::shared_ptr& t, TransType d) + : queueId(q), transaction(t), disposition(d) {} + }; + qpid::sys::Mutex whereLock; + std::list<Location> where; + // The transactions vector just keeps a shared_ptr to each + // Transaction this message was involved in, regardless of the + // disposition or transaction state. Keeping a valid shared_ptr + // prevents the Transaction from being deleted. As long as there + // are any messages that referred to a transaction, that + // transaction's state needs to be known so the message disposition + // can be correctly recovered if needed. + std::vector<Transaction::shared_ptr> transactions; + + typedef boost::shared_ptr<MessageInfo> shared_ptr; + + MessageInfo() : enqueuedCount(0) {} + }; + + qpid::sys::RWlock lock; + typedef std::map<uint64_t, MessageInfo::shared_ptr> MessageMap; + MessageMap messages; + MessageLog log; + + // Remove a specified message from those controlled by this object. + void remove(uint64_t messageId); + +public: + void openLog(const std::string& path, const Log::TuningParameters& params); + + // Add the specified message to the log and list of known messages. + // Upon successful return the message's persistenceId is set. + void add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg); + + // Add the specified queue to the message's list of places it is + // enqueued. + void enqueue(uint64_t msgId, uint64_t queueId, Transaction::shared_ptr& t); + + // Remove the specified queue from the message's list of places it is + // enqueued. If there are no other queues holding the message, it is + // deleted. + void dequeue(uint64_t msgId, uint64_t queueId, Transaction::shared_ptr& t); + + // Commit a previous provisional enqueue or dequeue of a particular message + // actions under a specified transaction. If this results in the message's + // being removed from all queues, it is deleted. + void commit(uint64_t msgId, Transaction::shared_ptr& transaction); + + // Abort a previous provisional enqueue or dequeue of a particular message + // actions under a specified transaction. If this results in the message's + // being removed from all queues, it is deleted. + void abort(uint64_t msgId, Transaction::shared_ptr& transaction); + + // Load part or all of a message's content from previously stored + // log record(s). + void loadContent(uint64_t msgId, + std::string& data, + uint64_t offset, + uint32_t length); + + // Expunge is called when a queue is deleted. All references to that + // queue must be expunged from all messages. + void expunge(uint64_t queueId); + + // Recover the current set of messages and where they're queued from + // the log. + void recover(qpid::broker::RecoveryManager& recoverer, + const std::set<uint64_t> &validQueues, + const std::map<uint64_t, Transaction::shared_ptr>& transMap, + qpid::store::MessageMap& messageMap, + qpid::store::MessageQueueMap& messageQueueMap); +}; + +}}} // namespace qpid::store::ms_clfs + +#endif /* QPID_STORE_MSCLFS_MESSAGES_H */ diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Transaction.cpp b/qpid/cpp/src/qpid/store/ms-clfs/Transaction.cpp new file mode 100644 index 0000000000..f94fef6f84 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-clfs/Transaction.cpp @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Transaction.h" +#include "Messages.h" + +namespace qpid { +namespace store { +namespace ms_clfs { + +Transaction::~Transaction() +{ + // Transactions that are recovered then found to be deleted get destroyed + // but need not be logged. + if (state != TRANS_DELETED) + log->deleteTransaction(id); +} + +void +Transaction::enroll(uint64_t msgId) +{ + qpid::sys::ScopedWlock<qpid::sys::RWlock> l(enrollLock); + enrolledMessages.push_back(msgId); +} + +void +Transaction::unenroll(uint64_t msgId) +{ + qpid::sys::ScopedWlock<qpid::sys::RWlock> l(enrollLock); + for (std::vector<uint64_t>::iterator i = enrolledMessages.begin(); + i < enrolledMessages.end(); + ++i) { + if (*i == msgId) { + enrolledMessages.erase(i); + break; + } + } +} + +void +Transaction::abort(Messages& messages) +{ + log->recordAbort(id); + for (size_t i = 0; i < enrolledMessages.size(); ++i) + messages.abort(enrolledMessages[i], shared_from_this()); + state = TRANS_ABORTED; +} + +void +Transaction::commit(Messages& messages) +{ + log->recordCommit(id); + for (size_t i = 0; i < enrolledMessages.size(); ++i) + messages.commit(enrolledMessages[i], shared_from_this()); + state = TRANS_COMMITTED; +} + +void +TPCTransaction::prepare(void) +{ + log->recordPrepare(id); + state = TRANS_PREPARED; +} + +}}} // namespace qpid::store::ms_clfs diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Transaction.h b/qpid/cpp/src/qpid/store/ms-clfs/Transaction.h new file mode 100644 index 0000000000..499b01d503 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-clfs/Transaction.h @@ -0,0 +1,147 @@ +#ifndef QPID_STORE_MSCLFS_TRANSACTION_H +#define QPID_STORE_MSCLFS_TRANSACTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/broker/TransactionalStore.h> +#include <qpid/sys/Mutex.h> +#include <boost/enable_shared_from_this.hpp> +#include <boost/shared_ptr.hpp> +#include <string> +#include <vector> + +#include "TransactionLog.h" + +namespace qpid { +namespace store { +namespace ms_clfs { + +class Messages; + +/** + * @class Transaction + * + * Class representing an AMQP transaction. This is used around a set of + * enqueue and dequeue operations that occur when the broker is acting + * on a transaction commit/abort from the client. + * This class is what the store uses internally to implement things a + * transaction needs; the broker knows about TransactionContext, which + * holds a pointer to Transaction. + * + * NOTE: All references to Transactions (and TPCTransactions, below) are + * through Boost shared_ptr instances. All messages enrolled in a transaction + * hold a shared_ptr. Thus, a Transaction object will not be deleted until all + * messages holding a reference to it are deleted. This fact is also used + * during recovery to automatically clean up and delete any Transaction without + * messages left referring to it. + */ +class Transaction : public boost::enable_shared_from_this<Transaction> { +private: + // TransactionLog has to create all Transaction instances. + Transaction() {} + +public: + + typedef boost::shared_ptr<Transaction> shared_ptr; + typedef enum { TRANS_OPEN = 1, + TRANS_PREPARED, + TRANS_ABORTED, + TRANS_COMMITTED, + TRANS_DELETED } State; + + virtual ~Transaction(); + + uint64_t getId() { return id; } + State getState() { return state; } + + void enroll(uint64_t msgId); + void unenroll(uint64_t msgId); // For failed ops, not normal end-of-trans + + void abort(Messages& messages); + void commit(Messages& messages); + +protected: + friend class TransactionLog; + Transaction(uint64_t _id, const TransactionLog::shared_ptr& _log) + : id(_id), state(TRANS_OPEN), log(_log) {} + + uint64_t id; + State state; + TransactionLog::shared_ptr log; + std::vector<uint64_t> enrolledMessages; + qpid::sys::RWlock enrollLock; +}; + +class TransactionContext : public qpid::broker::TransactionContext { + Transaction::shared_ptr transaction; + +public: + TransactionContext(const Transaction::shared_ptr& _transaction) + : transaction(_transaction) {} + + virtual Transaction::shared_ptr& getTransaction() { return transaction; } +}; + +/** + * @class TPCTransaction + * + * Class representing a Two-Phase-Commit (TPC) AMQP transaction. This is + * used around a set of enqueue and dequeue operations that occur when the + * broker is acting on a transaction prepare/commit/abort from the client. + * This class is what the store uses internally to implement things a + * transaction needs; the broker knows about TPCTransactionContext, which + * holds a pointer to TPCTransaction. + */ +class TPCTransaction : public Transaction { + + friend class TransactionLog; + TPCTransaction(uint64_t _id, + const TransactionLog::shared_ptr& _log, + const std::string& _xid) + : Transaction(_id, _log), xid(_xid) {} + + std::string xid; + +public: + typedef boost::shared_ptr<TPCTransaction> shared_ptr; + + virtual ~TPCTransaction() {} + + void prepare(); + bool isPrepared() const { return state == TRANS_PREPARED; } + + const std::string& getXid(void) const { return xid; } +}; + +class TPCTransactionContext : public qpid::broker::TPCTransactionContext { + TPCTransaction::shared_ptr transaction; + +public: + TPCTransactionContext(const TPCTransaction::shared_ptr& _transaction) + : transaction(_transaction) {} + + virtual TPCTransaction::shared_ptr& getTransaction() { return transaction; } +}; + +}}} // namespace qpid::store::ms_clfs + +#endif /* QPID_STORE_MSCLFS_TRANSACTION_H */ diff --git a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp new file mode 100644 index 0000000000..0ef046d7c8 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp @@ -0,0 +1,428 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <windows.h> +#include <clfsw32.h> +#include <exception> +#include <malloc.h> +#include <memory.h> +#include <qpid/framing/Buffer.h> +#include <qpid/log/Statement.h> +#include <qpid/sys/IntegerTypes.h> +#include <qpid/sys/windows/check.h> + +#include "TransactionLog.h" +#include "Transaction.h" +#include "Lsn.h" + +namespace qpid { +namespace store { +namespace ms_clfs { + +namespace { + +// Structures that hold log records. Each has a type field at the start. +enum TransactionEntryType { + TransactionStartDtxEntry = 1, + TransactionStartTxEntry = 2, + TransactionPrepareEntry = 3, + TransactionCommitEntry = 4, + TransactionAbortEntry = 5, + TransactionDeleteEntry = 6 +}; +// The only thing that really takes up space in transaction records is the +// xid. Max xid length is in the neighborhood of 600 bytes. Leave some room. +static const uint32_t MaxTransactionContentLength = 1024; + +// Dtx-Start +struct TransactionStartDtx { + TransactionEntryType type; + uint32_t length; + char content[MaxTransactionContentLength]; + + TransactionStartDtx() + : type(TransactionStartDtxEntry), length(0) {} +}; +// Tx-Start +struct TransactionStartTx { + TransactionEntryType type; + + TransactionStartTx() + : type(TransactionStartTxEntry) {} +}; +// Prepare +struct TransactionPrepare { + TransactionEntryType type; + + TransactionPrepare() + : type(TransactionPrepareEntry) {} +}; +// Commit +struct TransactionCommit { + TransactionEntryType type; + + TransactionCommit() + : type(TransactionCommitEntry) {} +}; +// Abort +struct TransactionAbort { + TransactionEntryType type; + + TransactionAbort() + : type(TransactionAbortEntry) {} +}; +// Delete +struct TransactionDelete { + TransactionEntryType type; + + TransactionDelete() + : type(TransactionDeleteEntry) {} +}; + +} // namespace + +void +TransactionLog::initialize() +{ + // Write something to occupy the first record, preventing a real + // transaction from being lsn/id 0. Delete of a non-existant id is easily + // tossed during recovery if no other transactions have caused the tail + // to be moved up past this dummy record by then. + deleteTransaction(0); +} + +uint32_t +TransactionLog::marshallingBufferSize() +{ + size_t biggestNeed = sizeof(TransactionStartDtx); + uint32_t defSize = static_cast<uint32_t>(biggestNeed); + uint32_t minSize = Log::marshallingBufferSize(); + if (defSize <= minSize) + return minSize; + // Round up to multiple of minSize + return (defSize + minSize) / minSize * minSize; +} + +// Get a new Transaction +boost::shared_ptr<Transaction> +TransactionLog::begin() +{ + TransactionStartTx entry; + CLFS_LSN location; + uint64_t id; + uint32_t entryLength = static_cast<uint32_t>(sizeof(entry)); + location = write(&entry, entryLength); + try { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock); + id = lsnToId(location); + std::auto_ptr<Transaction> t(new Transaction(id, shared_from_this())); + boost::shared_ptr<Transaction> t2(t); + boost::weak_ptr<Transaction> weak_t2(t2); + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock); + validIds[id] = weak_t2; + } + return t2; + } + catch(...) { + deleteTransaction(id); + throw; + } +} + +// Get a new TPCTransaction +boost::shared_ptr<TPCTransaction> +TransactionLog::begin(const std::string& xid) +{ + TransactionStartDtx entry; + CLFS_LSN location; + uint64_t id; + uint32_t entryLength = static_cast<uint32_t>(sizeof(entry)); + entry.length = static_cast<uint32_t>(xid.length()); + memcpy_s(entry.content, sizeof(entry.content), + xid.c_str(), xid.length()); + entryLength -= (sizeof(entry.content) - entry.length); + location = write(&entry, entryLength); + try { + id = lsnToId(location); + std::auto_ptr<TPCTransaction> t(new TPCTransaction(id, + shared_from_this(), + xid)); + boost::shared_ptr<TPCTransaction> t2(t); + boost::weak_ptr<Transaction> weak_t2(t2); + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock); + validIds[id] = weak_t2; + } + return t2; + } + catch(...) { + deleteTransaction(id); + throw; + } +} + +void +TransactionLog::recordPrepare(uint64_t transId) +{ + TransactionPrepare entry; + CLFS_LSN transLsn = idToLsn(transId); + write(&entry, sizeof(entry), &transLsn); +} + +void +TransactionLog::recordCommit(uint64_t transId) +{ + TransactionCommit entry; + CLFS_LSN transLsn = idToLsn(transId); + write(&entry, sizeof(entry), &transLsn); + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock); + validIds[transId].reset(); + } +} + +void +TransactionLog::recordAbort(uint64_t transId) +{ + TransactionAbort entry; + CLFS_LSN transLsn = idToLsn(transId); + write(&entry, sizeof(entry), &transLsn); + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock); + validIds[transId].reset(); + } +} + +void +TransactionLog::deleteTransaction(uint64_t transId) +{ + uint64_t newFirstId = 0; + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock); + validIds.erase(transId); + // May have deleted the first entry; if so the log can release that. + // If this deletion results in an empty list of transactions, + // move the tail up to this transaction's LSN. This may result in + // one or more transactions being stranded in the log until there's + // more activity. If a restart happens while these unneeded log + // records are there, the presence of the TransactionDelete + // entry will cause them to be ignored anyway. + if (validIds.empty()) + newFirstId = transId; + else if (validIds.begin()->first > transId) + newFirstId = validIds.begin()->first; + } + TransactionDelete deleteEntry; + CLFS_LSN transLsn = idToLsn(transId); + write(&deleteEntry, sizeof(deleteEntry), &transLsn); + if (newFirstId != 0) + moveTail(idToLsn(newFirstId)); +} + +void +TransactionLog::collectPreparedXids(std::map<std::string, TPCTransaction::shared_ptr>& preparedMap) +{ + // Go through all the known transactions; if the transaction is still + // valid (open or prepared) it will have weak_ptr to the Transaction. + // If it can be downcast and has a state of TRANS_PREPARED, add to the map. + qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock); + std::map<uint64_t, boost::weak_ptr<Transaction> >::const_iterator i; + for (i = validIds.begin(); i != validIds.end(); ++i) { + Transaction::shared_ptr t = i->second.lock(); + if (t.get() == 0) + continue; + TPCTransaction::shared_ptr tpct(boost::dynamic_pointer_cast<TPCTransaction>(t)); + if (tpct.get() == 0) + continue; + if (tpct->state == Transaction::TRANS_PREPARED) + preparedMap[tpct->getXid()] = tpct; + } +} + +void +TransactionLog::recover(std::map<uint64_t, Transaction::shared_ptr>& transMap) +{ + QPID_LOG(debug, "Recovering transaction log"); + + // Note that there may be transaction refs in the log which are deleted, + // so be sure to only add transactions at Start records, and ignore those + // that don't have an existing message record. + // Get the base LSN - that's how to say "start reading at the beginning" + CLFS_INFORMATION info; + ULONG infoLength = sizeof (info); + BOOL ok = ::GetLogFileInformation(handle, &info, &infoLength); + QPID_WINDOWS_CHECK_NOT(ok, 0); + + // Pointers for the various record types that can be assigned in the + // reading loop below. + TransactionStartDtx *startDtxEntry; + TransactionStartTx *startTxEntry; + + PVOID recordPointer; + ULONG recordLength; + CLFS_RECORD_TYPE recordType = ClfsDataRecord; + CLFS_LSN transLsn, current, undoNext; + PVOID readContext; + uint64_t transId; + // Note 'current' in case it's needed below; ReadNextLogRecord returns it + // via a parameter. + current = info.BaseLsn; + ok = ::ReadLogRecord(marshal, + &info.BaseLsn, + ClfsContextForward, + &recordPointer, + &recordLength, + &recordType, + &undoNext, + &transLsn, + &readContext, + 0); + + std::auto_ptr<Transaction> tPtr; + std::auto_ptr<TPCTransaction> tpcPtr; + while (ok) { + std::string xid; + + // All the record types this class writes have a TransactionEntryType + // in the beginning. Based on that, do what's needed. + TransactionEntryType *t = + reinterpret_cast<TransactionEntryType *>(recordPointer); + switch(*t) { + case TransactionStartDtxEntry: + startDtxEntry = + reinterpret_cast<TransactionStartDtx *>(recordPointer); + transId = lsnToId(current); + QPID_LOG(debug, "Dtx start, id " << transId); + xid.assign(startDtxEntry->content, startDtxEntry->length); + tpcPtr.reset(new TPCTransaction(transId, shared_from_this(), xid)); + transMap[transId] = boost::shared_ptr<TPCTransaction>(tpcPtr); + break; + case TransactionStartTxEntry: + startTxEntry = + reinterpret_cast<TransactionStartTx *>(recordPointer); + transId = lsnToId(current); + QPID_LOG(debug, "Tx start, id " << transId); + tPtr.reset(new Transaction(transId, shared_from_this())); + transMap[transId] = boost::shared_ptr<Transaction>(tPtr); + break; + case TransactionPrepareEntry: + transId = lsnToId(transLsn); + QPID_LOG(debug, "Dtx prepare, id " << transId); + if (transMap.find(transId) == transMap.end()) { + QPID_LOG(debug, + "Dtx " << transId << " doesn't exist; discarded"); + } + else { + transMap[transId]->state = Transaction::TRANS_PREPARED; + } + break; + case TransactionCommitEntry: + transId = lsnToId(transLsn); + QPID_LOG(debug, "Txn commit, id " << transId); + if (transMap.find(transId) == transMap.end()) { + QPID_LOG(debug, + "Txn " << transId << " doesn't exist; discarded"); + } + else { + transMap[transId]->state = Transaction::TRANS_COMMITTED; + } + break; + case TransactionAbortEntry: + transId = lsnToId(transLsn); + QPID_LOG(debug, "Txn abort, id " << transId); + if (transMap.find(transId) == transMap.end()) { + QPID_LOG(debug, + "Txn " << transId << " doesn't exist; discarded"); + } + else { + transMap[transId]->state = Transaction::TRANS_ABORTED; + } + break; + case TransactionDeleteEntry: + transId = lsnToId(transLsn); + QPID_LOG(debug, "Txn delete, id " << transId); + if (transMap.find(transId) == transMap.end()) { + QPID_LOG(debug, + "Txn " << transId << " doesn't exist; discarded"); + } + else { + transMap[transId]->state = Transaction::TRANS_DELETED; + transMap.erase(transId); + } + break; + default: + throw std::runtime_error("Bad transaction log entry type"); + } + + recordType = ClfsDataRecord; + ok = ::ReadNextLogRecord(readContext, + &recordPointer, + &recordLength, + &recordType, + 0, // No userLsn + &undoNext, + &transLsn, + ¤t, + 0); + } + DWORD status = ::GetLastError(); + ::TerminateReadLog(readContext); + if (status != ERROR_HANDLE_EOF) // No more records + throw QPID_WINDOWS_ERROR(status); + + QPID_LOG(debug, "Transaction log recovered"); + + // At this point we have a list of all the not-deleted transactions that + // were in existence when the broker last ran. All transactions of both + // Dtx and Tx types that haven't prepared or committed will be aborted. + // This will give the proper background against which to decide each + // message's disposition when recovering messages that were involved in + // transactions. + // In addition to recovering and aborting transactions, rebuild the + // validIds map now that we know which ids are really valid. + std::map<uint64_t, Transaction::shared_ptr>::const_iterator i; + for (i = transMap.begin(); i != transMap.end(); ++i) { + switch(i->second->state) { + case Transaction::TRANS_OPEN: + QPID_LOG(debug, "Txn " << i->first << " was open; aborted"); + i->second->state = Transaction::TRANS_ABORTED; + break; + case Transaction::TRANS_ABORTED: + QPID_LOG(debug, "Txn " << i->first << " was aborted"); + break; + case Transaction::TRANS_COMMITTED: + QPID_LOG(debug, "Txn " << i->first << " was committed"); + break; + case Transaction::TRANS_PREPARED: + QPID_LOG(debug, "Txn " << i->first << " was prepared"); + break; + case Transaction::TRANS_DELETED: + QPID_LOG(error, + "Txn " << i->first << " was deleted; shouldn't be here"); + break; + } + boost::weak_ptr<Transaction> weak_txn(i->second); + validIds[i->first] = weak_txn; + } +} + +}}} // namespace qpid::store::ms_clfs diff --git a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h new file mode 100644 index 0000000000..7ca27c229e --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h @@ -0,0 +1,104 @@ +#ifndef QPID_STORE_MSCLFS_TRANSACTIONLOG_H +#define QPID_STORE_MSCLFS_TRANSACTIONLOG_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <set> + +#include <boost/enable_shared_from_this.hpp> +#include <boost/shared_ptr.hpp> + +#include <qpid/broker/RecoveryManager.h> +#include <qpid/sys/IntegerTypes.h> +#include <qpid/sys/Mutex.h> + +#include "Log.h" + +namespace qpid { +namespace store { +namespace ms_clfs { + +class Transaction; +class TPCTransaction; + +/** + * @class TransactionLog + * + * Represents a CLFS-housed transaction log. + */ +class TransactionLog : public Log, + public boost::enable_shared_from_this<TransactionLog> { + + // To know when it's ok to move the log tail the lowest valid Id must + // always be known. Keep track of valid Ids here. These are transactions + // which have not yet been Deleted in the log. They may be new, in progress, + // prepared, committed, or aborted - but not deleted. + // Entries corresponding to not-yet-finalized transactions (i.e., open or + // prepared) also have a weak_ptr so the Transaction can be accessed. + // This is primarily to check its state and get a list of prepared Xids. + std::map<uint64_t, boost::weak_ptr<Transaction> > validIds; + qpid::sys::Mutex idsLock; + +protected: + // Transaction log needs to have a no-op first record written in the log + // to ensure that no real transaction gets an ID 0; messages think trans + // id 0 means "no transaction." + virtual void initialize(); + +public: + // Inherited and reimplemented from Log. Figure the minimum marshalling + // buffer size needed for the records this class writes. + virtual uint32_t marshallingBufferSize(); + + typedef boost::shared_ptr<TransactionLog> shared_ptr; + + // Get a new Transaction + boost::shared_ptr<Transaction> begin(); + + // Get a new TPCTransaction + boost::shared_ptr<TPCTransaction> begin(const std::string& xid); + + void recordPrepare(uint64_t transId); + void recordCommit(uint64_t transId); + void recordAbort(uint64_t transId); + void deleteTransaction(uint64_t transId); + + // Fill @arg preparedMap with Xid->TPCTransaction::shared_ptr for all + // currently prepared transactions. + void collectPreparedXids(std::map<std::string, boost::shared_ptr<TPCTransaction> >& preparedMap); + + // Recover the transactions and their state from the log. + // Every non-deleted transaction recovered from the log will be + // represented in @arg transMap. The recovering messages can use this + // information to tell if a transaction referred to in an enqueue/dequeue + // operation should be recovered or dropped by examining transaction state. + // + // @param recoverer Recovery manager used to recreate broker objects from + // entries recovered from the log. + // @param transMap This method fills in the map of id -> shared_ptr of + // recovered transactions. + void recover(std::map<uint64_t, boost::shared_ptr<Transaction> >& transMap); +}; + +}}} // namespace qpid::store::ms_clfs + +#endif /* QPID_STORE_MSCLFS_TRANSACTIONLOG_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp b/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp new file mode 100644 index 0000000000..095d1bf331 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "AmqpTransaction.h" +#include "DatabaseConnection.h" + +namespace qpid { +namespace store { +namespace ms_sql { + +AmqpTransaction::AmqpTransaction(const boost::shared_ptr<DatabaseConnection>& _db) + : db(_db), sqlTrans(_db) +{ +} + +AmqpTransaction::~AmqpTransaction() +{ +} + +void +AmqpTransaction::sqlBegin() +{ + sqlTrans.begin(); +} + +void +AmqpTransaction::sqlCommit() +{ + sqlTrans.commit(); +} + +void +AmqpTransaction::sqlAbort() +{ + sqlTrans.abort(); +} + + +AmqpTPCTransaction::AmqpTPCTransaction(const boost::shared_ptr<DatabaseConnection>& db, + const std::string& _xid) + : AmqpTransaction(db), prepared(false), xid(_xid) +{ +} + +AmqpTPCTransaction::~AmqpTPCTransaction() +{ +} + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h b/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h new file mode 100644 index 0000000000..625fab5595 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h @@ -0,0 +1,85 @@ +#ifndef QPID_STORE_MSSQL_AMQPTRANSACTION_H +#define QPID_STORE_MSSQL_AMQPTRANSACTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/broker/TransactionalStore.h> +#include <boost/shared_ptr.hpp> +#include <string> + +#include "SqlTransaction.h" + +namespace qpid { +namespace store { +namespace ms_sql { + +class DatabaseConnection; + +/** + * @class AmqpTransaction + * + * Class representing an AMQP transaction. This is used around a set of + * enqueue and dequeue operations that occur when the broker is acting + * on a transaction commit/abort from the client. + */ +class AmqpTransaction : public qpid::broker::TransactionContext { + + boost::shared_ptr<DatabaseConnection> db; + SqlTransaction sqlTrans; + +public: + AmqpTransaction(const boost::shared_ptr<DatabaseConnection>& _db); + virtual ~AmqpTransaction(); + + DatabaseConnection *dbConn() { return db.get(); } + + void sqlBegin(); + void sqlCommit(); + void sqlAbort(); +}; + +/** + * @class AmqpTPCTransaction + * + * Class representing a Two-Phase-Commit (TPC) AMQP transaction. This is + * used around a set of enqueue and dequeue operations that occur when the + * broker is acting on a transaction prepare/commit/abort from the client. + */ +class AmqpTPCTransaction : public AmqpTransaction, + public qpid::broker::TPCTransactionContext { + bool prepared; + std::string xid; + +public: + AmqpTPCTransaction(const boost::shared_ptr<DatabaseConnection>& db, + const std::string& _xid); + virtual ~AmqpTPCTransaction(); + + void setPrepared(void) { prepared = true; } + bool isPrepared(void) const { return prepared; } + + const std::string& getXid(void) const { return xid; } +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_AMQPTRANSACTION_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp b/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp new file mode 100644 index 0000000000..1dc4370312 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp @@ -0,0 +1,165 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/Exception.h> +#include <qpid/log/Statement.h> + +#include "BindingRecordset.h" +#include "BlobAdapter.h" +#include "BlobEncoder.h" +#include "VariantHelper.h" + +namespace qpid { +namespace store { +namespace ms_sql { + +void +BindingRecordset::removeFilter(const std::string& filter) +{ + rs->PutFilter (VariantHelper<std::string>(filter)); + long recs = rs->GetRecordCount(); + if (recs == 0) + return; // Nothing to do + while (recs > 0) { + // Deleting adAffectAll doesn't work as documented; go one by one. + rs->Delete(adAffectCurrent); + if (--recs > 0) + rs->MoveNext(); + } + rs->Update(); +} + +void +BindingRecordset::add(uint64_t exchangeId, + uint64_t queueId, + const std::string& routingKey, + const qpid::framing::FieldTable& args) +{ + VariantHelper<std::string> routingKeyStr(routingKey); + BlobEncoder blob (args); // Marshall field table to a blob + rs->AddNew(); + rs->Fields->GetItem("exchangeId")->Value = exchangeId; + rs->Fields->GetItem("queueId")->Value = queueId; + rs->Fields->GetItem("routingKey")->Value = routingKeyStr; + rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob); + rs->Update(); +} + +void +BindingRecordset::remove(uint64_t exchangeId, + uint64_t queueId, + const std::string& routingKey, + const qpid::framing::FieldTable& /*args*/) +{ + // Look up the affected binding. + std::ostringstream filter; + filter << "exchangeId = " << exchangeId + << " AND queueId = " << queueId + << " AND routingKey = '" << routingKey << "'" << std::ends; + removeFilter(filter.str()); +} + +void +BindingRecordset::removeForExchange(uint64_t exchangeId) +{ + // Look up the affected bindings by the exchange ID + std::ostringstream filter; + filter << "exchangeId = " << exchangeId << std::ends; + removeFilter(filter.str()); +} + +void +BindingRecordset::removeForQueue(uint64_t queueId) +{ + // Look up the affected bindings by the queue ID + std::ostringstream filter; + filter << "queueId = " << queueId << std::ends; + removeFilter(filter.str()); +} + +void +BindingRecordset::recover(broker::RecoveryManager& recoverer, + const store::ExchangeMap& exchMap, + const store::QueueMap& queueMap) +{ + if (rs->BOF && rs->EndOfFile) + return; // Nothing to do + rs->MoveFirst(); + Binding b; + IADORecordBinding *piAdoRecordBinding; + rs->QueryInterface(__uuidof(IADORecordBinding), + (LPVOID *)&piAdoRecordBinding); + piAdoRecordBinding->BindToRecordset(&b); + while (!rs->EndOfFile) { + long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize; + BlobAdapter blob(blobSize); + blob = rs->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); + store::ExchangeMap::const_iterator exch = exchMap.find(b.exchangeId); + if (exch == exchMap.end()) { + std::ostringstream msg; + msg << "Error recovering bindings; exchange ID " << b.exchangeId + << " not found in exchange map"; + throw qpid::Exception(msg.str()); + } + broker::RecoverableExchange::shared_ptr exchPtr = exch->second; + store::QueueMap::const_iterator q = queueMap.find(b.queueId); + if (q == queueMap.end()) { + std::ostringstream msg; + msg << "Error recovering bindings; queue ID " << b.queueId + << " not found in queue map"; + throw qpid::Exception(msg.str()); + } + broker::RecoverableQueue::shared_ptr qPtr = q->second; + // The recovery manager wants the queue name, so get it from the + // RecoverableQueue. + std::string key(b.routingKey); + exchPtr->bind(qPtr->getName(), key, blob); + rs->MoveNext(); + } + + piAdoRecordBinding->Release(); +} + +void +BindingRecordset::dump() +{ + Recordset::dump(); + if (rs->EndOfFile && rs->BOF) // No records + return; + rs->MoveFirst(); + + Binding b; + IADORecordBinding *piAdoRecordBinding; + rs->QueryInterface(__uuidof(IADORecordBinding), + (LPVOID *)&piAdoRecordBinding); + piAdoRecordBinding->BindToRecordset(&b); + + while (VARIANT_FALSE == rs->EndOfFile) { + QPID_LOG(notice, "exch Id " << b.exchangeId + << ", q Id " << b.queueId + << ", k: " << b.routingKey); + rs->MoveNext(); + } + + piAdoRecordBinding->Release(); +} + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h b/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h new file mode 100644 index 0000000000..3cb732de75 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h @@ -0,0 +1,88 @@ +#ifndef QPID_STORE_MSSQL_BINDINGRECORDSET_H +#define QPID_STORE_MSSQL_BINDINGRECORDSET_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <icrsint.h> +#include "Recordset.h" +#include <qpid/store/StorageProvider.h> +#include <qpid/broker/RecoveryManager.h> + +namespace qpid { +namespace store { +namespace ms_sql { + +/** + * @class BindingRecordset + * + * Class for the binding records. + */ +class BindingRecordset : public Recordset { + + class Binding : public CADORecordBinding { + BEGIN_ADO_BINDING(Binding) + ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, exchangeId, FALSE) + ADO_FIXED_LENGTH_ENTRY2(2, adBigInt, queueId, FALSE) + ADO_VARIABLE_LENGTH_ENTRY4(3, adVarChar, routingKey, + sizeof(routingKey), FALSE) + END_ADO_BINDING() + + public: + uint64_t exchangeId; + uint64_t queueId; + char routingKey[256]; + }; + + // Remove all records matching the specified filter/query. + void removeFilter(const std::string& filter); + +public: + // Add a new binding + void add(uint64_t exchangeId, + uint64_t queueId, + const std::string& routingKey, + const qpid::framing::FieldTable& args); + + // Remove a specific binding + void remove(uint64_t exchangeId, + uint64_t queueId, + const std::string& routingKey, + const qpid::framing::FieldTable& args); + + // Remove all bindings for the specified exchange + void removeForExchange(uint64_t exchangeId); + + // Remove all bindings for the specified queue + void removeForQueue(uint64_t queueId); + + // Recover bindings set using exchMap to get from Id to RecoverableExchange. + void recover(qpid::broker::RecoveryManager& recoverer, + const qpid::store::ExchangeMap& exchMap, + const qpid::store::QueueMap& queueMap); + + // Dump table contents; useful for debugging. + void dump(); +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_BINDINGRECORDSET_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.cpp b/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.cpp new file mode 100644 index 0000000000..1889f34e41 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.cpp @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "BlobAdapter.h" +#include <qpid/Exception.h> + +namespace qpid { +namespace store { +namespace ms_sql { + +void +BlobAdapter::extractBuff() +{ + // To give a valid Buffer back, lock the safearray, obtaining a pointer to + // the actual data. Record the pointer in the Buffer so the destructor + // knows to unlock the safearray. + if (buff.getPointer() == 0) { + char *blob; + SafeArrayAccessData(this->parray, (void **)&blob); + qpid::framing::Buffer lockedBuff(blob, buff.getSize()); + buff = lockedBuff; + } +} + + +BlobAdapter::~BlobAdapter() +{ + // If buff's pointer is set, the safearray is locked, so unlock it + if (buff.getPointer() != 0) + SafeArrayUnaccessData(this->parray); +} + +BlobAdapter::operator qpid::framing::Buffer& () +{ + extractBuff(); + return buff; +} + +BlobAdapter::operator qpid::framing::FieldTable& () +{ + extractBuff(); + fields.decode(buff); + return fields; +} + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.h b/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.h new file mode 100644 index 0000000000..1c666392bc --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.h @@ -0,0 +1,62 @@ +#ifndef QPID_STORE_MSSQL_BLOBADAPTER_H +#define QPID_STORE_MSSQL_BLOBADAPTER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <comutil.h> +#include <qpid/framing/Buffer.h> +#include <qpid/framing/FieldTable.h> + +namespace qpid { +namespace store { +namespace ms_sql { + +/** + * @class BlobAdapter + * + * Adapter for accessing a blob (varbinary SQL field) as a qpid::framing::Buffer + * in an exception-safe way. + */ +class BlobAdapter : public _variant_t { +private: + // This Buffer's pointer indicates whether or not a safearray has + // been locked; if it's 0, no locking was done. + qpid::framing::Buffer buff; + qpid::framing::FieldTable fields; + + void extractBuff(); + +public: + // Initialize with the known length of the data that will come. + // Assigning a _variant_t to this object will set up the array to be + // accessed with the operator Buffer&() + BlobAdapter(long blobSize) : _variant_t(), buff(0, blobSize) {} + ~BlobAdapter(); + BlobAdapter& operator=(_variant_t& var_t_Src) + { _variant_t::operator=(var_t_Src); return *this; } + operator qpid::framing::Buffer& (); + operator qpid::framing::FieldTable& (); +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_BLOBADAPTER_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.cpp b/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.cpp new file mode 100644 index 0000000000..75d3dc2d86 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.cpp @@ -0,0 +1,133 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "BlobEncoder.h" +#include <qpid/Exception.h> +#include <qpid/broker/Persistable.h> +#include <qpid/broker/PersistableMessage.h> +#include <boost/intrusive_ptr.hpp> +#include <memory.h> + +namespace qpid { +namespace store { +namespace ms_sql { + +template <class ITEM> void +BlobEncoder::encode(const ITEM &item) +{ + SAFEARRAYBOUND bound[1] = {0, 0}; + bound[0].cElements = item.encodedSize(); + blob = SafeArrayCreate(VT_UI1, 1, bound); + if (S_OK != SafeArrayLock(blob)) { + SafeArrayDestroy(blob); + blob = 0; + throw qpid::Exception("Error locking blob area for persistable item"); + } + try { + qpid::framing::Buffer buff((char *)blob->pvData, bound[0].cElements); + item.encode(buff); + } + catch(...) { + SafeArrayUnlock(blob); + SafeArrayDestroy(blob); + blob = 0; + throw; + } + this->vt = VT_ARRAY | VT_UI1; + this->parray = blob; + SafeArrayUnlock(blob); +} + +template <> void +BlobEncoder::encode(const boost::intrusive_ptr<qpid::broker::PersistableMessage> &item) +{ + // NOTE! If this code changes, verify the recovery code in MessageRecordset + SAFEARRAYBOUND bound[1] = {0, 0}; + bound[0].cElements = item->encodedSize() + sizeof(uint32_t); + blob = SafeArrayCreate(VT_UI1, 1, bound); + if (S_OK != SafeArrayLock(blob)) { + SafeArrayDestroy(blob); + blob = 0; + throw qpid::Exception("Error locking blob area for message"); + } + try { + uint32_t headerSize = item->encodedHeaderSize(); + qpid::framing::Buffer buff((char *)blob->pvData, bound[0].cElements); + buff.putLong(headerSize); + item->encode(buff); + } + catch(...) { + SafeArrayUnlock(blob); + SafeArrayDestroy(blob); + blob = 0; + throw; + } + this->vt = VT_ARRAY | VT_UI1; + this->parray = blob; + SafeArrayUnlock(blob); +} + +template <> void +BlobEncoder::encode(const std::string &item) +{ + SAFEARRAYBOUND bound[1] = {0, 0}; + bound[0].cElements = item.size(); + blob = SafeArrayCreate(VT_UI1, 1, bound); + if (S_OK != SafeArrayLock(blob)) { + SafeArrayDestroy(blob); + blob = 0; + throw qpid::Exception("Error locking blob area for string"); + } + memcpy_s(blob->pvData, item.size(), item.data(), item.size()); + this->vt = VT_ARRAY | VT_UI1; + this->parray = blob; + SafeArrayUnlock(blob); +} + +BlobEncoder::BlobEncoder(const qpid::broker::Persistable &item) : blob(0) +{ + encode(item); +} + +BlobEncoder::BlobEncoder(const boost::intrusive_ptr<qpid::broker::PersistableMessage> &msg) : blob(0) +{ + encode(msg); +} + +BlobEncoder::BlobEncoder(const qpid::framing::FieldTable &fields) : blob(0) +{ + encode(fields); +} + +BlobEncoder::BlobEncoder(const std::string &data) : blob(0) +{ + encode(data); +} + +BlobEncoder::~BlobEncoder() +{ + if (blob) + SafeArrayDestroy(blob); + blob = 0; + this->parray = 0; +} + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.h b/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.h new file mode 100644 index 0000000000..d2b56223c1 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.h @@ -0,0 +1,61 @@ +#ifndef QPID_STORE_MSSQL_BLOBENCODER_H +#define QPID_STORE_MSSQL_BLOBENCODER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <comutil.h> +#include <string> +#include <boost/intrusive_ptr.hpp> +#include <qpid/broker/Persistable.h> +#include <qpid/broker/PersistableMessage.h> +#include <qpid/framing/Buffer.h> +#include <qpid/framing/FieldTable.h> + +namespace qpid { +namespace store { +namespace ms_sql { + +/** + * @class BlobEncoder + * + * Encodes a blob (varbinary) field from a qpid::broker::Persistable or a + * qpid::framing::FieldTable (both of which can be encoded to + * qpid::framing::Buffer) so it can be passed to ADO methods for writing + * to the database. + */ +class BlobEncoder : public _variant_t { +private: + SAFEARRAY *blob; + + template <class ITEM> void encode(const ITEM &item); + +public: + BlobEncoder(const qpid::broker::Persistable &item); + BlobEncoder(const boost::intrusive_ptr<qpid::broker::PersistableMessage> &msg); + BlobEncoder(const qpid::framing::FieldTable &fields); + BlobEncoder(const std::string& data); + ~BlobEncoder(); +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_BLOBENCODER_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp b/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp new file mode 100644 index 0000000000..ef1757dbad --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/Exception.h> +#include <qpid/log/Statement.h> + +#include "BlobRecordset.h" +#include "BlobEncoder.h" +#include "VariantHelper.h" + +namespace qpid { +namespace store { +namespace ms_sql { + +void +BlobRecordset::add(const qpid::broker::Persistable& item) +{ + BlobEncoder blob (item); // Marshall item info to a blob + rs->AddNew(); + rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob); + rs->Update(); + uint64_t id = rs->Fields->Item["persistenceId"]->Value; + item.setPersistenceId(id); +} + +void +BlobRecordset::remove(uint64_t id) +{ + // Look up the item by its persistenceId + std::ostringstream filter; + filter << "persistenceId = " << id << std::ends; + rs->PutFilter (VariantHelper<std::string>(filter.str())); + if (!rs->EndOfFile) { + // Delete the record + rs->Delete(adAffectCurrent); + rs->Update(); + } +} + +void +BlobRecordset::remove(const qpid::broker::Persistable& item) +{ + remove(item.getPersistenceId()); +} + +void +BlobRecordset::dump() +{ + Recordset::dump(); +#if 1 + if (rs->EndOfFile && rs->BOF) // No records + return; + + rs->MoveFirst(); + while (!rs->EndOfFile) { + uint64_t id = rs->Fields->Item["persistenceId"]->Value; + QPID_LOG(notice, " -> " << id); + rs->MoveNext(); + } +#else + for (Iterator iter = begin(); iter != end(); ++iter) { + uint64_t id = *iter.first; + QPID_LOG(notice, " -> " << id); + } +#endif +} + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h b/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h new file mode 100644 index 0000000000..4d1c338746 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h @@ -0,0 +1,54 @@ +#ifndef QPID_STORE_MSSQL_BLOBRECORDSET_H +#define QPID_STORE_MSSQL_BLOBRECORDSET_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Recordset.h" +#include <qpid/broker/Persistable.h> +#include <string> + +namespace qpid { +namespace store { +namespace ms_sql { + +/** + * @class BlobRecordset + * + * Class for the "blob" records that record an id, varbinary(max) pair. + */ +class BlobRecordset : public Recordset { +protected: + +public: + void add(const qpid::broker::Persistable& item); + + // Remove a record given its Id. + void remove(uint64_t id); + void remove(const qpid::broker::Persistable& item); + + // Dump table contents; useful for debugging. + void dump(); +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_BLOBRECORDSET_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp b/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp new file mode 100644 index 0000000000..3219ea526a --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp @@ -0,0 +1,91 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "DatabaseConnection.h" +#include "Exception.h" +#include <comdef.h> +namespace { +inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);}; +} + +namespace qpid { +namespace store { +namespace ms_sql { + +DatabaseConnection::DatabaseConnection() : conn(0) +{ +} + +DatabaseConnection::~DatabaseConnection() +{ + close(); +} + +void +DatabaseConnection::open(const std::string& connectString, + const std::string& dbName) +{ + if (conn && conn->State == adStateOpen) + return; + std::string adoConnect = "Provider=SQLOLEDB;" + connectString; + try { + TESTHR(conn.CreateInstance(__uuidof(Connection))); + conn->ConnectionString = adoConnect.c_str(); + conn->Open("", "", "", adConnectUnspecified); + if (dbName.length() > 0) + conn->DefaultDatabase = dbName.c_str(); + } + catch(_com_error &e) { + close(); + throw ADOException("MSSQL can't open " + dbName + " at " + adoConnect, e); + } +} + +void +DatabaseConnection::close() +{ + if (conn && conn->State == adStateOpen) + conn->Close(); + conn = 0; +} + +std::string +DatabaseConnection::getErrors() +{ + long errCount = conn->Errors->Count; + if (errCount <= 0) + return ""; + // Collection ranges from 0 to nCount -1. + std::ostringstream messages; + ErrorPtr pErr = NULL; + for (long i = 0 ; i < errCount ; i++ ) { + if (i > 0) + messages << "\n"; + messages << "[" << i << "] "; + pErr = conn->Errors->GetItem(i); + messages << "Error " << pErr->Number << ": " + << (LPCSTR)pErr->Description; + } + messages << std::ends; + return messages.str(); +} + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h b/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h new file mode 100644 index 0000000000..785d1587c5 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h @@ -0,0 +1,64 @@ +#ifndef QPID_STORE_MSSQL_DATABASECONNECTION_H +#define QPID_STORE_MSSQL_DATABASECONNECTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// Bring in ADO 2.8 (yes, I know it says "15", but that's it...) +#import "C:\Program Files\Common Files\System\ado\msado15.dll" \ + no_namespace rename("EOF", "EndOfFile") + +#include <string> + +namespace qpid { +namespace store { +namespace ms_sql { + +/** + * @class DatabaseConnection + * + * Represents a connection to the SQL database. This class wraps the + * needed _ConnectionPtr for ADO as well as the needed COM initialization + * and cleanup that each thread requires. It is expected that this class + * will be maintained in thread-specific storage so it has no locks. + */ +class DatabaseConnection { +protected: + _ConnectionPtr conn; + +public: + DatabaseConnection(); + ~DatabaseConnection(); + void open(const std::string& connectString, + const std::string& dbName = ""); + void close(); + operator _ConnectionPtr () { return conn; } + + void beginTransaction() { conn->BeginTrans(); } + void commitTransaction() {conn->CommitTrans(); } + void rollbackTransaction() { conn->RollbackTrans(); } + + std::string getErrors(); +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_DATABASECONNECTION_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/Exception.h b/qpid/cpp/src/qpid/store/ms-sql/Exception.h new file mode 100644 index 0000000000..65ec3388ff --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/Exception.h @@ -0,0 +1,66 @@ +#ifndef QPID_STORE_MSSQL_EXCEPTION_H +#define QPID_STORE_MSSQL_EXCEPTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include <comdef.h> +#include <qpid/store/StorageProvider.h> + +namespace qpid { +namespace store { +namespace ms_sql { + +class Exception : public qpid::store::StorageProvider::Exception +{ +protected: + std::string text; +public: + Exception(const std::string& _text) : text(_text) {} + virtual ~Exception() {} + virtual const char* what() const throw() { return text.c_str(); } +}; + +class ADOException : public Exception +{ +public: + ADOException(const std::string& _text, + _com_error &e, + const std::string& providerErrors = "") + : Exception(_text) { + text += ": "; + text += e.ErrorMessage(); + IErrorInfo *i = e.ErrorInfo(); + if (i != 0) { + text += ": "; + _bstr_t wmsg = e.Description(); + text += (const char *)wmsg; + i->Release(); + } + if (providerErrors.length() > 0) + text += providerErrors; + } +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_EXCEPTION_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp new file mode 100644 index 0000000000..1432cc8fca --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp @@ -0,0 +1,1286 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <stdlib.h> +#include <string> +#include <windows.h> +#include <qpid/broker/RecoverableQueue.h> +#include <qpid/log/Statement.h> +#include <qpid/store/MessageStorePlugin.h> +#include <qpid/store/StorageProvider.h> +#include "AmqpTransaction.h" +#include "BlobAdapter.h" +#include "BlobRecordset.h" +#include "BindingRecordset.h" +#include "MessageMapRecordset.h" +#include "MessageRecordset.h" +#include "TplRecordset.h" +#include "DatabaseConnection.h" +#include "Exception.h" +#include "State.h" +#include "VariantHelper.h" + +// Bring in ADO 2.8 (yes, I know it says "15", but that's it...) +#import "C:\Program Files\Common Files\System\ado\msado15.dll" \ + no_namespace rename("EOF", "EndOfFile") +#include <comdef.h> +namespace { +inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);}; + +// Table names +const std::string TblBinding("tblBinding"); +const std::string TblConfig("tblConfig"); +const std::string TblExchange("tblExchange"); +const std::string TblMessage("tblMessage"); +const std::string TblMessageMap("tblMessageMap"); +const std::string TblQueue("tblQueue"); +const std::string TblTpl("tblTPL"); +} + +namespace qpid { +namespace store { +namespace ms_sql { + +/** + * @class MSSqlProvider + * + * Implements a qpid::store::StorageProvider that uses Microsoft SQL Server as + * the backend data store for Qpid. + */ +class MSSqlProvider : public qpid::store::StorageProvider +{ +protected: + void finalizeMe(); + + void dump(); + +public: + MSSqlProvider(); + ~MSSqlProvider(); + + virtual qpid::Options* getOptions() { return &options; } + + virtual void earlyInitialize (Plugin::Target& target); + virtual void initialize(Plugin::Target& target); + + /** + * Receive notification that this provider is the one that will actively + * handle provider storage for the target. If the provider is to be used, + * this method will be called after earlyInitialize() and before any + * recovery operations (recovery, in turn, precedes call to initialize()). + */ + virtual void activate(MessageStorePlugin &store); + + /** + * @name Methods inherited from qpid::broker::MessageStore + */ + + /** + * Record the existence of a durable queue + */ + virtual void create(PersistableQueue& queue, + const qpid::framing::FieldTable& args); + /** + * Destroy a durable queue + */ + virtual void destroy(PersistableQueue& queue); + + /** + * Record the existence of a durable exchange + */ + virtual void create(const PersistableExchange& exchange, + const qpid::framing::FieldTable& args); + /** + * Destroy a durable exchange + */ + virtual void destroy(const PersistableExchange& exchange); + + /** + * Record a binding + */ + virtual void bind(const PersistableExchange& exchange, + const PersistableQueue& queue, + const std::string& key, + const qpid::framing::FieldTable& args); + + /** + * Forget a binding + */ + virtual void unbind(const PersistableExchange& exchange, + const PersistableQueue& queue, + const std::string& key, + const qpid::framing::FieldTable& args); + + /** + * Record generic durable configuration + */ + virtual void create(const PersistableConfig& config); + + /** + * Destroy generic durable configuration + */ + virtual void destroy(const PersistableConfig& config); + + /** + * Stores a messages before it has been enqueued + * (enqueueing automatically stores the message so this is + * only required if storage is required prior to that + * point). If the message has not yet been stored it will + * store the headers as well as any content passed in. A + * persistence id will be set on the message which can be + * used to load the content or to append to it. + */ + virtual void stage(const boost::intrusive_ptr<PersistableMessage>& msg); + + /** + * Destroys a previously staged message. This only needs + * to be called if the message is never enqueued. (Once + * enqueued, deletion will be automatic when the message + * is dequeued from all queues it was enqueued onto). + */ + virtual void destroy(PersistableMessage& msg); + + /** + * Appends content to a previously staged message + */ + virtual void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg, + const std::string& data); + + /** + * Loads (a section) of content data for the specified + * message (previously stored through a call to stage or + * enqueue) into data. The offset refers to the content + * only (i.e. an offset of 0 implies that the start of the + * content should be loaded, not the headers or related + * meta-data). + */ + virtual void loadContent(const qpid::broker::PersistableQueue& queue, + const boost::intrusive_ptr<const PersistableMessage>& msg, + std::string& data, + uint64_t offset, + uint32_t length); + + /** + * Enqueues a message, storing the message if it has not + * been previously stored and recording that the given + * message is on the given queue. + * + * Note: that this is async so the return of the function does + * not mean the opperation is complete. + * + * @param msg the message to enqueue + * @param queue the name of the queue onto which it is to be enqueued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void enqueue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue); + + /** + * Dequeues a message, recording that the given message is + * no longer on the given queue and deleting the message + * if it is no longer on any other queue. + * + * Note: that this is async so the return of the function does + * not mean the opperation is complete. + * + * @param msg the message to dequeue + * @param queue the name of the queue from which it is to be dequeued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void dequeue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue); + + /** + * Flushes all async messages to disk for the specified queue + * + * Note: this is a no-op for this provider. + * + * @param queue the name of the queue from which it is to be dequeued + */ + virtual void flush(const PersistableQueue& queue) {}; + + /** + * Returns the number of outstanding AIO's for a given queue + * + * If 0, than all the enqueue / dequeues have been stored + * to disk + * + * @param queue the name of the queue to check for outstanding AIO + */ + virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue) + {return 0;} + //@} + + /** + * @name Methods inherited from qpid::broker::TransactionalStore + */ + //@{ + virtual std::auto_ptr<qpid::broker::TransactionContext> begin(); + virtual std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid); + virtual void prepare(qpid::broker::TPCTransactionContext& txn); + virtual void commit(qpid::broker::TransactionContext& txn); + virtual void abort(qpid::broker::TransactionContext& txn); + virtual void collectPreparedXids(std::set<std::string>& xids); + //@} + + virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer); + virtual void recoverExchanges(qpid::broker::RecoveryManager& recoverer, + ExchangeMap& exchangeMap); + virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer, + QueueMap& queueMap); + virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer, + const ExchangeMap& exchangeMap, + const QueueMap& queueMap); + virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer, + MessageMap& messageMap, + MessageQueueMap& messageQueueMap); + virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer, + PreparedTransactionMap& dtxMap); + +private: + struct ProviderOptions : public qpid::Options + { + std::string connectString; + std::string catalogName; + + ProviderOptions(const std::string &name) + : qpid::Options(name), + catalogName("QpidStore") + { + const enum { NAMELEN = MAX_COMPUTERNAME_LENGTH + 1 }; + TCHAR myName[NAMELEN]; + DWORD myNameLen = NAMELEN; + GetComputerName(myName, &myNameLen); + connectString = "Data Source="; + connectString += myName; + connectString += "\\SQLEXPRESS;Integrated Security=SSPI"; + addOptions() + ("connect", + qpid::optValue(connectString, "STRING"), + "Connection string for the database to use. Will prepend " + "Provider=SQLOLEDB;") + ("catalog", + qpid::optValue(catalogName, "DB NAME"), + "Catalog (database) name") + ; + } + }; + ProviderOptions options; + + // Each thread has a separate connection to the database and also needs + // to manage its COM initialize/finalize individually. This is done by + // keeping a thread-specific State. + boost::thread_specific_ptr<State> dbState; + + State *initState(); + DatabaseConnection *initConnection(void); + void createDb(DatabaseConnection *db, const std::string &name); +}; + +static MSSqlProvider static_instance_registers_plugin; + +void +MSSqlProvider::finalizeMe() +{ + dbState.reset(); +} + +MSSqlProvider::MSSqlProvider() + : options("MS SQL Provider options") +{ +} + +MSSqlProvider::~MSSqlProvider() +{ +} + +void +MSSqlProvider::earlyInitialize(Plugin::Target &target) +{ + MessageStorePlugin *store = dynamic_cast<MessageStorePlugin *>(&target); + if (store) { + // If the database init fails, report it and don't register; give + // the rest of the broker a chance to run. + // + // Don't try to initConnection() since that will fail if the + // database doesn't exist. Instead, try to open a connection without + // a database name, then search for the database. There's still a + // chance this provider won't be selected for the store too, so be + // be sure to close the database connection before return to avoid + // leaving a connection up that will not be used. + try { + initState(); // This initializes COM + std::auto_ptr<DatabaseConnection> db(new DatabaseConnection()); + db->open(options.connectString, ""); + _ConnectionPtr conn(*db); + _RecordsetPtr pCatalogs = NULL; + VariantHelper<std::string> catalogName(options.catalogName); + pCatalogs = conn->OpenSchema(adSchemaCatalogs, catalogName); + if (pCatalogs->EndOfFile) { + // Database doesn't exist; create it + QPID_LOG(notice, + "MSSQL: Creating database " + options.catalogName); + createDb(db.get(), options.catalogName); + } + else { + QPID_LOG(notice, + "MSSQL: Database located: " + options.catalogName); + } + if (pCatalogs) { + if (pCatalogs->State == adStateOpen) + pCatalogs->Close(); + pCatalogs = 0; + } + db->close(); + store->providerAvailable("MSSQL", this); + } + catch (qpid::Exception &e) { + QPID_LOG(error, e.what()); + return; + } + store->addFinalizer(boost::bind(&MSSqlProvider::finalizeMe, this)); + } +} + +void +MSSqlProvider::initialize(Plugin::Target& target) +{ +} + +void +MSSqlProvider::activate(MessageStorePlugin &store) +{ + QPID_LOG(info, "MS SQL Provider is up"); +} + +void +MSSqlProvider::create(PersistableQueue& queue, + const qpid::framing::FieldTable& /*args needed for jrnl*/) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsQueues; + try { + db->beginTransaction(); + rsQueues.open(db, TblQueue); + rsQueues.add(queue); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error creating queue " + queue.getName(), e, errs); + } +} + +/** + * Destroy a durable queue + */ +void +MSSqlProvider::destroy(PersistableQueue& queue) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsQueues; + BindingRecordset rsBindings; + MessageRecordset rsMessages; + MessageMapRecordset rsMessageMaps; + try { + db->beginTransaction(); + rsQueues.open(db, TblQueue); + rsBindings.open(db, TblBinding); + rsMessages.open(db, TblMessage); + rsMessageMaps.open(db, TblMessageMap); + // Remove bindings first; the queue IDs can't be ripped out from + // under the references in the bindings table. Then remove the + // message->queue entries for the queue, also because the queue can't + // be deleted while there are references to it. If there are messages + // orphaned by removing the queue references, they're deleted by + // a trigger on the tblMessageMap table. Lastly, the queue record + // can be removed. + rsBindings.removeForQueue(queue.getPersistenceId()); + rsMessageMaps.removeForQueue(queue.getPersistenceId()); + rsQueues.remove(queue); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error deleting queue " + queue.getName(), e, errs); + } +} + +/** + * Record the existence of a durable exchange + */ +void +MSSqlProvider::create(const PersistableExchange& exchange, + const qpid::framing::FieldTable& args) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsExchanges; + try { + db->beginTransaction(); + rsExchanges.open(db, TblExchange); + rsExchanges.add(exchange); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error creating exchange " + exchange.getName(), + e, + errs); + } +} + +/** + * Destroy a durable exchange + */ +void +MSSqlProvider::destroy(const PersistableExchange& exchange) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsExchanges; + BindingRecordset rsBindings; + try { + db->beginTransaction(); + rsExchanges.open(db, TblExchange); + rsBindings.open(db, TblBinding); + // Remove bindings first; the exchange IDs can't be ripped out from + // under the references in the bindings table. + rsBindings.removeForExchange(exchange.getPersistenceId()); + rsExchanges.remove(exchange); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error deleting exchange " + exchange.getName(), + e, + errs); + } +} + +/** + * Record a binding + */ +void +MSSqlProvider::bind(const PersistableExchange& exchange, + const PersistableQueue& queue, + const std::string& key, + const qpid::framing::FieldTable& args) +{ + DatabaseConnection *db = initConnection(); + BindingRecordset rsBindings; + try { + db->beginTransaction(); + rsBindings.open(db, TblBinding); + rsBindings.add(exchange.getPersistenceId(), + queue.getPersistenceId(), + key, + args); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error binding exchange " + exchange.getName() + + " to queue " + queue.getName(), + e, + errs); + } +} + +/** + * Forget a binding + */ +void +MSSqlProvider::unbind(const PersistableExchange& exchange, + const PersistableQueue& queue, + const std::string& key, + const qpid::framing::FieldTable& args) +{ + DatabaseConnection *db = initConnection(); + BindingRecordset rsBindings; + try { + db->beginTransaction(); + rsBindings.open(db, TblBinding); + rsBindings.remove(exchange.getPersistenceId(), + queue.getPersistenceId(), + key, + args); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error unbinding exchange " + exchange.getName() + + " from queue " + queue.getName(), + e, + errs); + } +} + +/** + * Record generic durable configuration + */ +void +MSSqlProvider::create(const PersistableConfig& config) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsConfigs; + try { + db->beginTransaction(); + rsConfigs.open(db, TblConfig); + rsConfigs.add(config); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error creating config " + config.getName(), e, errs); + } +} + +/** + * Destroy generic durable configuration + */ +void +MSSqlProvider::destroy(const PersistableConfig& config) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsConfigs; + try { + db->beginTransaction(); + rsConfigs.open(db, TblConfig); + rsConfigs.remove(config); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error deleting config " + config.getName(), e, errs); + } +} + +/** + * Stores a messages before it has been enqueued + * (enqueueing automatically stores the message so this is + * only required if storage is required prior to that + * point). If the message has not yet been stored it will + * store the headers as well as any content passed in. A + * persistence id will be set on the message which can be + * used to load the content or to append to it. + */ +void +MSSqlProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg) +{ + DatabaseConnection *db = initConnection(); + MessageRecordset rsMessages; + try { + db->beginTransaction(); + rsMessages.open(db, TblMessage); + rsMessages.add(msg); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error staging message", e, errs); + } +} + +/** + * Destroys a previously staged message. This only needs + * to be called if the message is never enqueued. (Once + * enqueued, deletion will be automatic when the message + * is dequeued from all queues it was enqueued onto). + */ +void +MSSqlProvider::destroy(PersistableMessage& msg) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsMessages; + try { + db->beginTransaction(); + rsMessages.open(db, TblMessage); + rsMessages.remove(msg); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error deleting message", e, errs); + } +} + +/** + * Appends content to a previously staged message + */ +void +MSSqlProvider::appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg, + const std::string& data) +{ + DatabaseConnection *db = initConnection(); + MessageRecordset rsMessages; + try { + db->beginTransaction(); + rsMessages.open(db, TblMessage); + rsMessages.append(msg, data); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error appending to message", e, errs); + } +} + +/** + * Loads (a section) of content data for the specified + * message (previously stored through a call to stage or + * enqueue) into data. The offset refers to the content + * only (i.e. an offset of 0 implies that the start of the + * content should be loaded, not the headers or related + * meta-data). + */ +void +MSSqlProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/, + const boost::intrusive_ptr<const PersistableMessage>& msg, + std::string& data, + uint64_t offset, + uint32_t length) +{ + // SQL store keeps all messages in one table, so we don't need the + // queue reference. + DatabaseConnection *db = initConnection(); + MessageRecordset rsMessages; + try { + rsMessages.open(db, TblMessage); + rsMessages.loadContent(msg, data, offset, length); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + throw ADOException("Error loading message content", e, errs); + } +} + +/** + * Enqueues a message, storing the message if it has not + * been previously stored and recording that the given + * message is on the given queue. + * + * @param ctxt The transaction context under which this enqueue happens. + * @param msg The message to enqueue + * @param queue the name of the queue onto which it is to be enqueued + */ +void +MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) +{ + // If this enqueue is in the context of a transaction, use the specified + // transaction to nest a new transaction for this operation. However, if + // this is not in the context of a transaction, then just use the thread's + // DatabaseConnection with a ADO transaction. + DatabaseConnection *db = 0; + std::string xid; + AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt); + if (atxn == 0) { + db = initConnection(); + db->beginTransaction(); + } + else { + (void)initState(); // Ensure this thread is initialized + // It's a transactional enqueue; if it's TPC, grab the xid. + AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt); + if (tpcTxn) + xid = tpcTxn->getXid(); + db = atxn->dbConn(); + try { + atxn->sqlBegin(); + } + catch(_com_error &e) { + throw ADOException("Error queuing message", e, db->getErrors()); + } + } + + MessageRecordset rsMessages; + MessageMapRecordset rsMap; + try { + if (msg->getPersistenceId() == 0) { // Message itself not yet saved + rsMessages.open(db, TblMessage); + rsMessages.add(msg); + } + rsMap.open(db, TblMessageMap); + rsMap.add(msg->getPersistenceId(), queue.getPersistenceId(), xid); + if (atxn) + atxn->sqlCommit(); + else + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + if (atxn) + atxn->sqlAbort(); + else + db->rollbackTransaction(); + throw ADOException("Error queuing message", e, errs); + } + msg->enqueueComplete(); +} + +/** + * Dequeues a message, recording that the given message is + * no longer on the given queue and deleting the message + * if it is no longer on any other queue. + * + * @param ctxt The transaction context under which this dequeue happens. + * @param msg The message to dequeue + * @param queue The queue from which it is to be dequeued + */ +void +MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) +{ + // If this dequeue is in the context of a transaction, use the specified + // transaction to nest a new transaction for this operation. However, if + // this is not in the context of a transaction, then just use the thread's + // DatabaseConnection with a ADO transaction. + DatabaseConnection *db = 0; + std::string xid; + AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt); + if (atxn == 0) { + db = initConnection(); + db->beginTransaction(); + } + else { + (void)initState(); // Ensure this thread is initialized + // It's a transactional dequeue; if it's TPC, grab the xid. + AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt); + if (tpcTxn) + xid = tpcTxn->getXid(); + db = atxn->dbConn(); + try { + atxn->sqlBegin(); + } + catch(_com_error &e) { + throw ADOException("Error queuing message", e, db->getErrors()); + } + } + + MessageMapRecordset rsMap; + try { + rsMap.open(db, TblMessageMap); + // TPC dequeues are just marked pending and will actually be removed + // when the transaction commits; Single-phase dequeues are removed + // now, relying on the SQL transaction to put it back if the + // transaction doesn't commit. + if (!xid.empty()) { + rsMap.pendingRemove(msg->getPersistenceId(), + queue.getPersistenceId(), + xid); + } + else { + rsMap.remove(msg->getPersistenceId(), + queue.getPersistenceId()); + } + if (atxn) + atxn->sqlCommit(); + else + db->commitTransaction(); + } + catch(ms_sql::Exception&) { + if (atxn) + atxn->sqlAbort(); + else + db->rollbackTransaction(); + throw; + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + if (atxn) + atxn->sqlAbort(); + else + db->rollbackTransaction(); + throw ADOException("Error dequeuing message", e, errs); + } + msg->dequeueComplete(); +} + +std::auto_ptr<qpid::broker::TransactionContext> +MSSqlProvider::begin() +{ + (void)initState(); // Ensure this thread is initialized + + // Transactions are associated with the Connection, so this transaction + // context needs its own connection. At the time of writing, single-phase + // transactions are dealt with completely on one thread, so we really + // could just use the thread-specific DatabaseConnection for this. + // However, that would introduce an ugly, hidden coupling, so play + // it safe and handle this just like a TPC transaction, which actually + // can be prepared and committed/aborted from different threads, + // making it a bad idea to try using the thread-local DatabaseConnection. + boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection); + db->open(options.connectString, options.catalogName); + std::auto_ptr<AmqpTransaction> tx(new AmqpTransaction(db)); + tx->sqlBegin(); + std::auto_ptr<qpid::broker::TransactionContext> tc(tx); + return tc; +} + +std::auto_ptr<qpid::broker::TPCTransactionContext> +MSSqlProvider::begin(const std::string& xid) +{ + (void)initState(); // Ensure this thread is initialized + boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection); + db->open(options.connectString, options.catalogName); + std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(db, xid)); + tx->sqlBegin(); + + TplRecordset rsTpl; + try { + tx->sqlBegin(); + rsTpl.open(db.get(), TblTpl); + rsTpl.add(xid); + tx->sqlCommit(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + tx->sqlAbort(); + throw ADOException("Error adding TPL record", e, errs); + } + + std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx); + return tc; +} + +void +MSSqlProvider::prepare(qpid::broker::TPCTransactionContext& txn) +{ + // Commit all the marked-up enqueue/dequeue ops and the TPL record. + // On commit/rollback the TPL will be removed and the TPL markups + // on the message map will be cleaned up as well. + (void)initState(); // Ensure this thread is initialized + AmqpTPCTransaction *atxn = dynamic_cast<AmqpTPCTransaction*> (&txn); + if (atxn == 0) + throw qpid::broker::InvalidTransactionContextException(); + try { + atxn->sqlCommit(); + } + catch(_com_error &e) { + throw ADOException("Error preparing", e, atxn->dbConn()->getErrors()); + } + atxn->setPrepared(); +} + +void +MSSqlProvider::commit(qpid::broker::TransactionContext& txn) +{ + (void)initState(); // Ensure this thread is initialized + /* + * One-phase transactions simply commit the outer SQL transaction + * that was begun on begin(). Two-phase transactions are different - + * the SQL transaction started on begin() was committed on prepare() + * so all the SQL records reflecting the enqueue/dequeue actions for + * the transaction are recorded but with xid markups on them to reflect + * that they are prepared but not committed. Now go back and remove + * the markups, deleting those marked for removal. + */ + AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn); + if (p2txn == 0) { + AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn); + if (p1txn == 0) + throw qpid::broker::InvalidTransactionContextException(); + p1txn->sqlCommit(); + return; + } + + DatabaseConnection *db(p2txn->dbConn()); + TplRecordset rsTpl; + MessageMapRecordset rsMessageMap; + try { + db->beginTransaction(); + rsTpl.open(db, TblTpl); + rsMessageMap.open(db, TblMessageMap); + rsMessageMap.commitPrepared(p2txn->getXid()); + rsTpl.remove(p2txn->getXid()); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error committing transaction", e, errs); + } +} + +void +MSSqlProvider::abort(qpid::broker::TransactionContext& txn) +{ + (void)initState(); // Ensure this thread is initialized + /* + * One-phase and non-prepared two-phase transactions simply abort + * the outer SQL transaction that was begun on begin(). However, prepared + * two-phase transactions are different - the SQL transaction started + * on begin() was committed on prepare() so all the SQL records + * reflecting the enqueue/dequeue actions for the transaction are + * recorded but with xid markups on them to reflect that they are + * prepared but not committed. Now go back and remove the markups, + * deleting those marked for addition. + */ + AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn); + if (p2txn == 0 || !p2txn->isPrepared()) { + AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn); + if (p1txn == 0) + throw qpid::broker::InvalidTransactionContextException(); + p1txn->sqlAbort(); + return; + } + + DatabaseConnection *db(p2txn->dbConn()); + TplRecordset rsTpl; + MessageMapRecordset rsMessageMap; + try { + db->beginTransaction(); + rsTpl.open(db, TblTpl); + rsMessageMap.open(db, TblMessageMap); + rsMessageMap.abortPrepared(p2txn->getXid()); + rsTpl.remove(p2txn->getXid()); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error committing transaction", e, errs); + } + + + (void)initState(); // Ensure this thread is initialized + AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn); + if (atxn == 0) + throw qpid::broker::InvalidTransactionContextException(); + atxn->sqlAbort(); +} + +void +MSSqlProvider::collectPreparedXids(std::set<std::string>& xids) +{ + DatabaseConnection *db = initConnection(); + try { + TplRecordset rsTpl; + rsTpl.open(db, TblTpl); + rsTpl.recover(xids); + } + catch(_com_error &e) { + throw ADOException("Error reading TPL", e, db->getErrors()); + } +} + +// @TODO Much of this recovery code is way too similar... refactor to +// a recover template method on BlobRecordset. + +void +MSSqlProvider::recoverConfigs(qpid::broker::RecoveryManager& recoverer) +{ + DatabaseConnection *db = 0; + try { + db = initConnection(); + BlobRecordset rsConfigs; + rsConfigs.open(db, TblConfig); + _RecordsetPtr p = (_RecordsetPtr)rsConfigs; + if (p->BOF && p->EndOfFile) + return; // Nothing to do + p->MoveFirst(); + while (!p->EndOfFile) { + uint64_t id = p->Fields->Item["persistenceId"]->Value; + long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; + BlobAdapter blob(blobSize); + blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); + // Recreate the Config instance and reset its ID. + broker::RecoverableConfig::shared_ptr config = + recoverer.recoverConfig(blob); + config->setPersistenceId(id); + p->MoveNext(); + } + } + catch(_com_error &e) { + throw ADOException("Error recovering configs", + e, + db ? db->getErrors() : ""); + } +} + +void +MSSqlProvider::recoverExchanges(qpid::broker::RecoveryManager& recoverer, + ExchangeMap& exchangeMap) +{ + DatabaseConnection *db = 0; + try { + db = initConnection(); + BlobRecordset rsExchanges; + rsExchanges.open(db, TblExchange); + _RecordsetPtr p = (_RecordsetPtr)rsExchanges; + if (p->BOF && p->EndOfFile) + return; // Nothing to do + p->MoveFirst(); + while (!p->EndOfFile) { + uint64_t id = p->Fields->Item["persistenceId"]->Value; + long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; + BlobAdapter blob(blobSize); + blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); + // Recreate the Exchange instance, reset its ID, and remember the + // ones restored for matching up when recovering bindings. + broker::RecoverableExchange::shared_ptr exchange = + recoverer.recoverExchange(blob); + exchange->setPersistenceId(id); + exchangeMap[id] = exchange; + p->MoveNext(); + } + } + catch(_com_error &e) { + throw ADOException("Error recovering exchanges", + e, + db ? db->getErrors() : ""); + } +} + +void +MSSqlProvider::recoverQueues(qpid::broker::RecoveryManager& recoverer, + QueueMap& queueMap) +{ + DatabaseConnection *db = 0; + try { + db = initConnection(); + BlobRecordset rsQueues; + rsQueues.open(db, TblQueue); + _RecordsetPtr p = (_RecordsetPtr)rsQueues; + if (p->BOF && p->EndOfFile) + return; // Nothing to do + p->MoveFirst(); + while (!p->EndOfFile) { + uint64_t id = p->Fields->Item["persistenceId"]->Value; + long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; + BlobAdapter blob(blobSize); + blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); + // Recreate the Queue instance and reset its ID. + broker::RecoverableQueue::shared_ptr queue = + recoverer.recoverQueue(blob); + queue->setPersistenceId(id); + queueMap[id] = queue; + p->MoveNext(); + } + } + catch(_com_error &e) { + throw ADOException("Error recovering queues", + e, + db ? db->getErrors() : ""); + } +} + +void +MSSqlProvider::recoverBindings(qpid::broker::RecoveryManager& recoverer, + const ExchangeMap& exchangeMap, + const QueueMap& queueMap) +{ + DatabaseConnection *db = 0; + try { + db = initConnection(); + BindingRecordset rsBindings; + rsBindings.open(db, TblBinding); + rsBindings.recover(recoverer, exchangeMap, queueMap); + } + catch(_com_error &e) { + throw ADOException("Error recovering bindings", + e, + db ? db->getErrors() : ""); + } +} + +void +MSSqlProvider::recoverMessages(qpid::broker::RecoveryManager& recoverer, + MessageMap& messageMap, + MessageQueueMap& messageQueueMap) +{ + DatabaseConnection *db = 0; + try { + db = initConnection(); + MessageRecordset rsMessages; + rsMessages.open(db, TblMessage); + rsMessages.recover(recoverer, messageMap); + + MessageMapRecordset rsMessageMaps; + rsMessageMaps.open(db, TblMessageMap); + rsMessageMaps.recover(messageQueueMap); + } + catch(_com_error &e) { + throw ADOException("Error recovering messages", + e, + db ? db->getErrors() : ""); + } +} + +void +MSSqlProvider::recoverTransactions(qpid::broker::RecoveryManager& recoverer, + PreparedTransactionMap& dtxMap) +{ + DatabaseConnection *db = initConnection(); + std::set<std::string> xids; + try { + TplRecordset rsTpl; + rsTpl.open(db, TblTpl); + rsTpl.recover(xids); + } + catch(_com_error &e) { + throw ADOException("Error recovering TPL records", e, db->getErrors()); + } + + try { + // Rebuild the needed RecoverableTransactions. + for (std::set<std::string>::const_iterator iXid = xids.begin(); + iXid != xids.end(); + ++iXid) { + boost::shared_ptr<DatabaseConnection> dbX(new DatabaseConnection); + dbX->open(options.connectString, options.catalogName); + std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(dbX, + *iXid)); + tx->setPrepared(); + std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx); + dtxMap[*iXid] = recoverer.recoverTransaction(*iXid, tc); + } + } + catch(_com_error &e) { + throw ADOException("Error recreating dtx connection", e); + } +} + +////////////// Internal Methods + +State * +MSSqlProvider::initState() +{ + State *state = dbState.get(); // See if thread has initialized + if (!state) { + state = new State; + dbState.reset(state); + } + return state; +} + +DatabaseConnection * +MSSqlProvider::initConnection(void) +{ + State *state = initState(); + if (state->dbConn != 0) + return state->dbConn; // And the DatabaseConnection is set up too + std::auto_ptr<DatabaseConnection> db(new DatabaseConnection); + db->open(options.connectString, options.catalogName); + state->dbConn = db.release(); + return state->dbConn; +} + +void +MSSqlProvider::createDb(DatabaseConnection *db, const std::string &name) +{ + const std::string dbCmd = "CREATE DATABASE " + name; + const std::string useCmd = "USE " + name; + const std::string tableCmd = "CREATE TABLE "; + const std::string colSpecs = + " (persistenceId bigint PRIMARY KEY NOT NULL IDENTITY(1,1)," + " fieldTableBlob varbinary(MAX) NOT NULL)"; + const std::string bindingSpecs = + " (exchangeId bigint REFERENCES tblExchange(persistenceId) NOT NULL," + " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL," + " routingKey varchar(255)," + " fieldTableBlob varbinary(MAX))"; + const std::string messageMapSpecs = + " (messageId bigint REFERENCES tblMessage(persistenceId) NOT NULL," + " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL," + " prepareStatus tinyint CHECK (prepareStatus IS NULL OR " + " prepareStatus IN (1, 2))," + " xid varbinary(512) REFERENCES tblTPL(xid)" + " CONSTRAINT CK_NoDups UNIQUE NONCLUSTERED (messageId, queueId) )"; + const std::string tplSpecs = " (xid varbinary(512) PRIMARY KEY NOT NULL)"; + // SET NOCOUNT ON added to prevent extra result sets from + // interfering with SELECT statements. (Added by SQL Management) + const std::string removeUnrefMsgsTrigger = + "CREATE TRIGGER dbo.RemoveUnreferencedMessages " + "ON tblMessageMap AFTER DELETE AS BEGIN " + "SET NOCOUNT ON; " + "DELETE FROM tblMessage " + "WHERE tblMessage.persistenceId IN " + " (SELECT messageId FROM deleted) AND" + " NOT EXISTS(SELECT * FROM tblMessageMap" + " WHERE tblMessageMap.messageId IN" + " (SELECT messageId FROM deleted)) " + "END"; + + _variant_t unused; + _bstr_t dbStr = dbCmd.c_str(); + _ConnectionPtr conn(*db); + try { + conn->Execute(dbStr, &unused, adExecuteNoRecords); + _bstr_t useStr = useCmd.c_str(); + conn->Execute(useStr, &unused, adExecuteNoRecords); + std::string makeTable = tableCmd + TblQueue + colSpecs; + _bstr_t makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblExchange + colSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblConfig + colSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblMessage + colSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblBinding + bindingSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblTpl + tplSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblMessageMap + messageMapSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + _bstr_t addTriggerStr = removeUnrefMsgsTrigger.c_str(); + conn->Execute(addTriggerStr, &unused, adExecuteNoRecords); + } + catch(_com_error &e) { + throw ADOException("MSSQL can't create " + name, e, db->getErrors()); + } +} + +void +MSSqlProvider::dump() +{ + // dump all db records to qpid_log + QPID_LOG(notice, "DB Dump: (not dumping anything)"); + // rsQueues.dump(); +} + + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp b/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp new file mode 100644 index 0000000000..ce9fa61010 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp @@ -0,0 +1,267 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/Exception.h> +#include <qpid/log/Statement.h> +#include <qpid/store/StorageProvider.h> + +#include "MessageMapRecordset.h" +#include "BlobEncoder.h" +#include "DatabaseConnection.h" +#include "Exception.h" +#include "VariantHelper.h" + +namespace { +inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);}; +} + +namespace qpid { +namespace store { +namespace ms_sql { + +void +MessageMapRecordset::open(DatabaseConnection* conn, const std::string& table) +{ + init(conn, table); +} + +void +MessageMapRecordset::add(uint64_t messageId, + uint64_t queueId, + const std::string& xid) +{ + std::ostringstream command; + command << "INSERT INTO " << tableName + << " (messageId, queueId"; + if (!xid.empty()) + command << ", prepareStatus, xid"; + command << ") VALUES (" << messageId << "," << queueId; + if (!xid.empty()) + command << "," << PREPARE_ADD << ",?"; + command << ")" << std::ends; + + _CommandPtr cmd = NULL; + _ParameterPtr xidVal = NULL; + TESTHR(cmd.CreateInstance(__uuidof(Command))); + _ConnectionPtr p = *dbConn; + cmd->ActiveConnection = p; + cmd->CommandText = command.str().c_str(); + cmd->CommandType = adCmdText; + if (!xid.empty()) { + TESTHR(xidVal.CreateInstance(__uuidof(Parameter))); + xidVal->Name = "@xid"; + xidVal->Type = adVarBinary; + xidVal->Size = xid.length(); + xidVal->Direction = adParamInput; + xidVal->Value = BlobEncoder(xid); + cmd->Parameters->Append(xidVal); + } + cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords); +} + +void +MessageMapRecordset::remove(uint64_t messageId, uint64_t queueId) +{ + std::ostringstream command; + command << "DELETE FROM " << tableName + << " WHERE queueId = " << queueId + << " AND messageId = " << messageId << std::ends; + _CommandPtr cmd = NULL; + TESTHR(cmd.CreateInstance(__uuidof(Command))); + _ConnectionPtr p = *dbConn; + cmd->ActiveConnection = p; + cmd->CommandText = command.str().c_str(); + cmd->CommandType = adCmdText; + _variant_t deletedRecords; + cmd->Execute(&deletedRecords, NULL, adCmdText | adExecuteNoRecords); + if ((long)deletedRecords == 0) + throw ms_sql::Exception("Message does not exist in queue mapping"); + // Trigger on deleting the mapping takes care of deleting orphaned + // message record from tblMessage. +} + +void +MessageMapRecordset::pendingRemove(uint64_t messageId, + uint64_t queueId, + const std::string& xid) +{ + // Look up the mapping for the specified message and queue. There + // should be only one because of the uniqueness constraint in the + // SQL table. Update it to reflect it's pending delete with + // the specified xid. + std::ostringstream command; + command << "UPDATE " << tableName + << " SET prepareStatus=" << PREPARE_REMOVE + << " , xid=?" + << " WHERE queueId = " << queueId + << " AND messageId = " << messageId << std::ends; + + _CommandPtr cmd = NULL; + _ParameterPtr xidVal = NULL; + TESTHR(cmd.CreateInstance(__uuidof(Command))); + TESTHR(xidVal.CreateInstance(__uuidof(Parameter))); + _ConnectionPtr p = *dbConn; + cmd->ActiveConnection = p; + cmd->CommandText = command.str().c_str(); + cmd->CommandType = adCmdText; + xidVal->Name = "@xid"; + xidVal->Type = adVarBinary; + xidVal->Size = xid.length(); + xidVal->Direction = adParamInput; + xidVal->Value = BlobEncoder(xid); + cmd->Parameters->Append(xidVal); + cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords); +} + +void +MessageMapRecordset::removeForQueue(uint64_t queueId) +{ + std::ostringstream command; + command << "DELETE FROM " << tableName + << " WHERE queueId = " << queueId << std::ends; + _CommandPtr cmd = NULL; + + TESTHR(cmd.CreateInstance(__uuidof(Command))); + _ConnectionPtr p = *dbConn; + cmd->ActiveConnection = p; + cmd->CommandText = command.str().c_str(); + cmd->CommandType = adCmdText; + cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords); +} + +void +MessageMapRecordset::commitPrepared(const std::string& xid) +{ + // Find all the records for the specified xid. Records marked as adding + // are now permanent so remove the xid and prepareStatus. Records marked + // as removing are removed entirely. + openRs(); + MessageMap m; + IADORecordBinding *piAdoRecordBinding; + rs->QueryInterface(__uuidof(IADORecordBinding), + (LPVOID *)&piAdoRecordBinding); + piAdoRecordBinding->BindToRecordset(&m); + for (; !rs->EndOfFile; rs->MoveNext()) { + if (m.xidStatus != adFldOK) + continue; + const std::string x(m.xid, m.xidLength); + if (x != xid) + continue; + if (m.prepareStatus == PREPARE_REMOVE) { + rs->Delete(adAffectCurrent); + } + else { + _variant_t dbNull; + dbNull.ChangeType(VT_NULL); + rs->Fields->GetItem("prepareStatus")->Value = dbNull; + rs->Fields->GetItem("xid")->Value = dbNull; + } + rs->Update(); + } + piAdoRecordBinding->Release(); +} + +void +MessageMapRecordset::abortPrepared(const std::string& xid) +{ + // Find all the records for the specified xid. Records marked as adding + // need to be removed while records marked as removing are put back to + // no xid and no prepareStatus. + openRs(); + MessageMap m; + IADORecordBinding *piAdoRecordBinding; + rs->QueryInterface(__uuidof(IADORecordBinding), + (LPVOID *)&piAdoRecordBinding); + piAdoRecordBinding->BindToRecordset(&m); + for (; !rs->EndOfFile; rs->MoveNext()) { + if (m.xidStatus != adFldOK) + continue; + const std::string x(m.xid, m.xidLength); + if (x != xid) + continue; + if (m.prepareStatus == PREPARE_ADD) { + rs->Delete(adAffectCurrent); + } + else { + _variant_t dbNull; + dbNull.ChangeType(VT_NULL); + rs->Fields->GetItem("prepareStatus")->Value = dbNull; + rs->Fields->GetItem("xid")->Value = dbNull; + } + rs->Update(); + } + piAdoRecordBinding->Release(); +} + +void +MessageMapRecordset::recover(MessageQueueMap& msgMap) +{ + openRs(); + if (rs->BOF && rs->EndOfFile) + return; // Nothing to do + rs->MoveFirst(); + MessageMap b; + IADORecordBinding *piAdoRecordBinding; + rs->QueryInterface(__uuidof(IADORecordBinding), + (LPVOID *)&piAdoRecordBinding); + piAdoRecordBinding->BindToRecordset(&b); + while (!rs->EndOfFile) { + qpid::store::QueueEntry entry(b.queueId); + if (b.xidStatus == adFldOK && b.xidLength > 0) { + entry.xid.assign(b.xid, b.xidLength); + entry.tplStatus = + b.prepareStatus == PREPARE_ADD ? QueueEntry::ADDING + : QueueEntry::REMOVING; + } + else { + entry.tplStatus = QueueEntry::NONE; + } + msgMap[b.messageId].push_back(entry); + rs->MoveNext(); + } + + piAdoRecordBinding->Release(); +} + +void +MessageMapRecordset::dump() +{ + openRs(); + Recordset::dump(); + if (rs->EndOfFile && rs->BOF) // No records + return; + rs->MoveFirst(); + + MessageMap m; + IADORecordBinding *piAdoRecordBinding; + rs->QueryInterface(__uuidof(IADORecordBinding), + (LPVOID *)&piAdoRecordBinding); + piAdoRecordBinding->BindToRecordset(&m); + + while (!rs->EndOfFile) { + QPID_LOG(notice, "msg " << m.messageId << " on queue " << m.queueId); + rs->MoveNext(); + } + + piAdoRecordBinding->Release(); +} + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h b/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h new file mode 100644 index 0000000000..1b0c2f073e --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h @@ -0,0 +1,100 @@ +#ifndef QPID_STORE_MSSQL_MESSAGEMAPRECORDSET_H +#define QPID_STORE_MSSQL_MESSAGEMAPRECORDSET_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <icrsint.h> +#include "Recordset.h" +#include <qpid/broker/RecoveryManager.h> + +namespace qpid { +namespace store { +namespace ms_sql { + +/** + * @class MessageMapRecordset + * + * Class for the message map (message -> queue) records. + */ +class MessageMapRecordset : public Recordset { + + // These values are defined in a constraint on the tblMessageMap table. + // the prepareStatus column can only be null, 1, or 2. + enum { PREPARE_ADD=1, PREPARE_REMOVE=2 }; + + class MessageMap : public CADORecordBinding { + BEGIN_ADO_BINDING(MessageMap) + ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, messageId, FALSE) + ADO_FIXED_LENGTH_ENTRY2(2, adBigInt, queueId, FALSE) + ADO_FIXED_LENGTH_ENTRY2(3, adTinyInt, prepareStatus, FALSE) + ADO_VARIABLE_LENGTH_ENTRY(4, adVarBinary, xid, sizeof(xid), + xidStatus, xidLength, FALSE) + END_ADO_BINDING() + + public: + uint64_t messageId; + uint64_t queueId; + uint8_t prepareStatus; + char xid[512]; + int xidStatus; + uint32_t xidLength; + }; + + void selectOnXid(const std::string& xid); + +public: + virtual void open(DatabaseConnection* conn, const std::string& table); + + // Add a new mapping + void add(uint64_t messageId, + uint64_t queueId, + const std::string& xid = ""); + + // Remove a specific mapping. + void remove(uint64_t messageId, uint64_t queueId); + + // Mark the indicated message->queue entry pending removal. The entry + // for the mapping is updated to indicate pending removal with the + // specified xid. + void pendingRemove(uint64_t messageId, + uint64_t queueId, + const std::string& xid); + + // Remove mappings for all messages on a specified queue. + void removeForQueue(uint64_t queueId); + + // Commit records recorded as prepared. + void commitPrepared(const std::string& xid); + + // Abort prepared changes. + void abortPrepared(const std::string& xid); + + // Recover the mappings of message ID -> vector<queue ID>. + void recover(MessageQueueMap& msgMap); + + // Dump table contents; useful for debugging. + void dump(); +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_MESSAGEMAPRECORDSET_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp b/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp new file mode 100644 index 0000000000..495f1a08c2 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp @@ -0,0 +1,184 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/Exception.h> +#include <qpid/log/Statement.h> + +#include "MessageRecordset.h" +#include "BlobAdapter.h" +#include "BlobEncoder.h" +#include "VariantHelper.h" + +#include <boost/intrusive_ptr.hpp> + +class qpid::broker::PersistableMessage; + +namespace qpid { +namespace store { +namespace ms_sql { + +void +MessageRecordset::add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg) +{ + BlobEncoder blob (msg); // Marshall headers and content to a blob + rs->AddNew(); + rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob); + rs->Update(); + uint64_t id = rs->Fields->Item["persistenceId"]->Value; + msg->setPersistenceId(id); +} + +void +MessageRecordset::append(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, + const std::string& data) +{ + // Look up the message by its Id + std::ostringstream filter; + filter << "persistenceId = " << msg->getPersistenceId() << std::ends; + rs->PutFilter (VariantHelper<std::string>(filter.str())); + if (rs->RecordCount == 0) { + throw Exception("Can't append to message not stored in database"); + } + BlobEncoder blob (data); // Marshall string data to a blob + rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob); + rs->Update(); +} + +void +MessageRecordset::remove(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg) +{ + BlobRecordset::remove(msg->getPersistenceId()); +} + +void +MessageRecordset::loadContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, + std::string& data, + uint64_t offset, + uint32_t length) +{ + // Look up the message by its Id + std::ostringstream filter; + filter << "persistenceId = " << msg->getPersistenceId() << std::ends; + rs->PutFilter (VariantHelper<std::string>(filter.str())); + if (rs->RecordCount == 0) { + throw Exception("Can't load message not stored in database"); + } + + // NOTE! If this code needs to change, please verify the encoding + // code in BlobEncoder. + long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize; + uint32_t headerSize; + const size_t headerFieldLength = sizeof(headerSize); + BlobAdapter blob(headerFieldLength); + blob = + rs->Fields->Item["fieldTableBlob"]->GetChunk((long)headerFieldLength); + headerSize = ((qpid::framing::Buffer&)blob).getLong(); + + // GetChunk always begins reading where the previous GetChunk left off, + // so we can't just tell it to ignore the header and read the data. + // So, read the header plus the offset, plus the desired data, then + // copy the desired data to the supplied string. If this ends up asking + // for more than is available in the field, reduce it to what's there. + long getSize = headerSize + offset + length; + if (getSize + (long)headerFieldLength > blobSize) { + size_t reduce = (getSize + headerFieldLength) - blobSize; + getSize -= reduce; + length -= reduce; + } + BlobAdapter header_plus(getSize); + header_plus = rs->Fields->Item["fieldTableBlob"]->GetChunk(getSize); + uint8_t *throw_away = new uint8_t[headerSize + offset]; + ((qpid::framing::Buffer&)header_plus).getRawData(throw_away, headerSize + offset); + delete throw_away; + ((qpid::framing::Buffer&)header_plus).getRawData(data, length); +} + +void +MessageRecordset::recover(qpid::broker::RecoveryManager& recoverer, + std::map<uint64_t, broker::RecoverableMessage::shared_ptr>& messageMap) +{ + if (rs->BOF && rs->EndOfFile) + return; // Nothing to do + rs->MoveFirst(); + Binding b; + IADORecordBinding *piAdoRecordBinding; + rs->QueryInterface(__uuidof(IADORecordBinding), + (LPVOID *)&piAdoRecordBinding); + piAdoRecordBinding->BindToRecordset(&b); + while (!rs->EndOfFile) { + // The blob was written as normal, but with the header length + // prepended in a uint32_t. Due to message staging threshold + // limits, the header may be all that's read in; get it first, + // recover that message header, then see if the rest is needed. + // + // NOTE! If this code needs to change, please verify the encoding + // code in BlobEncoder. + long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize; + uint32_t headerSize; + const size_t headerFieldLength = sizeof(headerSize); + BlobAdapter blob(headerFieldLength); + blob = + rs->Fields->Item["fieldTableBlob"]->GetChunk((long)headerFieldLength); + headerSize = ((qpid::framing::Buffer&)blob).getLong(); + BlobAdapter header(headerSize); + header = rs->Fields->Item["fieldTableBlob"]->GetChunk(headerSize); + broker::RecoverableMessage::shared_ptr msg; + msg = recoverer.recoverMessage(header); + msg->setPersistenceId(b.messageId); + messageMap[b.messageId] = msg; + + // Now, do we need the rest of the content? + long contentLength = blobSize - headerFieldLength - headerSize; + if (contentLength > 0 && msg->loadContent(contentLength)) { + BlobAdapter content(contentLength); + content = + rs->Fields->Item["fieldTableBlob"]->GetChunk(contentLength); + msg->decodeContent(content); + } + rs->MoveNext(); + } + + piAdoRecordBinding->Release(); +} + +void +MessageRecordset::dump() +{ + Recordset::dump(); + if (rs->EndOfFile && rs->BOF) // No records + return; + rs->MoveFirst(); + + Binding b; + IADORecordBinding *piAdoRecordBinding; + rs->QueryInterface(__uuidof(IADORecordBinding), + (LPVOID *)&piAdoRecordBinding); + piAdoRecordBinding->BindToRecordset(&b); + + while (VARIANT_FALSE == rs->EndOfFile) { + QPID_LOG(notice, "Msg " << b.messageId); + rs->MoveNext(); + } + + piAdoRecordBinding->Release(); +} + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h b/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h new file mode 100644 index 0000000000..698b2561fe --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h @@ -0,0 +1,85 @@ +#ifndef QPID_STORE_MSSQL_MESSAGERECORDSET_H +#define QPID_STORE_MSSQL_MESSAGERECORDSET_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <icrsint.h> +#include "BlobRecordset.h" +#include <qpid/broker/PersistableMessage.h> +#include <qpid/broker/RecoveryManager.h> +#include <boost/intrusive_ptr.hpp> + +namespace qpid { +namespace store { +namespace ms_sql { + +/** + * @class MessageRecordset + * + * Class for storing and recovering messages. Messages are primarily blobs + * and handled similarly. However, messages larger than the staging threshold + * are not contained completely in memory; they're left mostly in the store + * and the header is held in memory. So when the message "blob" is saved, + * an additional size-of-the-header field is prepended to the blob. + * On recovery, the size-of-the-header is used to get only what's needed + * until it's determined if the entire message is to be recovered to memory. + */ +class MessageRecordset : public BlobRecordset { + class Binding : public CADORecordBinding { + BEGIN_ADO_BINDING(Binding) + ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, messageId, FALSE) + END_ADO_BINDING() + + public: + uint64_t messageId; + }; + +public: + // Store a message. Store the header size (4 bytes) then the regular + // blob comprising the message. + void add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg); + + // Append additional content to an existing message. + void append(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, + const std::string& data); + + // Remove an existing message + void remove(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg); + + // Load all or part of a stored message. This skips the header parts and + // loads content. + void loadContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, + std::string& data, + uint64_t offset, + uint32_t length); + + // Recover messages and save a map of those recovered. + void recover(qpid::broker::RecoveryManager& recoverer, + std::map<uint64_t, broker::RecoverableMessage::shared_ptr>& messageMap); + + // Dump table contents; useful for debugging. + void dump(); +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_MESSAGERECORDSET_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp b/qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp new file mode 100644 index 0000000000..e706799951 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp @@ -0,0 +1,92 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/Exception.h> +#include <qpid/log/Statement.h> + +#include "Recordset.h" +#include "BlobEncoder.h" +#include "DatabaseConnection.h" +#include "VariantHelper.h" + +namespace { +inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);}; +} + +namespace qpid { +namespace store { +namespace ms_sql { + + +void +Recordset::init(DatabaseConnection* conn, const std::string& table) +{ + dbConn = conn; + TESTHR(rs.CreateInstance(__uuidof(::Recordset))); + tableName = table; +} + +void +Recordset::openRs() +{ + // Client-side cursors needed to get access to newly added + // identity column immediately. Recordsets need this to get the + // persistence ID for the broker objects. + rs->CursorLocation = adUseClient; + _ConnectionPtr p = *dbConn; + rs->Open(tableName.c_str(), + _variant_t((IDispatch *)p, true), + adOpenStatic, + adLockOptimistic, + adCmdTable); +} + +void +Recordset::open(DatabaseConnection* conn, const std::string& table) +{ + init(conn, table); + openRs(); +} + +void +Recordset::close() +{ + if (rs && rs->State == adStateOpen) + rs->Close(); +} + +void +Recordset::requery() +{ + // Restore the recordset to reflect all current records. + rs->Filter = ""; + rs->Requery(-1); +} + +void +Recordset::dump() +{ + long count = rs->RecordCount; + QPID_LOG(notice, "DB Dump: " + tableName << + ": " << count << " records"); +} + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/Recordset.h b/qpid/cpp/src/qpid/store/ms-sql/Recordset.h new file mode 100644 index 0000000000..032b2bd434 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/Recordset.h @@ -0,0 +1,75 @@ +#ifndef QPID_STORE_MSSQL_RECORDSET_H +#define QPID_STORE_MSSQL_RECORDSET_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +// Bring in ADO 2.8 (yes, I know it says "15", but that's it...) +#import "C:\Program Files\Common Files\System\ado\msado15.dll" \ + no_namespace rename("EOF", "EndOfFile") +#include <comdef.h> +#include <comutil.h> +#include <string> +#if 0 +#include <utility> +#endif + +namespace qpid { +namespace store { +namespace ms_sql { + +class DatabaseConnection; + +/** + * @class Recordset + * + * Represents an ADO Recordset, abstracting out the common operations needed + * on the common tables used that have 2 fields, persistence ID and blob. + */ +class Recordset { +protected: + _RecordsetPtr rs; + DatabaseConnection* dbConn; + std::string tableName; + + void init(DatabaseConnection* conn, const std::string& table); + void openRs(); + +public: + Recordset() : rs(0), dbConn(0) {} + virtual ~Recordset() { close(); rs = 0; dbConn = 0; } + + /** + * Default open() reads all records into the recordset. + */ + virtual void open(DatabaseConnection* conn, const std::string& table); + void close(); + void requery(); + operator _RecordsetPtr () { return rs; } + + // Dump table contents; useful for debugging. + void dump(); +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_RECORDSET_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.cpp b/qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.cpp new file mode 100644 index 0000000000..6ad7725570 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.cpp @@ -0,0 +1,71 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SqlTransaction.h"
+#include "DatabaseConnection.h"
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+SqlTransaction::SqlTransaction(const boost::shared_ptr<DatabaseConnection>& _db)
+ : db(_db), transDepth(0)
+{
+}
+
+SqlTransaction::~SqlTransaction()
+{
+ if (transDepth > 0)
+ this->abort();
+}
+
+void
+SqlTransaction::begin()
+{
+ _bstr_t beginCmd("BEGIN TRANSACTION");
+ _ConnectionPtr c = *db;
+ c->Execute(beginCmd, NULL, adExecuteNoRecords);
+ ++transDepth;
+}
+
+void
+SqlTransaction::commit()
+{
+ if (transDepth > 0) {
+ _bstr_t commitCmd("COMMIT TRANSACTION");
+ _ConnectionPtr c = *db;
+ c->Execute(commitCmd, NULL, adExecuteNoRecords);
+ --transDepth;
+ }
+}
+
+void
+SqlTransaction::abort()
+{
+ if (transDepth > 0) {
+ _bstr_t rollbackCmd("ROLLBACK TRANSACTION");
+ _ConnectionPtr c = *db;
+ c->Execute(rollbackCmd, NULL, adExecuteNoRecords);
+ transDepth = 0;
+ }
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.h b/qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.h new file mode 100644 index 0000000000..8b5239b786 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.h @@ -0,0 +1,67 @@ +#ifndef QPID_STORE_MSSQL_SQLTRANSACTION_H
+#define QPID_STORE_MSSQL_SQLTRANSACTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+#include <string>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+class DatabaseConnection;
+
+/**
+ * @class SqlTransaction
+ *
+ * Class representing an SQL transaction.
+ * Since ADO w/ SQLOLEDB can't do nested transaction via its BeginTrans(),
+ * et al, nested transactions are carried out with direct SQL commands.
+ * To ensure the state of this is known, keep track of how deeply the
+ * transactions are nested. This is more of a safety/sanity check since
+ * AMQP doesn't provide nested transactions.
+ */
+class SqlTransaction {
+
+ boost::shared_ptr<DatabaseConnection> db;
+
+ // Since ADO w/ SQLOLEDB can't do nested transaction via its BeginTrans(),
+ // et al, nested transactions are carried out with direct SQL commands.
+ // To ensure the state of this is known, keep track of how deeply the
+ // transactions are nested.
+ unsigned int transDepth;
+
+public:
+ SqlTransaction(const boost::shared_ptr<DatabaseConnection>& _db);
+ ~SqlTransaction();
+
+ DatabaseConnection *dbConn() { return db.get(); }
+
+ void begin();
+ void commit();
+ void abort();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_SQLTRANSACTION_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/State.cpp b/qpid/cpp/src/qpid/store/ms-sql/State.cpp new file mode 100644 index 0000000000..720603dd11 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/State.cpp @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "State.h" +#include "DatabaseConnection.h" +#include "Exception.h" +#include <comdef.h> + +namespace qpid { +namespace store { +namespace ms_sql { + +State::State() : dbConn(0) +{ + HRESULT hr = ::CoInitializeEx(NULL, COINIT_MULTITHREADED); + if (hr != S_OK && hr != S_FALSE) + throw Exception("Error initializing COM"); +} + +State::~State() +{ + if (dbConn) + delete dbConn; + ::CoUninitialize(); +} + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/State.h b/qpid/cpp/src/qpid/store/ms-sql/State.h new file mode 100644 index 0000000000..6350bc5bd2 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/State.h @@ -0,0 +1,52 @@ +#ifndef QPID_STORE_MSSQL_STATE_H +#define QPID_STORE_MSSQL_STATE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +namespace qpid { +namespace store { +namespace ms_sql { + +class DatabaseConnection; + +/** + * @struct State + * + * Represents a thread's state for accessing ADO and the database. + * Creating an instance of State initializes COM for this thread, and + * destroying it uninitializes COM. There's also a DatabaseConnection + * for this thread's default access to the database. More DatabaseConnections + * can always be created, but State has one that can always be used by + * the thread whose state is represented. + * + * This class is intended to be one-per-thread, so it should be accessed + * via thread-specific storage. + */ +struct State { + State(); + ~State(); + DatabaseConnection *dbConn; +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_STATE_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/TplRecordset.cpp b/qpid/cpp/src/qpid/store/ms-sql/TplRecordset.cpp new file mode 100644 index 0000000000..1309d921a9 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/TplRecordset.cpp @@ -0,0 +1,128 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <qpid/Exception.h>
+#include <qpid/log/Statement.h>
+
+#include "TplRecordset.h"
+#include "BlobEncoder.h"
+#include "DatabaseConnection.h"
+#include "VariantHelper.h"
+
+namespace {
+inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};
+}
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+void
+TplRecordset::open(DatabaseConnection* conn, const std::string& table)
+{
+ init(conn, table);
+ // Don't actually open until we know what to do. It's far easier and more
+ // efficient to simply do most of these TPL/xid ops in a single statement.
+}
+
+void
+TplRecordset::add(const std::string& xid)
+{
+ const std::string command =
+ "INSERT INTO " + tableName + " ( xid ) VALUES ( ? )";
+ _CommandPtr cmd = NULL;
+ _ParameterPtr xidVal = NULL;
+
+ TESTHR(cmd.CreateInstance(__uuidof(Command)));
+ TESTHR(xidVal.CreateInstance(__uuidof(Parameter)));
+ _ConnectionPtr p = *dbConn;
+ cmd->ActiveConnection = p;
+ cmd->CommandText = command.c_str();
+ cmd->CommandType = adCmdText;
+ xidVal->Name = "@xid";
+ xidVal->Type = adVarBinary;
+ xidVal->Size = xid.length();
+ xidVal->Direction = adParamInput;
+ xidVal->Value = BlobEncoder(xid);
+ cmd->Parameters->Append(xidVal);
+ cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords);
+}
+
+void
+TplRecordset::remove(const std::string& xid)
+{
+ // Look up the item by its xid
+ const std::string command =
+ "DELETE FROM " + tableName + " WHERE xid = ?";
+ _CommandPtr cmd = NULL;
+ _ParameterPtr xidVal = NULL;
+
+ TESTHR(cmd.CreateInstance(__uuidof(Command)));
+ TESTHR(xidVal.CreateInstance(__uuidof(Parameter)));
+ _ConnectionPtr p = *dbConn;
+ cmd->ActiveConnection = p;
+ cmd->CommandText = command.c_str();
+ cmd->CommandType = adCmdText;
+ xidVal->Name = "@xid";
+ xidVal->Type = adVarBinary;
+ xidVal->Size = xid.length();
+ xidVal->Direction = adParamInput;
+ xidVal->Value = BlobEncoder(xid);
+ cmd->Parameters->Append(xidVal);
+ _variant_t deletedRecords;
+ cmd->Execute(&deletedRecords, NULL, adCmdText | adExecuteNoRecords);
+}
+
+void
+TplRecordset::recover(std::set<std::string>& xids)
+{
+ openRs();
+ if (rs->BOF && rs->EndOfFile)
+ return; // Nothing to do
+ rs->MoveFirst();
+ while (!rs->EndOfFile) {
+ _variant_t wxid = rs->Fields->Item["xid"]->Value;
+ char *xidBytes;
+ SafeArrayAccessData(wxid.parray, (void **)&xidBytes);
+ std::string xid(xidBytes, rs->Fields->Item["xid"]->ActualSize);
+ xids.insert(xid);
+ SafeArrayUnaccessData(wxid.parray);
+ rs->MoveNext();
+ }
+}
+
+void
+TplRecordset::dump()
+{
+ Recordset::dump();
+ if (rs->EndOfFile && rs->BOF) // No records
+ return;
+
+ rs->MoveFirst();
+ while (!rs->EndOfFile) {
+ _bstr_t wxid = rs->Fields->Item["xid"]->Value;
+ QPID_LOG(notice, " -> " << (const char *)wxid);
+ rs->MoveNext();
+ }
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-sql/TplRecordset.h b/qpid/cpp/src/qpid/store/ms-sql/TplRecordset.h new file mode 100644 index 0000000000..fbde51738c --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/TplRecordset.h @@ -0,0 +1,58 @@ +#ifndef QPID_STORE_MSSQL_TPLRECORDSET_H
+#define QPID_STORE_MSSQL_TPLRECORDSET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Recordset.h"
+#include <string>
+#include <set>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class TplRecordset
+ *
+ * Class for the TPL (Transaction Prepared List) records.
+ */
+class TplRecordset : public Recordset {
+protected:
+
+public:
+ virtual void open(DatabaseConnection* conn, const std::string& table);
+
+ void add(const std::string& xid);
+
+ // Remove a record given its xid.
+ void remove(const std::string& xid);
+
+ // Recover prepared transaction XIDs.
+ void recover(std::set<std::string>& xids);
+
+ // Dump table contents; useful for debugging.
+ void dump();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_TPLRECORDSET_H */
diff --git a/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp b/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp new file mode 100644 index 0000000000..acec95c1f9 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include "VariantHelper.h" + +namespace qpid { +namespace store { +namespace ms_sql { + +template <class Wrapped> +VariantHelper<Wrapped>::VariantHelper() +{ + var.vt = VT_EMPTY; +} + +template <class Wrapped> +VariantHelper<Wrapped>::operator const _variant_t& () const +{ + return var; +} + +// Specialization for using _variant_t to wrap a std::string +VariantHelper<std::string>::VariantHelper(const std::string &init) +{ + if (init.empty() || init.length() == 0) { + var.vt = VT_BSTR; + var.bstrVal = NULL; + } + else { + var.SetString(init.c_str()); + } +} + +VariantHelper<std::string>& +VariantHelper<std::string>::operator=(const std::string &rhs) +{ + if (rhs.empty() || rhs.length() == 0) { + var.vt = VT_BSTR; + var.bstrVal = NULL; + } + else { + var.SetString(rhs.c_str()); + } + return *this; +} + +VariantHelper<std::string>::operator const _variant_t& () const +{ + return var; +} + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.h b/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.h new file mode 100644 index 0000000000..723dbc3b76 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.h @@ -0,0 +1,61 @@ +#ifndef QPID_STORE_MSSQL_VARIANTHELPER_H +#define QPID_STORE_MSSQL_VARIANTHELPER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <comutil.h> + +namespace qpid { +namespace store { +namespace ms_sql { + +/** + * @class VariantHelper + * + * Class template to wrap the details of working with _variant_t objects. + */ +template <class Wrapped> class VariantHelper { +private: + _variant_t var; + +public: + VariantHelper(); + VariantHelper(const Wrapped &init); + + VariantHelper& operator =(const Wrapped& rhs); + operator const _variant_t& () const; +}; + +// Specialization for using _variant_t to wrap a std::string +template<> class VariantHelper<std::string> { +private: + _variant_t var; + +public: + VariantHelper(const std::string &init); + VariantHelper& operator =(const std::string& rhs); + operator const _variant_t& () const; +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_VARIANTHELPER_H */ |