diff options
author | Stephen D. Huston <shuston@apache.org> | 2009-10-21 02:11:04 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2009-10-21 02:11:04 +0000 |
commit | f3565f9fa6edde79a3d40006abf44b568f7279f0 (patch) | |
tree | a09dc2471320677bc6dc4cb9d7f27a4cc06459bd | |
parent | 70800c4ee0975aa890b8ce9a87eda24a5b870f24 (diff) | |
download | qpid-python-f3565f9fa6edde79a3d40006abf44b568f7279f0.tar.gz |
Initial checkin of portable message store plugin and MS SQL-specific storage provider. Goes with QPID-2017
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@827870 13f79535-47bb-0310-9956-ffa450edef68
31 files changed, 4104 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageStore.h b/qpid/cpp/src/qpid/broker/MessageStore.h index f5c55a50f8..143e860ec7 100644 --- a/qpid/cpp/src/qpid/broker/MessageStore.h +++ b/qpid/cpp/src/qpid/broker/MessageStore.h @@ -46,15 +46,7 @@ class MessageStore : public TransactionalStore, public Recoverable { public: /** - * init the store, call before any other call. If not called, store - * is free to pick any defaults - * - * @param options Options object provided by concrete store plug in. - */ - virtual bool init(const Options* options) = 0; - - /** - * If called after init() but before recovery, will discard the database + * If called after initialization but before recovery, will discard the database * and reinitialize using an empty store dir. If the parameter pushDownStoreFiles * is true, the content of the store dir will be moved to a backup dir inside the * store dir. This is used when cluster nodes recover and must get thier content diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h index 2576e266d2..7d49491dfd 100644 --- a/qpid/cpp/src/qpid/broker/PersistableMessage.h +++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h @@ -104,9 +104,9 @@ class PersistableMessage : public Persistable void flush(); - bool QPID_BROKER_EXTERN isContentReleased() const; + QPID_BROKER_EXTERN bool isContentReleased() const; - void QPID_BROKER_EXTERN setStore(MessageStore*); + QPID_BROKER_EXTERN void setStore(MessageStore*); void requestContentRelease(); void blockContentRelease(); bool checkContentReleasable(); diff --git a/qpid/cpp/src/qpid/store/CMakeLists.txt b/qpid/cpp/src/qpid/store/CMakeLists.txt new file mode 100644 index 0000000000..452d4b7cf0 --- /dev/null +++ b/qpid/cpp/src/qpid/store/CMakeLists.txt @@ -0,0 +1,74 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +project(qpidc_store) + +#set (CMAKE_VERBOSE_MAKEFILE ON) # for debugging + +include_directories( ${Boost_INCLUDE_DIR} ) + +include_directories( ${CMAKE_CURRENT_SOURCE_DIR} ) +include_directories( ${CMAKE_HOME_DIRECTORY}/include ) + +link_directories( ${Boost_LIBRARY_DIRS} ) + +set (store_SOURCES + MessageStorePlugin.cpp + ) +add_library (store MODULE ${store_SOURCES}) +target_link_libraries (store qpidbroker ${Boost_PROGRAM_OPTIONS_LIBRARY}) +if (CMAKE_COMPILER_IS_GNUCXX) + set_target_properties (store PROPERTIES + PREFIX "" + LINK_FLAGS -Wl,--no-undefined) +endif (CMAKE_COMPILER_IS_GNUCXX) + +if (CMAKE_SYSTEM_NAME STREQUAL Windows) + if (MSVC) + add_definitions( + /D "NOMINMAX" + /D "WIN32_LEAN_AND_MEAN" + ) + endif (MSVC) +endif (CMAKE_SYSTEM_NAME STREQUAL Windows) + +set_target_properties (store PROPERTIES VERSION ${qpidc_version}) + +# Build the MS SQL Storage Provider plugin +set (mssql_default ON) +if (NOT CMAKE_SYSTEM_NAME STREQUAL Windows) + set(mssql_default OFF) +endif (NOT CMAKE_SYSTEM_NAME STREQUAL Windows) +option(BUILD_MSSQL "Build MS SQL Store provider plugin" ${mssql_default}) +if (BUILD_MSSQL) + add_library (mssql_store MODULE + ms-sql/MsSqlProvider.cpp + ms-sql/AmqpTransaction.cpp + ms-sql/BindingRecordset.cpp + ms-sql/BlobAdapter.cpp + ms-sql/BlobEncoder.cpp + ms-sql/BlobRecordset.cpp + ms-sql/DatabaseConnection.cpp + ms-sql/MessageMapRecordset.cpp + ms-sql/MessageRecordset.cpp + ms-sql/Recordset.cpp + ms-sql/State.cpp + ms-sql/VariantHelper.cpp) + target_link_libraries (mssql_store qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY}) +endif (BUILD_MSSQL) diff --git a/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp b/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp new file mode 100644 index 0000000000..385045344f --- /dev/null +++ b/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp @@ -0,0 +1,444 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "MessageStorePlugin.h" +#include "StorageProvider.h" +#include "StoreException.h" +#include "qpid/broker/Broker.h" +#include "qpid/Plugin.h" +#include "qpid/Options.h" +#include "qpid/DataDir.h" +#include "qpid/log/Statement.h" + +/* + * The MessageStore pointer given to the Broker points to static storage. + * Thus, it cannot be deleted, especially by the broker. To prevent deletion, + * this no-op deleter is used with the boost::shared_ptr. When the last + * shared_ptr is destroyed, the deleter is called rather than delete(). + */ +namespace { + class NoopDeleter { + public: + NoopDeleter() {} + void operator()(qpid::broker::MessageStore *p) {} + }; +} + +namespace qpid { +namespace store { + +static MessageStorePlugin static_instance_registers_plugin; + + +MessageStorePlugin::StoreOptions::StoreOptions(const std::string& name) : + qpid::Options(name) +{ + addOptions() + ("storage-provider", qpid::optValue(providerName, "PROVIDER"), + "Name of the storage provider to use.") + ; +} + + +void +MessageStorePlugin::earlyInitialize (qpid::Plugin::Target& target) +{ + qpid::broker::Broker* broker = + dynamic_cast<qpid::broker::Broker*>(&target); + if (0 == broker) + return; // Only listen to Broker targets + + // See if there are any storage provider plugins ready. If not, we can't + // do a message store. + qpid::Plugin::earlyInitAll(*this); + + if (providers.empty()) { + QPID_LOG(warning, + "Message store plugin: No storage providers available."); + return; + } + if (!options.providerName.empty()) { + // If specific one was chosen, locate it in loaded set of providers. + provider = providers.find(options.providerName); + if (provider == providers.end()) + throw Exception("Message store plugin: storage provider '" + + options.providerName + + "' does not exist."); + } + else { + // No specific provider chosen; if there's only one, use it. Else + // report the need to pick one. + if (providers.size() > 1) + throw Exception("Message store plugin: multiple provider plugins " + "loaded; must either load only one or select one " + "using --storage-provider"); + provider = providers.begin(); + } + + provider->second->activate(*this); + NoopDeleter d; + boost::shared_ptr<qpid::broker::MessageStore> sp(this, d); + broker->setStore(sp); + target.addFinalizer(boost::bind(&MessageStorePlugin::finalizeMe, this)); +} + +void +MessageStorePlugin::initialize(qpid::Plugin::Target& target) +{ + qpid::broker::Broker* broker = + dynamic_cast<qpid::broker::Broker*>(&target); + if (0 == broker) + return; // Only listen to Broker targets + + // Pass along the initialize step to the provider that's activated. + if (provider != providers.end()) { + provider->second->initialize(*this); + } + // qpid::Plugin::initializeAll(*this); +} + +void +MessageStorePlugin::finalizeMe() +{ + finalize(); // Call finalizers on any Provider plugins +} + +void +MessageStorePlugin::providerAvailable(const std::string name, + StorageProvider *be) +{ + ProviderMap::value_type newSp(name, be); + std::pair<ProviderMap::iterator, bool> inserted = providers.insert(newSp); + if (inserted.second == false) + QPID_LOG(warning, "Storage provider " << name << " duplicate; ignored."); +} + +void +MessageStorePlugin::truncateInit(const bool /*saveStoreContent*/) +{ + QPID_LOG(info, "Store: truncateInit"); +} + + +/** + * Record the existence of a durable queue + */ +void +MessageStorePlugin::create(broker::PersistableQueue& queue, + const framing::FieldTable& args) +{ + if (queue.getName().size() == 0) + { + QPID_LOG(error, + "Cannot create store for empty (null) queue name - " + "ignoring and attempting to continue."); + return; + } + if (queue.getPersistenceId()) { + THROW_STORE_EXCEPTION("Queue already created: " + queue.getName()); + } + provider->second->create(queue, args); +} + +/** + * Destroy a durable queue + */ +void +MessageStorePlugin::destroy(broker::PersistableQueue& queue) +{ + provider->second->destroy(queue); +} + +/** + * Record the existence of a durable exchange + */ +void +MessageStorePlugin::create(const broker::PersistableExchange& exchange, + const framing::FieldTable& args) +{ + if (exchange.getPersistenceId()) { + THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName()); + } + provider->second->create(exchange, args); +} + +/** + * Destroy a durable exchange + */ +void +MessageStorePlugin::destroy(const broker::PersistableExchange& exchange) +{ + provider->second->destroy(exchange); +} + +/** + * Record a binding + */ +void +MessageStorePlugin::bind(const broker::PersistableExchange& exchange, + const broker::PersistableQueue& queue, + const std::string& key, + const framing::FieldTable& args) +{ + provider->second->bind(exchange, queue, key, args); +} + +/** + * Forget a binding + */ +void +MessageStorePlugin::unbind(const broker::PersistableExchange& exchange, + const broker::PersistableQueue& queue, + const std::string& key, + const framing::FieldTable& args) +{ + provider->second->unbind(exchange, queue, key, args); +} + +/** + * Record generic durable configuration + */ +void +MessageStorePlugin::create(const broker::PersistableConfig& config) +{ + if (config.getPersistenceId()) { + THROW_STORE_EXCEPTION("Config item already created: " + + config.getName()); + } + provider->second->create(config); +} + +/** + * Destroy generic durable configuration + */ +void +MessageStorePlugin::destroy(const broker::PersistableConfig& config) +{ + provider->second->destroy(config); +} + +/** + * Stores a message before it has been enqueued + * (enqueueing automatically stores the message so this is + * only required if storage is required prior to that + * point). + */ +void +MessageStorePlugin::stage(const boost::intrusive_ptr<broker::PersistableMessage>& msg) +{ + if (msg->getPersistenceId() == 0 && !msg->isContentReleased()) { + provider->second->stage(msg); + } +} + +/** + * Destroys a previously staged message. This only needs + * to be called if the message is never enqueued. (Once + * enqueued, deletion will be automatic when the message + * is dequeued from all queues it was enqueued onto). + */ +void +MessageStorePlugin::destroy(broker::PersistableMessage& msg) +{ + if (msg.getPersistenceId()) + provider->second->destroy(msg); +} + +/** + * Appends content to a previously staged message + */ +void +MessageStorePlugin::appendContent + (const boost::intrusive_ptr<const broker::PersistableMessage>& msg, + const std::string& data) +{ + if (msg->getPersistenceId()) + provider->second->appendContent(msg, data); + else + THROW_STORE_EXCEPTION("Cannot append content. Message not known to store!"); +} + +/** + * Loads (a section) of content data for the specified + * message (previously stored through a call to stage or + * enqueue) into data. The offset refers to the content + * only (i.e. an offset of 0 implies that the start of the + * content should be loaded, not the headers or related + * meta-data). + */ +void +MessageStorePlugin::loadContent(const broker::PersistableQueue& queue, + const boost::intrusive_ptr<const broker::PersistableMessage>& msg, + std::string& data, + uint64_t offset, + uint32_t length) +{ + if (msg->getPersistenceId()) + provider->second->loadContent(queue, msg, data, offset, length); + else + THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!"); +} + +/** + * Enqueues a message, storing the message if it has not + * been previously stored and recording that the given + * message is on the given queue. + * + * Note: The operation is asynchronous so the return of this function does + * not mean the operation is complete. + */ +void +MessageStorePlugin::enqueue(broker::TransactionContext* ctxt, + const boost::intrusive_ptr<broker::PersistableMessage>& msg, + const broker::PersistableQueue& queue) +{ + if (queue.getPersistenceId() == 0) { + THROW_STORE_EXCEPTION("Queue not created: " + queue.getName()); + } + provider->second->enqueue(ctxt, msg, queue); +} + +/** + * Dequeues a message, recording that the given message is + * no longer on the given queue and deleting the message + * if it is no longer on any other queue. + * + * Note: The operation is asynchronous so the return of this function does + * not mean the operation is complete. + */ +void +MessageStorePlugin::dequeue(broker::TransactionContext* ctxt, + const boost::intrusive_ptr<broker::PersistableMessage>& msg, + const broker::PersistableQueue& queue) +{ + provider->second->dequeue(ctxt, msg, queue); +} + +/** + * Flushes all async messages to disk for the specified queue + * + * Note: The operation is asynchronous so the return of this function does + * not mean the operation is complete. + */ +void +MessageStorePlugin::flush(const broker::PersistableQueue& queue) +{ + provider->second->flush(queue); +} + +/** + * Returns the number of outstanding AIO's for a given queue + * + * If 0, than all the enqueue / dequeues have been stored + * to disk. + */ +uint32_t +MessageStorePlugin::outstandingQueueAIO(const broker::PersistableQueue& queue) +{ + return provider->second->outstandingQueueAIO(queue); +} + +std::auto_ptr<broker::TransactionContext> +MessageStorePlugin::begin() +{ + return provider->second->begin(); +} + +std::auto_ptr<broker::TPCTransactionContext> +MessageStorePlugin::begin(const std::string& xid) +{ + return provider->second->begin(xid); +} + +void +MessageStorePlugin::prepare(broker::TPCTransactionContext& ctxt) +{ + provider->second->prepare(ctxt); +} + +void +MessageStorePlugin::commit(broker::TransactionContext& ctxt) +{ + provider->second->commit(ctxt); +} + +void +MessageStorePlugin::abort(broker::TransactionContext& ctxt) +{ + provider->second->abort(ctxt); +} + +void +MessageStorePlugin::collectPreparedXids(std::set<std::string>& xids) +{ + provider->second->collectPreparedXids(xids); +} + +/** + * Request recovery of queue and message state; inherited from Recoverable + */ +void +MessageStorePlugin::recover(broker::RecoveryManager& recoverer) +{ + ExchangeMap exchanges; + QueueMap queues; + MessageMap messages; + MessageQueueMap messageQueueMap; + + provider->second->recoverConfigs(recoverer); + provider->second->recoverExchanges(recoverer, exchanges); + provider->second->recoverQueues(recoverer, queues); + provider->second->recoverBindings(recoverer, exchanges); + provider->second->recoverMessages(recoverer, messages, messageQueueMap); + // Enqueue msgs where needed. + for (MessageQueueMap::const_iterator i = messageQueueMap.begin(); + i != messageQueueMap.end(); + ++i) { + // Locate the message corresponding to the current message Id + MessageMap::const_iterator iMsg = messages.find(i->first); + if (iMsg == messages.end()) { + std::ostringstream oss; + oss << "No matching message trying to re-enqueue message " + << i->first; + THROW_STORE_EXCEPTION(oss.str()); + } + broker::RecoverableMessage::shared_ptr msg = iMsg->second; + // Now for each queue referenced in the queue map, locate it + // and re-enqueue the message. + for (std::vector<uint64_t>::const_iterator j = i->second.begin(); + j != i->second.end(); + ++j) { + // Locate the queue corresponding to the current queue Id + QueueMap::const_iterator iQ = queues.find(*j); + if (iQ == queues.end()) { + std::ostringstream oss; + oss << "No matching queue trying to re-enqueue message " + << " on queue Id " << *j; + THROW_STORE_EXCEPTION(oss.str()); + } + iQ->second->recover(msg); + } + } + + // recoverTransactions() and apply correctly while re-enqueuing +} + +}} // namespace qpid::store diff --git a/qpid/cpp/src/qpid/store/MessageStorePlugin.h b/qpid/cpp/src/qpid/store/MessageStorePlugin.h new file mode 100644 index 0000000000..529a59401e --- /dev/null +++ b/qpid/cpp/src/qpid/store/MessageStorePlugin.h @@ -0,0 +1,284 @@ +#ifndef QPID_STORE_MESSAGESTOREPLUGIN_H +#define QPID_STORE_MESSAGESTOREPLUGIN_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/Plugin.h" +#include "qpid/Options.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/MessageStore.h" +#include "qpid/broker/PersistableExchange.h" +#include "qpid/broker/PersistableMessage.h" +#include "qpid/broker/PersistableQueue.h" +#include "qpid/management/Manageable.h" + +#include <string> + +using namespace qpid; + +namespace qpid { +namespace store { + +class StorageProvider; + +/** + * @class MessageStorePlugin + * + * MessageStorePlugin is the front end of the persistent message store + * plugin. It is responsible for coordinating recovery, initialization, + * transactions (both local and distributed), flow-to-disk loading and + * unloading and persisting broker state (queues, bindings etc.). + * Actual storage operations are carried out by a message store storage + * provider that implements the qpid::store::StorageProvider interface. + */ +class MessageStorePlugin : + public qpid::Plugin, + public qpid::broker::MessageStore, // Frontend classes + public qpid::Plugin::Target // Provider target + // @TODO Need a mgmt story for this. Maybe allow r/o access to provider store info? public qpid::management::Manageable +{ + public: + MessageStorePlugin() {} + + /** + * @name Methods inherited from qpid::Plugin + */ + //@{ + virtual Options* getOptions() { return &options; } + virtual void earlyInitialize (Plugin::Target& target); + virtual void initialize(Plugin::Target& target); + //@} + + /// Finalizer; calls Target::finalize() to run finalizers on + /// StorageProviders. + void finalizeMe(); + + /** + * Called by StorageProvider instances during the earlyInitialize sequence. + * Each StorageProvider must supply a unique name by which it is known and a + * pointer to itself. + */ + virtual void providerAvailable(const std::string name, StorageProvider *be); + + /** + * @name Methods inherited from qpid::broker::MessageStore + */ + //@{ + /** + * If called before recovery, will discard the database and reinitialize + * using an empty store. This is used when cluster nodes recover and + * must get their content from a cluster sync rather than directly from + * the store. + * + * @param saveStoreContent If true, the store's contents should be + * saved to a backup location before + * reinitializing the store content. + */ + virtual void truncateInit(const bool saveStoreContent = false); + + /** + * Record the existence of a durable queue + */ + virtual void create(broker::PersistableQueue& queue, + const framing::FieldTable& args); + /** + * Destroy a durable queue + */ + virtual void destroy(broker::PersistableQueue& queue); + + /** + * Record the existence of a durable exchange + */ + virtual void create(const broker::PersistableExchange& exchange, + const framing::FieldTable& args); + /** + * Destroy a durable exchange + */ + virtual void destroy(const broker::PersistableExchange& exchange); + + /** + * Record a binding + */ + virtual void bind(const broker::PersistableExchange& exchange, + const broker::PersistableQueue& queue, + const std::string& key, + const framing::FieldTable& args); + + /** + * Forget a binding + */ + virtual void unbind(const broker::PersistableExchange& exchange, + const broker::PersistableQueue& queue, + const std::string& key, + const framing::FieldTable& args); + + /** + * Record generic durable configuration + */ + virtual void create(const broker::PersistableConfig& config); + + /** + * Destroy generic durable configuration + */ + virtual void destroy(const broker::PersistableConfig& config); + + /** + * Stores a message before it has been enqueued + * (enqueueing automatically stores the message so this is + * only required if storage is required prior to that + * point). If the message has not yet been stored it will + * store the headers as well as any content passed in. A + * persistence id will be set on the message which can be + * used to load the content or to append to it. + */ + virtual void stage(const boost::intrusive_ptr<broker::PersistableMessage>& msg); + + /** + * Destroys a previously staged message. This only needs + * to be called if the message is never enqueued. (Once + * enqueued, deletion will be automatic when the message + * is dequeued from all queues it was enqueued onto). + */ + virtual void destroy(broker::PersistableMessage& msg); + + /** + * Appends content to a previously staged message + */ + virtual void appendContent(const boost::intrusive_ptr<const broker::PersistableMessage>& msg, + const std::string& data); + + /** + * Loads (a section) of content data for the specified + * message (previously stored through a call to stage or + * enqueue) into data. The offset refers to the content + * only (i.e. an offset of 0 implies that the start of the + * content should be loaded, not the headers or related + * meta-data). + */ + virtual void loadContent(const broker::PersistableQueue& queue, + const boost::intrusive_ptr<const broker::PersistableMessage>& msg, + std::string& data, + uint64_t offset, + uint32_t length); + + /** + * Enqueues a message, storing the message if it has not + * been previously stored and recording that the given + * message is on the given queue. + * + * Note: The operation is asynchronous so the return of this function does + * not mean the operation is complete. + * + * @param msg the message to enqueue + * @param queue the name of the queue onto which it is to be enqueued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void enqueue(broker::TransactionContext* ctxt, + const boost::intrusive_ptr<broker::PersistableMessage>& msg, + const broker::PersistableQueue& queue); + + /** + * Dequeues a message, recording that the given message is + * no longer on the given queue and deleting the message + * if it is no longer on any other queue. + * + * + * Note: The operation is asynchronous so the return of this function does + * not mean the operation is complete. + * + * @param msg the message to dequeue + * @param queue the name of the queue from which it is to be dequeued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void dequeue(broker::TransactionContext* ctxt, + const boost::intrusive_ptr<broker::PersistableMessage>& msg, + const broker::PersistableQueue& queue); + + /** + * Flushes all async messages to disk for the specified queue + * + * + * Note: The operation is asynchronous so the return of this function does + * not mean the operation is complete. + * + * @param queue the name of the queue from which it is to be dequeued + */ + virtual void flush(const broker::PersistableQueue& queue); + + /** + * Returns the number of outstanding AIO's for a given queue + * + * If 0, than all the enqueue / dequeues have been stored + * to disk + * + * @param queue the name of the queue to check for outstanding AIO + */ + virtual uint32_t outstandingQueueAIO(const broker::PersistableQueue& queue); + //@} + + /** + * @name Methods inherited from qpid::broker::TransactionalStore + */ + //@{ + std::auto_ptr<broker::TransactionContext> begin(); + + std::auto_ptr<broker::TPCTransactionContext> begin(const std::string& xid); + + void prepare(broker::TPCTransactionContext& ctxt); + + void commit(broker::TransactionContext& ctxt); + + void abort(broker::TransactionContext& ctxt); + + void collectPreparedXids(std::set<std::string>& xids); + //@} + + /** + * Request recovery of queue and message state; inherited from Recoverable + */ + virtual void recover(broker::RecoveryManager& recoverer); + + // inline management::Manageable::status_t ManagementMethod (uint32_t, management::Args&, std::string&) + // { return management::Manageable::STATUS_OK; } + + protected: + + struct StoreOptions : public qpid::Options { + StoreOptions(const std::string& name="Store Options"); + std::string providerName; + }; + StoreOptions options; + + typedef std::map<const std::string, StorageProvider*> ProviderMap; + ProviderMap providers; + ProviderMap::const_iterator provider; + +}; // class MessageStoreImpl + +} // namespace msgstore +} // namespace mrg + +#endif /* QPID_SERIALIZER_H */ diff --git a/qpid/cpp/src/qpid/store/StorageProvider.h b/qpid/cpp/src/qpid/store/StorageProvider.h new file mode 100644 index 0000000000..b0db13c41f --- /dev/null +++ b/qpid/cpp/src/qpid/store/StorageProvider.h @@ -0,0 +1,322 @@ +#ifndef QPID_STORE_STORAGEPROVIDER_H +#define QPID_STORE_STORAGEPROVIDER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <map> +#include <stdexcept> +#include <vector> +#include "qpid/Plugin.h" +#include "qpid/Options.h" +#include "qpid/broker/MessageStore.h" + +using qpid::broker::PersistableConfig; +using qpid::broker::PersistableExchange; +using qpid::broker::PersistableMessage; +using qpid::broker::PersistableQueue; + +namespace qpid { +namespace store { + +typedef std::map<uint64_t, qpid::broker::RecoverableExchange::shared_ptr> + ExchangeMap; +typedef std::map<uint64_t, qpid::broker::RecoverableQueue::shared_ptr> + QueueMap; +typedef std::map<uint64_t, qpid::broker::RecoverableMessage::shared_ptr> + MessageMap; +// Msg Id -> vector of queue Ids where message is queued +typedef std::map<uint64_t, std::vector<uint64_t> > MessageQueueMap; + +class MessageStorePlugin; + +/** + * @class StorageProvider + * + * StorageProvider defines the interface for the storage provider plugin to the + * Qpid broker persistence store plugin. + * + * @TODO Should StorageProvider also inherit from MessageStore? If so, then + * maybe remove Recoverable from MessageStore's inheritance and move it + * to MessageStorePlugin? In any event, somehow the discardInit() feature + * needs to get added here. + */ +class StorageProvider : public qpid::Plugin, public qpid::broker::MessageStore +{ +public: + + class Exception : public std::exception + { + public: + virtual ~Exception() throw() {} + virtual const char *what() const throw() = 0; + }; + + /** + * @name Methods inherited from qpid::Plugin + */ + //@{ + /** + * Return a pointer to the provider's options. The options will be + * updated during option parsing by the host program; therefore, the + * referenced Options object must remain valid past this function's return. + * + * @return An options group or 0 for no options. Default returns 0. + * Plugin retains ownership of return value. + */ + virtual qpid::Options* getOptions() = 0; + + /** + * Initialize Plugin functionality on a Target, called before + * initializing the target. + * + * StorageProviders should respond only to Targets of class + * qpid::store::MessageStorePlugin and ignore all others. + * + * When called, the provider should invoke the method + * qpid::store::MessageStorePlugin::providerAvailable() to alert the + * message store of StorageProvider's availability. + * + * Called before the target itself is initialized. + */ + virtual void earlyInitialize (Plugin::Target& target) = 0; + + /** + * Initialize StorageProvider functionality. Called after initializing + * the target. + * + * StorageProviders should respond only to Targets of class + * qpid::store::MessageStorePlugin and ignore all others. + * + * Called after the target is fully initialized. + */ + virtual void initialize(Plugin::Target& target) = 0; + //@} + + /** + * Receive notification that this provider is the one that will actively + * handle storage for the target. If the provider is to be used, this + * method will be called after earlyInitialize() and before any + * recovery operations (recovery, in turn, precedes call to initialize()). + * Thus, it is wise to not actually do any database ops from within + * earlyInitialize() - they can wait until activate() is called because + * at that point it is certain the database will be needed. + */ + virtual void activate(MessageStorePlugin &store) = 0; + + /** + * @name Methods inherited from qpid::broker::MessageStore + */ + //@{ + /** + * If called after init() but before recovery, will discard the database + * and reinitialize using an empty store dir. If @a pushDownStoreFiles + * is true, the content of the store dir will be moved to a backup dir + * inside the store dir. This is used when cluster nodes recover and must + * get thier content from a cluster sync rather than directly fromt the + * store. + * + * @param pushDownStoreFiles If true, will move content of the store dir + * into a subdir, leaving the store dir + * otherwise empty. + */ + virtual void truncateInit(const bool pushDownStoreFiles = false) = 0; + + /** + * Record the existence of a durable queue + */ + virtual void create(PersistableQueue& queue, + const qpid::framing::FieldTable& args) = 0; + /** + * Destroy a durable queue + */ + virtual void destroy(PersistableQueue& queue) = 0; + + /** + * Record the existence of a durable exchange + */ + virtual void create(const PersistableExchange& exchange, + const qpid::framing::FieldTable& args) = 0; + /** + * Destroy a durable exchange + */ + virtual void destroy(const PersistableExchange& exchange) = 0; + + /** + * Record a binding + */ + virtual void bind(const PersistableExchange& exchange, + const PersistableQueue& queue, + const std::string& key, + const qpid::framing::FieldTable& args) = 0; + + /** + * Forget a binding + */ + virtual void unbind(const PersistableExchange& exchange, + const PersistableQueue& queue, + const std::string& key, + const qpid::framing::FieldTable& args) = 0; + + /** + * Record generic durable configuration + */ + virtual void create(const PersistableConfig& config) = 0; + + /** + * Destroy generic durable configuration + */ + virtual void destroy(const PersistableConfig& config) = 0; + + /** + * Stores a messages before it has been enqueued + * (enqueueing automatically stores the message so this is + * only required if storage is required prior to that + * point). If the message has not yet been stored it will + * store the headers as well as any content passed in. A + * persistence id will be set on the message which can be + * used to load the content or to append to it. + */ + virtual void stage(const boost::intrusive_ptr<PersistableMessage>& msg) = 0; + + /** + * Destroys a previously staged message. This only needs + * to be called if the message is never enqueued. (Once + * enqueued, deletion will be automatic when the message + * is dequeued from all queues it was enqueued onto). + */ + virtual void destroy(PersistableMessage& msg) = 0; + + /** + * Appends content to a previously staged message + */ + virtual void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg, + const std::string& data) = 0; + + /** + * Loads (a section) of content data for the specified + * message (previously stored through a call to stage or + * enqueue) into data. The offset refers to the content + * only (i.e. an offset of 0 implies that the start of the + * content should be loaded, not the headers or related + * meta-data). + */ + virtual void loadContent(const PersistableQueue& queue, + const boost::intrusive_ptr<const PersistableMessage>& msg, + std::string& data, + uint64_t offset, + uint32_t length) = 0; + + /** + * Enqueues a message, storing the message if it has not + * been previously stored and recording that the given + * message is on the given queue. + * + * Note: that this is async so the return of the function does + * not mean the opperation is complete. + * + * @param msg the message to enqueue + * @param queue the name of the queue onto which it is to be enqueued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void enqueue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) = 0; + + /** + * Dequeues a message, recording that the given message is + * no longer on the given queue and deleting the message + * if it is no longer on any other queue. + * + * Note: that this is async so the return of the function does + * not mean the opperation is complete. + * + * @param msg the message to dequeue + * @param queue the name of the queue from which it is to be dequeued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void dequeue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) = 0; + + /** + * Flushes all async messages to disk for the specified queue + * + * Note: that this is async so the return of the function does + * not mean the opperation is complete. + * + * @param queue the name of the queue from which it is to be dequeued + */ + virtual void flush(const qpid::broker::PersistableQueue& queue) = 0; + + /** + * Returns the number of outstanding AIO's for a given queue + * + * If 0, than all the enqueue / dequeues have been stored + * to disk + * + * @param queue the name of the queue to check for outstanding AIO + */ + virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue) = 0; + //@} + + /** + * @TODO This should probably not be here - it's only here because + * MessageStore inherits from Recoverable... maybe move that derivation. + * + * As it is now, we don't use this. Separate recover methods are + * declared below for individual types, which also set up maps of + * messages, queues, transactions for the main store plugin to handle + * properly. + * + * Request recovery of queue and message state. + */ + virtual void recover(qpid::broker::RecoveryManager& recoverer) {} + + /** + * @name Methods that do the recovery of the various objects that + * were saved. + */ + //@{ + + /** + * Recover bindings. + */ + virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer) = 0; + virtual void recoverExchanges(qpid::broker::RecoveryManager& recoverer, + ExchangeMap& exchangeMap) = 0; + virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer, + QueueMap& queueMap) = 0; + virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer, + const ExchangeMap& exchangeMap) = 0; + virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer, + MessageMap& messageMap, + MessageQueueMap& messageQueueMap) = 0; + //@} +}; + +}} // namespace qpid::store + +#endif /* QPID_STORE_STORAGEPROVIDER_H */ diff --git a/qpid/cpp/src/qpid/store/StoreException.h b/qpid/cpp/src/qpid/store/StoreException.h new file mode 100644 index 0000000000..1dc7f670ec --- /dev/null +++ b/qpid/cpp/src/qpid/store/StoreException.h @@ -0,0 +1,49 @@ +#ifndef QPID_STORE_STOREEXCEPTION_H +#define QPID_STORE_STOREEXCEPTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <exception> +#include <boost/format.hpp> +#include "StorageProvider.h" + +namespace qpid { +namespace store { + +class StoreException : public std::exception +{ + std::string text; +public: + StoreException(const std::string& _text) : text(_text) {} + StoreException(const std::string& _text, + const StorageProvider::Exception& cause) + : text(_text + ": " + cause.what()) {} + virtual ~StoreException() throw() {} + virtual const char* what() const throw() { return text.c_str(); } +}; + +#define THROW_STORE_EXCEPTION(MESSAGE) throw qpid::store::StoreException(boost::str(boost::format("%s (%s:%d)") % (MESSAGE) % __FILE__ % __LINE__)) +#define THROW_STORE_EXCEPTION_2(MESSAGE, EXCEPTION) throw qpid::store::StoreException(boost::str(boost::format("%s (%s:%d)") % (MESSAGE) % __FILE__ % __LINE__), EXCEPTION) + +}} // namespace qpid::store + +#endif /* QPID_STORE_STOREEXCEPTION_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp b/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp new file mode 100644 index 0000000000..0ecfacfb4b --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp @@ -0,0 +1,89 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "AmqpTransaction.h" +#include "DatabaseConnection.h" + +namespace qpid { +namespace store { +namespace ms_sql { + +AmqpTransaction::AmqpTransaction(std::auto_ptr<DatabaseConnection>& _db) + : db(_db), transDepth(0) +{ +} + +AmqpTransaction::~AmqpTransaction() +{ + if (transDepth > 0) + this->abort(); +} + +void +AmqpTransaction::begin() +{ + _bstr_t beginCmd("BEGIN TRANSACTION"); + _ConnectionPtr c = *db; + c->Execute(beginCmd, NULL, adExecuteNoRecords); + ++transDepth; +} + +void +AmqpTransaction::commit() +{ + if (transDepth > 0) { + _bstr_t commitCmd("COMMIT TRANSACTION"); + _ConnectionPtr c = *db; + c->Execute(commitCmd, NULL, adExecuteNoRecords); + --transDepth; + } +} + +void +AmqpTransaction::abort() +{ + if (transDepth > 0) { + _bstr_t rollbackCmd("ROLLBACK TRANSACTION"); + _ConnectionPtr c = *db; + c->Execute(rollbackCmd, NULL, adExecuteNoRecords); + transDepth = 0; + } +} + +AmqpTPCTransaction::AmqpTPCTransaction(std::auto_ptr<DatabaseConnection>& _db, + const std::string& _xid) + : AmqpTransaction(_db), xid(_xid) +{ +} + +AmqpTPCTransaction::~AmqpTPCTransaction() +{ +} + +void +AmqpTPCTransaction::prepare() +{ + // Intermediate transactions should have already assured integrity of + // the content in the database; just waiting to pull the trigger on the + // outermost transaction. +} + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h b/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h new file mode 100644 index 0000000000..9b87d0ae15 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h @@ -0,0 +1,84 @@ +#ifndef QPID_STORE_MSSQL_AMQPTRANSACTION_H +#define QPID_STORE_MSSQL_AMQPTRANSACTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/broker/TransactionalStore.h> +#include <string> +#include <memory> + +namespace qpid { +namespace store { +namespace ms_sql { + +class DatabaseConnection; + +/** + * @class AmqpTransaction + * + * Class representing an AMQP transaction. This is used around a set of + * enqueue and dequeue operations that occur when the broker is acting + * on a transaction commit/abort from the client. + */ +class AmqpTransaction : public qpid::broker::TransactionContext { + + std::auto_ptr<DatabaseConnection> db; + + // Since ADO w/ SQLOLEDB can't do nested transaction via its BeginTrans(), + // et al, nested transactions are carried out with direct SQL commands. + // To ensure the state of this is known, keep track of how deeply the + // transactions are nested. + unsigned int transDepth; + +public: + AmqpTransaction(std::auto_ptr<DatabaseConnection>& _db); + virtual ~AmqpTransaction(); + + DatabaseConnection *dbConn() { return db.get(); } + + void begin(); + void commit(); + void abort(); +}; + +/** + * @class AmqpTPCTransaction + * + * Class representing a Two-Phase-Commit (TPC) AMQP transaction. This is + * used around a set of enqueue and dequeue operations that occur when the + * broker is acting on a transaction prepare/commit/abort from the client. + */ +class AmqpTPCTransaction : public AmqpTransaction, + public qpid::broker::TPCTransactionContext { + std::string xid; + +public: + AmqpTPCTransaction(std::auto_ptr<DatabaseConnection>& _db, + const std::string& _xid); + virtual ~AmqpTPCTransaction(); + + void prepare(); +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_AMQPTRANSACTION_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp b/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp new file mode 100644 index 0000000000..737d93b142 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp @@ -0,0 +1,150 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/Exception.h> +#include <qpid/log/Statement.h> + +#include "BindingRecordset.h" +#include "BlobAdapter.h" +#include "BlobEncoder.h" +#include "VariantHelper.h" + +namespace qpid { +namespace store { +namespace ms_sql { + +void +BindingRecordset::add(uint64_t exchangeId, + const std::string& queueName, + const std::string& routingKey, + const qpid::framing::FieldTable& args) +{ + VariantHelper<std::string> queueNameStr(queueName); + VariantHelper<std::string> routingKeyStr(routingKey); + BlobEncoder blob (args); // Marshall field table to a blob + rs->AddNew(); + rs->Fields->GetItem("exchangeId")->Value = exchangeId; + rs->Fields->GetItem("queueName")->Value = queueNameStr; + rs->Fields->GetItem("routingKey")->Value = routingKeyStr; + rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob); + rs->Update(); +} + +void +BindingRecordset::remove(uint64_t exchangeId, + const std::string& queueName, + const std::string& routingKey, + const qpid::framing::FieldTable& /*args*/) +{ + // Look up the affected binding. + std::ostringstream filter; + filter << "exchangeId = " << exchangeId + << " AND queueName = '" << queueName << "'" + << " AND routingKey = '" << routingKey << "'" << std::ends; + rs->PutFilter (VariantHelper<std::string>(filter.str())); + if (rs->RecordCount != 0) { + // Delete the records + rs->Delete(adAffectGroup); + rs->Update(); + } + requery(); +} + +void +BindingRecordset::remove(uint64_t exchangeId) +{ + // Look up the affected bindings by the exchange ID + std::ostringstream filter; + filter << "exchangeId = " << exchangeId << std::ends; + rs->PutFilter (VariantHelper<std::string>(filter.str())); + if (rs->RecordCount != 0) { + // Delete the records + rs->Delete(adAffectGroup); + rs->Update(); + } + requery(); +} + +void +BindingRecordset::remove(const std::string& queueName) +{ + // Look up the affected bindings by the exchange ID + std::ostringstream filter; + filter << "queueName = '" << queueName << "'" << std::ends; + rs->PutFilter (VariantHelper<std::string>(filter.str())); + if (rs->RecordCount != 0) { + // Delete the records + rs->Delete(adAffectGroup); + rs->Update(); + } + requery(); +} + +void +BindingRecordset::recover(qpid::broker::RecoveryManager& recoverer, + std::map<uint64_t, broker::RecoverableExchange::shared_ptr> exchMap) +{ + if (rs->BOF && rs->EndOfFile) + return; // Nothing to do + rs->MoveFirst(); + Binding b; + IADORecordBinding *piAdoRecordBinding; + rs->QueryInterface(__uuidof(IADORecordBinding), + (LPVOID *)&piAdoRecordBinding); + piAdoRecordBinding->BindToRecordset(&b); + while (!rs->EndOfFile) { + long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize; + BlobAdapter blob(blobSize); + blob = rs->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); + broker::RecoverableExchange::shared_ptr exch = exchMap[b.exchangeId]; + std::string q(b.queueName), k(b.routingKey); + exch->bind(q, k, blob); + rs->MoveNext(); + } + + piAdoRecordBinding->Release(); +} + +void +BindingRecordset::dump() +{ + Recordset::dump(); + if (rs->EndOfFile && rs->BOF) // No records + return; + rs->MoveFirst(); + + Binding b; + IADORecordBinding *piAdoRecordBinding; + rs->QueryInterface(__uuidof(IADORecordBinding), + (LPVOID *)&piAdoRecordBinding); + piAdoRecordBinding->BindToRecordset(&b); + + while (VARIANT_FALSE == rs->EndOfFile) { + QPID_LOG(notice, "exch " << b.exchangeId + << ", q: " << b.queueName + << ", k: " << b.routingKey); + rs->MoveNext(); + } + + piAdoRecordBinding->Release(); +} + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h b/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h new file mode 100644 index 0000000000..5c51636f4f --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h @@ -0,0 +1,84 @@ +#ifndef QPID_STORE_MSSQL_BINDINGRECORDSET_H +#define QPID_STORE_MSSQL_BINDINGRECORDSET_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <icrsint.h> +#include "Recordset.h" +#include <qpid/broker/RecoveryManager.h> + +namespace qpid { +namespace store { +namespace ms_sql { + +/** + * @class BindingRecordset + * + * Class for the binding records. + */ +class BindingRecordset : public Recordset { + + class Binding : public CADORecordBinding { + BEGIN_ADO_BINDING(Binding) + ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, exchangeId, FALSE) + ADO_VARIABLE_LENGTH_ENTRY4(2, adVarChar, queueName, + sizeof(queueName), FALSE) + ADO_VARIABLE_LENGTH_ENTRY4(3, adVarChar, routingKey, + sizeof(routingKey), FALSE) + END_ADO_BINDING() + + public: + uint64_t exchangeId; + char queueName[256]; + char routingKey[256]; + }; + +public: + // Add a new binding + void add(uint64_t exchangeId, + const std::string& queueName, + const std::string& routingKey, + const qpid::framing::FieldTable& args); + + // Remove a specific binding + void remove(uint64_t exchangeId, + const std::string& queueName, + const std::string& routingKey, + const qpid::framing::FieldTable& args); + + // Remove all bindings for the specified exchange + void remove(uint64_t exchangeId); + + // Remove all bindings for the specified queue + void remove(const std::string& queueName); + + // Recover bindings set using exchMap to get from Id to RecoverableExchange. + void recover(qpid::broker::RecoveryManager& recoverer, + std::map<uint64_t, broker::RecoverableExchange::shared_ptr> exchMap); + + // Dump table contents; useful for debugging. + void dump(); +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_BINDINGRECORDSET_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.cpp b/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.cpp new file mode 100644 index 0000000000..1889f34e41 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.cpp @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "BlobAdapter.h" +#include <qpid/Exception.h> + +namespace qpid { +namespace store { +namespace ms_sql { + +void +BlobAdapter::extractBuff() +{ + // To give a valid Buffer back, lock the safearray, obtaining a pointer to + // the actual data. Record the pointer in the Buffer so the destructor + // knows to unlock the safearray. + if (buff.getPointer() == 0) { + char *blob; + SafeArrayAccessData(this->parray, (void **)&blob); + qpid::framing::Buffer lockedBuff(blob, buff.getSize()); + buff = lockedBuff; + } +} + + +BlobAdapter::~BlobAdapter() +{ + // If buff's pointer is set, the safearray is locked, so unlock it + if (buff.getPointer() != 0) + SafeArrayUnaccessData(this->parray); +} + +BlobAdapter::operator qpid::framing::Buffer& () +{ + extractBuff(); + return buff; +} + +BlobAdapter::operator qpid::framing::FieldTable& () +{ + extractBuff(); + fields.decode(buff); + return fields; +} + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.h b/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.h new file mode 100644 index 0000000000..1c666392bc --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.h @@ -0,0 +1,62 @@ +#ifndef QPID_STORE_MSSQL_BLOBADAPTER_H +#define QPID_STORE_MSSQL_BLOBADAPTER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <comutil.h> +#include <qpid/framing/Buffer.h> +#include <qpid/framing/FieldTable.h> + +namespace qpid { +namespace store { +namespace ms_sql { + +/** + * @class BlobAdapter + * + * Adapter for accessing a blob (varbinary SQL field) as a qpid::framing::Buffer + * in an exception-safe way. + */ +class BlobAdapter : public _variant_t { +private: + // This Buffer's pointer indicates whether or not a safearray has + // been locked; if it's 0, no locking was done. + qpid::framing::Buffer buff; + qpid::framing::FieldTable fields; + + void extractBuff(); + +public: + // Initialize with the known length of the data that will come. + // Assigning a _variant_t to this object will set up the array to be + // accessed with the operator Buffer&() + BlobAdapter(long blobSize) : _variant_t(), buff(0, blobSize) {} + ~BlobAdapter(); + BlobAdapter& operator=(_variant_t& var_t_Src) + { _variant_t::operator=(var_t_Src); return *this; } + operator qpid::framing::Buffer& (); + operator qpid::framing::FieldTable& (); +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_BLOBADAPTER_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.cpp b/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.cpp new file mode 100644 index 0000000000..75d3dc2d86 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.cpp @@ -0,0 +1,133 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "BlobEncoder.h" +#include <qpid/Exception.h> +#include <qpid/broker/Persistable.h> +#include <qpid/broker/PersistableMessage.h> +#include <boost/intrusive_ptr.hpp> +#include <memory.h> + +namespace qpid { +namespace store { +namespace ms_sql { + +template <class ITEM> void +BlobEncoder::encode(const ITEM &item) +{ + SAFEARRAYBOUND bound[1] = {0, 0}; + bound[0].cElements = item.encodedSize(); + blob = SafeArrayCreate(VT_UI1, 1, bound); + if (S_OK != SafeArrayLock(blob)) { + SafeArrayDestroy(blob); + blob = 0; + throw qpid::Exception("Error locking blob area for persistable item"); + } + try { + qpid::framing::Buffer buff((char *)blob->pvData, bound[0].cElements); + item.encode(buff); + } + catch(...) { + SafeArrayUnlock(blob); + SafeArrayDestroy(blob); + blob = 0; + throw; + } + this->vt = VT_ARRAY | VT_UI1; + this->parray = blob; + SafeArrayUnlock(blob); +} + +template <> void +BlobEncoder::encode(const boost::intrusive_ptr<qpid::broker::PersistableMessage> &item) +{ + // NOTE! If this code changes, verify the recovery code in MessageRecordset + SAFEARRAYBOUND bound[1] = {0, 0}; + bound[0].cElements = item->encodedSize() + sizeof(uint32_t); + blob = SafeArrayCreate(VT_UI1, 1, bound); + if (S_OK != SafeArrayLock(blob)) { + SafeArrayDestroy(blob); + blob = 0; + throw qpid::Exception("Error locking blob area for message"); + } + try { + uint32_t headerSize = item->encodedHeaderSize(); + qpid::framing::Buffer buff((char *)blob->pvData, bound[0].cElements); + buff.putLong(headerSize); + item->encode(buff); + } + catch(...) { + SafeArrayUnlock(blob); + SafeArrayDestroy(blob); + blob = 0; + throw; + } + this->vt = VT_ARRAY | VT_UI1; + this->parray = blob; + SafeArrayUnlock(blob); +} + +template <> void +BlobEncoder::encode(const std::string &item) +{ + SAFEARRAYBOUND bound[1] = {0, 0}; + bound[0].cElements = item.size(); + blob = SafeArrayCreate(VT_UI1, 1, bound); + if (S_OK != SafeArrayLock(blob)) { + SafeArrayDestroy(blob); + blob = 0; + throw qpid::Exception("Error locking blob area for string"); + } + memcpy_s(blob->pvData, item.size(), item.data(), item.size()); + this->vt = VT_ARRAY | VT_UI1; + this->parray = blob; + SafeArrayUnlock(blob); +} + +BlobEncoder::BlobEncoder(const qpid::broker::Persistable &item) : blob(0) +{ + encode(item); +} + +BlobEncoder::BlobEncoder(const boost::intrusive_ptr<qpid::broker::PersistableMessage> &msg) : blob(0) +{ + encode(msg); +} + +BlobEncoder::BlobEncoder(const qpid::framing::FieldTable &fields) : blob(0) +{ + encode(fields); +} + +BlobEncoder::BlobEncoder(const std::string &data) : blob(0) +{ + encode(data); +} + +BlobEncoder::~BlobEncoder() +{ + if (blob) + SafeArrayDestroy(blob); + blob = 0; + this->parray = 0; +} + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.h b/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.h new file mode 100644 index 0000000000..d2b56223c1 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.h @@ -0,0 +1,61 @@ +#ifndef QPID_STORE_MSSQL_BLOBENCODER_H +#define QPID_STORE_MSSQL_BLOBENCODER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <comutil.h> +#include <string> +#include <boost/intrusive_ptr.hpp> +#include <qpid/broker/Persistable.h> +#include <qpid/broker/PersistableMessage.h> +#include <qpid/framing/Buffer.h> +#include <qpid/framing/FieldTable.h> + +namespace qpid { +namespace store { +namespace ms_sql { + +/** + * @class BlobEncoder + * + * Encodes a blob (varbinary) field from a qpid::broker::Persistable or a + * qpid::framing::FieldTable (both of which can be encoded to + * qpid::framing::Buffer) so it can be passed to ADO methods for writing + * to the database. + */ +class BlobEncoder : public _variant_t { +private: + SAFEARRAY *blob; + + template <class ITEM> void encode(const ITEM &item); + +public: + BlobEncoder(const qpid::broker::Persistable &item); + BlobEncoder(const boost::intrusive_ptr<qpid::broker::PersistableMessage> &msg); + BlobEncoder(const qpid::framing::FieldTable &fields); + BlobEncoder(const std::string& data); + ~BlobEncoder(); +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_BLOBENCODER_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp b/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp new file mode 100644 index 0000000000..977eb6d5d2 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/Exception.h> +#include <qpid/log/Statement.h> + +#include "BlobRecordset.h" +#include "BlobEncoder.h" +#include "VariantHelper.h" + +namespace qpid { +namespace store { +namespace ms_sql { + +void +BlobRecordset::remove(uint64_t id) +{ + // Look up the item by its persistenceId + std::ostringstream filter; + filter << "persistenceId = " << id << std::ends; + rs->PutFilter (VariantHelper<std::string>(filter.str())); + if (!rs->EndOfFile) { + // Delete the record + rs->Delete(adAffectCurrent); + rs->Update(); + } +} + +void +BlobRecordset::add(const qpid::broker::Persistable& item) +{ + BlobEncoder blob (item); // Marshall item info to a blob + rs->AddNew(); + item.setPersistenceId(rs->Fields->Item["persistenceId"]->Value); + rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob); + rs->Update(); +} + +void +BlobRecordset::remove(const qpid::broker::Persistable& item) +{ + remove(item.getPersistenceId()); +} + +void +BlobRecordset::dump() +{ + Recordset::dump(); +#if 1 + if (rs->EndOfFile && rs->BOF) // No records + return; + + rs->MoveFirst(); + while (!rs->EndOfFile) { + uint64_t id = rs->Fields->Item["persistenceId"]->Value; + QPID_LOG(notice, " -> " << id); + rs->MoveNext(); + } +#else + for (Iterator iter = begin(); iter != end(); ++iter) { + uint64_t id = *iter.first; + QPID_LOG(notice, " -> " << id); + } +#endif +} + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h b/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h new file mode 100644 index 0000000000..4e89254b48 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h @@ -0,0 +1,53 @@ +#ifndef QPID_STORE_MSSQL_BLOBRECORDSET_H +#define QPID_STORE_MSSQL_BLOBRECORDSET_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Recordset.h" +#include <qpid/broker/Persistable.h> +#include <string> + +namespace qpid { +namespace store { +namespace ms_sql { + +/** + * @class BlobRecordset + * + * Class for the "blob" records that record an id, varbinary(max) pair. + */ +class BlobRecordset : public Recordset { +protected: + // Remove a record given its Id. + void remove(uint64_t id); + +public: + void add(const qpid::broker::Persistable& item); + void remove(const qpid::broker::Persistable& item); + + // Dump table contents; useful for debugging. + void dump(); +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_BLOBRECORDSET_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp b/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp new file mode 100644 index 0000000000..34edec8acd --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "DatabaseConnection.h" +#include "Exception.h" +#include <comdef.h> +namespace { +inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);}; +} + +namespace qpid { +namespace store { +namespace ms_sql { + +DatabaseConnection::DatabaseConnection() : conn(0) +{ +} + +DatabaseConnection::~DatabaseConnection() +{ + close(); +} + +void +DatabaseConnection::open(const std::string& connectString, + const std::string& dbName) +{ + if (conn && conn->State == adStateOpen) + return; + std::string adoConnect = "Provider=SQLOLEDB;" + connectString; + try { + TESTHR(conn.CreateInstance(__uuidof(Connection))); + conn->ConnectionString = adoConnect.c_str(); + conn->Open("", "", "", adConnectUnspecified); + if (dbName.length() > 0) + conn->DefaultDatabase = dbName.c_str(); + } + catch(_com_error &e) { + close(); + throw ADOException("MSSQL can't open " + dbName + " at " + adoConnect, e); + } +} + +void +DatabaseConnection::close() +{ + if (conn && conn->State == adStateOpen) + conn->Close(); + conn = 0; +} + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h b/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h new file mode 100644 index 0000000000..2b8bbffa90 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h @@ -0,0 +1,62 @@ +#ifndef QPID_STORE_MSSQL_DATABASECONNECTION_H +#define QPID_STORE_MSSQL_DATABASECONNECTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// Bring in ADO 2.8 (yes, I know it says "15", but that's it...) +#import "C:\Program Files\Common Files\System\ado\msado15.dll" \ + no_namespace rename("EOF", "EndOfFile") + +#include <string> + +namespace qpid { +namespace store { +namespace ms_sql { + +/** + * @class DatabaseConnection + * + * Represents a connection to the SQL database. This class wraps the + * needed _ConnectionPtr for ADO as well as the needed COM initialization + * and cleanup that each thread requires. It is expected that this class + * will be maintained in thread-specific storage so it has no locks. + */ +class DatabaseConnection { +protected: + _ConnectionPtr conn; + +public: + DatabaseConnection(); + ~DatabaseConnection(); + void open(const std::string& connectString, + const std::string& dbName = ""); + void close(); + operator _ConnectionPtr () { return conn; } + + void beginTransaction() { conn->BeginTrans(); } + void commitTransaction() {conn->CommitTrans(); } + void rollbackTransaction() { conn->RollbackTrans(); } +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_DATABASECONNECTION_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/Exception.h b/qpid/cpp/src/qpid/store/ms-sql/Exception.h new file mode 100644 index 0000000000..34e401b068 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/Exception.h @@ -0,0 +1,56 @@ +#ifndef QPID_STORE_MSSQL_EXCEPTION_H +#define QPID_STORE_MSSQL_EXCEPTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include <comdef.h> +#include <qpid/store/StorageProvider.h> + +namespace qpid { +namespace store { +namespace ms_sql { + +class Exception : public qpid::store::StorageProvider::Exception +{ +protected: + std::string text; +public: + Exception(const std::string& _text) : text(_text) {} + virtual ~Exception() {} + virtual const char* what() const throw() { return text.c_str(); } +}; + +class ADOException : public Exception +{ +public: + ADOException(const std::string& _text, _com_error &e) + : Exception(_text) { + text += ": "; + _bstr_t wmsg = e.Description(); + text += (const char *)wmsg; + } +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_EXCEPTION_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp new file mode 100644 index 0000000000..5872dbc2e4 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp @@ -0,0 +1,989 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <stdlib.h> +#include <string> +#include <windows.h> +#include <qpid/broker/RecoverableQueue.h> +#include <qpid/log/Statement.h> +#include <qpid/store/MessageStorePlugin.h> +#include <qpid/store/StorageProvider.h> +#include "AmqpTransaction.h" +#include "BlobAdapter.h" +#include "BlobRecordset.h" +#include "BindingRecordset.h" +#include "MessageMapRecordset.h" +#include "MessageRecordset.h" +#include "DatabaseConnection.h" +#include "Exception.h" +#include "State.h" +#include "VariantHelper.h" + +// Bring in ADO 2.8 (yes, I know it says "15", but that's it...) +#import "C:\Program Files\Common Files\System\ado\msado15.dll" \ + no_namespace rename("EOF", "EndOfFile") +#include <comdef.h> +namespace { +inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);}; + +// Table names +const std::string TblBinding("tblBinding"); +const std::string TblConfig("tblConfig"); +const std::string TblExchange("tblExchange"); +const std::string TblMessage("tblMessage"); +const std::string TblMessageMap("tblMessageMap"); +const std::string TblQueue("tblQueue"); +} + +namespace qpid { +namespace store { +namespace ms_sql { + +/** + * @class MSSqlProvider + * + * Implements a qpid::store::StorageProvider that uses Microsoft SQL Server as + * the backend data store for Qpid. + */ +class MSSqlProvider : public qpid::store::StorageProvider +{ +protected: + void finalizeMe(); + + void dump(); + +public: + MSSqlProvider(); + ~MSSqlProvider(); + + virtual qpid::Options* getOptions() { return &options; } + + virtual void earlyInitialize (Plugin::Target& target); + virtual void initialize(Plugin::Target& target); + + /** + * Receive notification that this provider is the one that will actively + * handle provider storage for the target. If the provider is to be used, + * this method will be called after earlyInitialize() and before any + * recovery operations (recovery, in turn, precedes call to initialize()). + */ + virtual void activate(MessageStorePlugin &store); + + /** + * @name Methods inherited from qpid::broker::MessageStore + */ + //@{ + /** + * If called after init() but before recovery, will discard the database + * and reinitialize using an empty store dir. If @a pushDownStoreFiles + * is true, the content of the store dir will be moved to a backup dir + * inside the store dir. This is used when cluster nodes recover and must + * get thier content from a cluster sync rather than directly fromt the + * store. + * + * @param pushDownStoreFiles If true, will move content of the store dir + * into a subdir, leaving the store dir + * otherwise empty. + */ + virtual void truncateInit(const bool pushDownStoreFiles = false); + + /** + * Record the existence of a durable queue + */ + virtual void create(PersistableQueue& queue, + const qpid::framing::FieldTable& args); + /** + * Destroy a durable queue + */ + virtual void destroy(PersistableQueue& queue); + + /** + * Record the existence of a durable exchange + */ + virtual void create(const PersistableExchange& exchange, + const qpid::framing::FieldTable& args); + /** + * Destroy a durable exchange + */ + virtual void destroy(const PersistableExchange& exchange); + + /** + * Record a binding + */ + virtual void bind(const PersistableExchange& exchange, + const PersistableQueue& queue, + const std::string& key, + const qpid::framing::FieldTable& args); + + /** + * Forget a binding + */ + virtual void unbind(const PersistableExchange& exchange, + const PersistableQueue& queue, + const std::string& key, + const qpid::framing::FieldTable& args); + + /** + * Record generic durable configuration + */ + virtual void create(const PersistableConfig& config); + + /** + * Destroy generic durable configuration + */ + virtual void destroy(const PersistableConfig& config); + + /** + * Stores a messages before it has been enqueued + * (enqueueing automatically stores the message so this is + * only required if storage is required prior to that + * point). If the message has not yet been stored it will + * store the headers as well as any content passed in. A + * persistence id will be set on the message which can be + * used to load the content or to append to it. + */ + virtual void stage(const boost::intrusive_ptr<PersistableMessage>& msg); + + /** + * Destroys a previously staged message. This only needs + * to be called if the message is never enqueued. (Once + * enqueued, deletion will be automatic when the message + * is dequeued from all queues it was enqueued onto). + */ + virtual void destroy(PersistableMessage& msg); + + /** + * Appends content to a previously staged message + */ + virtual void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg, + const std::string& data); + + /** + * Loads (a section) of content data for the specified + * message (previously stored through a call to stage or + * enqueue) into data. The offset refers to the content + * only (i.e. an offset of 0 implies that the start of the + * content should be loaded, not the headers or related + * meta-data). + */ + virtual void loadContent(const qpid::broker::PersistableQueue& queue, + const boost::intrusive_ptr<const PersistableMessage>& msg, + std::string& data, + uint64_t offset, + uint32_t length); + + /** + * Enqueues a message, storing the message if it has not + * been previously stored and recording that the given + * message is on the given queue. + * + * Note: that this is async so the return of the function does + * not mean the opperation is complete. + * + * @param msg the message to enqueue + * @param queue the name of the queue onto which it is to be enqueued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void enqueue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue); + + /** + * Dequeues a message, recording that the given message is + * no longer on the given queue and deleting the message + * if it is no longer on any other queue. + * + * Note: that this is async so the return of the function does + * not mean the opperation is complete. + * + * @param msg the message to dequeue + * @param queue the name of the queue from which it is to be dequeued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void dequeue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue); + + /** + * Flushes all async messages to disk for the specified queue + * + * Note: this is a no-op for this provider. + * + * @param queue the name of the queue from which it is to be dequeued + */ + virtual void flush(const PersistableQueue& queue) {}; + + /** + * Returns the number of outstanding AIO's for a given queue + * + * If 0, than all the enqueue / dequeues have been stored + * to disk + * + * @param queue the name of the queue to check for outstanding AIO + */ + virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue) + {return 0;} + //@} + + /** + * @name Methods inherited from qpid::broker::TransactionalStore + */ + //@{ + virtual std::auto_ptr<qpid::broker::TransactionContext> begin(); + virtual std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid); + virtual void prepare(qpid::broker::TPCTransactionContext& txn); + virtual void commit(qpid::broker::TransactionContext& txn); + virtual void abort(qpid::broker::TransactionContext& txn); + + // @TODO This maybe should not be in TransactionalStore + virtual void collectPreparedXids(std::set<std::string>& xids) {} + //@} + + virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer); + virtual void recoverExchanges(qpid::broker::RecoveryManager& recoverer, + ExchangeMap& exchangeMap); + virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer, + QueueMap& queueMap); + virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer, + const ExchangeMap& exchangeMap); + virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer, + MessageMap& messageMap, + MessageQueueMap& messageQueueMap); + +private: + struct ProviderOptions : public qpid::Options + { + std::string connectString; + std::string catalogName; + + ProviderOptions(const std::string &name) + : qpid::Options(name), + catalogName("QpidStore") + { + const enum { NAMELEN = MAX_COMPUTERNAME_LENGTH + 1 }; + TCHAR myName[NAMELEN]; + DWORD myNameLen = NAMELEN; + GetComputerName(myName, &myNameLen); + connectString = "Data Source="; + connectString += myName; + connectString += "\\SQLEXPRESS;Integrated Security=SSPI"; + addOptions() + ("connect", + qpid::optValue(connectString, "STRING"), + "Connection string for the database to use. Will prepend " + "Provider=SQLOLEDB;") + ("catalog", + qpid::optValue(catalogName, "DB NAME"), + "Catalog (database) name") + ; + } + }; + ProviderOptions options; + + // Each thread has a separate connection to the database and also needs + // to manage its COM initialize/finalize individually. This is done by + // keeping a thread-specific State. + boost::thread_specific_ptr<State> dbState; + + State *initState(); + DatabaseConnection *initConnection(void); + void createDb(_ConnectionPtr conn, const std::string &name); +}; + +static MSSqlProvider static_instance_registers_plugin; + +void +MSSqlProvider::finalizeMe() +{ + dbState.reset(); +} + +MSSqlProvider::MSSqlProvider() + : options("MS SQL Provider options") +{ +} + +MSSqlProvider::~MSSqlProvider() +{ +} + +void +MSSqlProvider::earlyInitialize(Plugin::Target &target) +{ + MessageStorePlugin *store = dynamic_cast<MessageStorePlugin *>(&target); + if (store) { + // If the database init fails, report it and don't register; give + // the rest of the broker a chance to run. + // + // Don't try to initConnection() since that will fail if the + // database doesn't exist. Instead, try to open a connection without + // a database name, then search for the database. There's still a + // chance this provider won't be selected for the store too, so be + // be sure to close the database connection before return to avoid + // leaving a connection up that will not be used. + try { + std::auto_ptr<DatabaseConnection> db(new DatabaseConnection()); + db->open(options.connectString, ""); + _ConnectionPtr conn(*db); + _RecordsetPtr pCatalogs = NULL; + VariantHelper<std::string> catalogName(options.catalogName); + pCatalogs = conn->OpenSchema(adSchemaCatalogs, catalogName); + if (pCatalogs->EndOfFile) { + // Database doesn't exist; create it + QPID_LOG(notice, + "MSSQL: Creating database " + options.catalogName); + createDb(conn, options.catalogName); + } + else { + QPID_LOG(notice, + "MSSQL: Database located: " + options.catalogName); + } + if (pCatalogs) { + if (pCatalogs->State == adStateOpen) + pCatalogs->Close(); + pCatalogs = 0; + } + db->close(); + store->providerAvailable("MSSQL", this); + } + catch (qpid::Exception &e) { + QPID_LOG(error, e.what()); + return; + } + store->addFinalizer(boost::bind(&MSSqlProvider::finalizeMe, this)); + } +} + +void +MSSqlProvider::initialize(Plugin::Target& target) +{ +} + +void +MSSqlProvider::activate(MessageStorePlugin &store) +{ + QPID_LOG(info, "MS SQL Provider is up"); +} + +void +MSSqlProvider::truncateInit(const bool pushDownStoreFiles) +{ +} + +void +MSSqlProvider::create(PersistableQueue& queue, + const qpid::framing::FieldTable& /*args needed for jrnl*/) +{ + DatabaseConnection *db = initConnection(); + try { + BlobRecordset rsQueues; + db->beginTransaction(); + rsQueues.open(db, TblQueue); + rsQueues.add(queue); + db->commitTransaction(); + } + catch(_com_error &e) { + db->rollbackTransaction(); + throw ADOException("Error creating queue " + queue.getName(), e); + } +} + +/** + * Destroy a durable queue + */ +void +MSSqlProvider::destroy(PersistableQueue& queue) +{ + DatabaseConnection *db = initConnection(); + try { + BlobRecordset rsQueues; + BindingRecordset rsBindings; + db->beginTransaction(); + rsQueues.open(db, TblQueue); + rsBindings.open(db, TblBinding); + rsQueues.remove(queue); + rsBindings.remove(queue.getName()); + db->commitTransaction(); + } + catch(_com_error &e) { + db->rollbackTransaction(); + throw ADOException("Error deleting queue " + queue.getName(), e); + } +} + +/** + * Record the existence of a durable exchange + */ +void +MSSqlProvider::create(const PersistableExchange& exchange, + const qpid::framing::FieldTable& args) +{ + DatabaseConnection *db = initConnection(); + try { + BlobRecordset rsExchanges; + db->beginTransaction(); + rsExchanges.open(db, TblExchange); + rsExchanges.add(exchange); + db->commitTransaction(); + } + catch(_com_error &e) { + db->rollbackTransaction(); + throw ADOException("Error creating exchange " + exchange.getName(), e); + } +} + +/** + * Destroy a durable exchange + */ +void +MSSqlProvider::destroy(const PersistableExchange& exchange) +{ + DatabaseConnection *db = initConnection(); + try { + BlobRecordset rsExchanges; + BindingRecordset rsBindings; + db->beginTransaction(); + rsExchanges.open(db, TblExchange); + rsBindings.open(db, TblBinding); + rsExchanges.remove(exchange); + rsBindings.remove(exchange.getPersistenceId()); + db->commitTransaction(); + } + catch(_com_error &e) { + db->rollbackTransaction(); + throw ADOException("Error deleting exchange " + exchange.getName(), e); + } +} + +/** + * Record a binding + */ +void +MSSqlProvider::bind(const PersistableExchange& exchange, + const PersistableQueue& queue, + const std::string& key, + const qpid::framing::FieldTable& args) +{ + DatabaseConnection *db = initConnection(); + try { + BindingRecordset rsBindings; + db->beginTransaction(); + rsBindings.open(db, TblBinding); + rsBindings.add(exchange.getPersistenceId(), queue.getName(), key, args); + db->commitTransaction(); + } + catch(_com_error &e) { + db->rollbackTransaction(); + throw ADOException("Error binding exchange " + exchange.getName() + + " to queue " + queue.getName(), e); + } +} + +/** + * Forget a binding + */ +void +MSSqlProvider::unbind(const PersistableExchange& exchange, + const PersistableQueue& queue, + const std::string& key, + const qpid::framing::FieldTable& args) +{ + DatabaseConnection *db = initConnection(); + try { + BindingRecordset rsBindings; + db->beginTransaction(); + rsBindings.open(db, TblBinding); + rsBindings.remove(exchange.getPersistenceId(), + queue.getName(), + key, + args); + db->commitTransaction(); + } + catch(_com_error &e) { + db->rollbackTransaction(); + throw ADOException("Error unbinding exchange " + exchange.getName() + + " from queue " + queue.getName(), e); + } +} + +/** + * Record generic durable configuration + */ +void +MSSqlProvider::create(const PersistableConfig& config) +{ + DatabaseConnection *db = initConnection(); + try { + BlobRecordset rsConfigs; + db->beginTransaction(); + rsConfigs.open(db, TblConfig); + rsConfigs.add(config); + db->commitTransaction(); + } + catch(_com_error &e) { + db->rollbackTransaction(); + throw ADOException("Error creating config " + config.getName(), e); + } +} + +/** + * Destroy generic durable configuration + */ +void +MSSqlProvider::destroy(const PersistableConfig& config) +{ + DatabaseConnection *db = initConnection(); + try { + BlobRecordset rsConfigs; + db->beginTransaction(); + rsConfigs.open(db, TblConfig); + rsConfigs.remove(config); + db->commitTransaction(); + } + catch(_com_error &e) { + db->rollbackTransaction(); + throw ADOException("Error deleting config " + config.getName(), e); + } +} + +/** + * Stores a messages before it has been enqueued + * (enqueueing automatically stores the message so this is + * only required if storage is required prior to that + * point). If the message has not yet been stored it will + * store the headers as well as any content passed in. A + * persistence id will be set on the message which can be + * used to load the content or to append to it. + */ +void +MSSqlProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg) +{ + DatabaseConnection *db = initConnection(); + try { + MessageRecordset rsMessages; + db->beginTransaction(); + rsMessages.open(db, TblMessage); + rsMessages.add(msg); + db->commitTransaction(); + } + catch(_com_error &e) { + db->rollbackTransaction(); + throw ADOException("Error staging message", e); + } +} + +/** + * Destroys a previously staged message. This only needs + * to be called if the message is never enqueued. (Once + * enqueued, deletion will be automatic when the message + * is dequeued from all queues it was enqueued onto). + */ +void +MSSqlProvider::destroy(PersistableMessage& msg) +{ + DatabaseConnection *db = initConnection(); + try { + BlobRecordset rsMessages; + db->beginTransaction(); + rsMessages.open(db, TblMessage); + rsMessages.remove(msg); + db->commitTransaction(); + } + catch(_com_error &e) { + db->rollbackTransaction(); + throw ADOException("Error deleting message", e); + } +} + +/** + * Appends content to a previously staged message + */ +void +MSSqlProvider::appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg, + const std::string& data) +{ + DatabaseConnection *db = initConnection(); + try { + MessageRecordset rsMessages; + db->beginTransaction(); + rsMessages.open(db, TblMessage); + rsMessages.append(msg, data); + db->commitTransaction(); + } + catch(_com_error &e) { + db->rollbackTransaction(); + throw ADOException("Error appending to message", e); + } +} + +/** + * Loads (a section) of content data for the specified + * message (previously stored through a call to stage or + * enqueue) into data. The offset refers to the content + * only (i.e. an offset of 0 implies that the start of the + * content should be loaded, not the headers or related + * meta-data). + */ +void +MSSqlProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/, + const boost::intrusive_ptr<const PersistableMessage>& msg, + std::string& data, + uint64_t offset, + uint32_t length) +{ + // SQL store keeps all messages in one table, so we don't need the + // queue reference. + DatabaseConnection *db = initConnection(); + try { + MessageRecordset rsMessages; + rsMessages.open(db, TblMessage); + rsMessages.loadContent(msg, data, offset, length); + } + catch(_com_error &e) { + throw ADOException("Error loading message content", e); + } +} + +/** + * Enqueues a message, storing the message if it has not + * been previously stored and recording that the given + * message is on the given queue. + * + * @param ctxt The transaction context under which this enqueue happens. + * @param msg The message to enqueue + * @param queue the name of the queue onto which it is to be enqueued + */ +void +MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) +{ + AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt); + if (atxn == 0) + throw qpid::broker::InvalidTransactionContextException(); + (void)initState(); // Ensure this thread is initialized + try { + atxn->begin(); + } + catch(_com_error &e) { + throw ADOException("Error queuing message", e); + } + + try { + if (msg->getPersistenceId() == 0) { // Message itself not yet saved + MessageRecordset rsMessages; + rsMessages.open(atxn->dbConn(), TblMessage); + rsMessages.add(msg); + } + MessageMapRecordset rsMap; + rsMap.open(atxn->dbConn(), TblMessageMap); + rsMap.add(msg->getPersistenceId(), queue.getPersistenceId()); + atxn->commit(); + } + catch(_com_error &e) { + atxn->abort(); + throw ADOException("Error queuing message", e); + } +} + +/** + * Dequeues a message, recording that the given message is + * no longer on the given queue and deleting the message + * if it is no longer on any other queue. + * + * @param ctxt The transaction context under which this dequeue happens. + * @param msg The message to dequeue + * @param queue The queue from which it is to be dequeued + */ +void +MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) +{ + AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt); + if (atxn == 0) + throw qpid::broker::InvalidTransactionContextException(); + (void)initState(); // Ensure this thread is initialized + try { + atxn->begin(); + } + catch(_com_error &e) { + throw ADOException("Error queuing message", e); + } + try { + MessageMapRecordset rsMap; + rsMap.open(atxn->dbConn(), TblMessageMap); + bool more = rsMap.remove(msg->getPersistenceId(), + queue.getPersistenceId()); + if (!more) { + MessageRecordset rsMessages; + rsMessages.open(atxn->dbConn(), TblMessage); + rsMessages.remove(msg); + } + atxn->commit(); + } + catch(_com_error &e) { + atxn->abort(); + throw ADOException("Error dequeuing message", e); + } +} + +std::auto_ptr<qpid::broker::TransactionContext> +MSSqlProvider::begin() +{ + (void)initState(); // Ensure this thread is initialized + + // Transactions are associated with the Connection, so this transaction + // context needs its own connection. At the time of writing, single-phase + // transactions are dealt with completely on one thread, so we really + // could just use the thread-specific DatabaseConnection for this. + // However, that would introduce an ugly, hidden coupling, so play + // it safe and handle this just like a TPC transaction, which actually + // can be prepared and committed/aborted from different threads, + // making it a bad idea to try using the thread-local DatabaseConnection. + std::auto_ptr<DatabaseConnection> db(new DatabaseConnection); + db->open(options.connectString, options.catalogName); + std::auto_ptr<AmqpTransaction> tx(new AmqpTransaction(db)); + tx->begin(); + std::auto_ptr<qpid::broker::TransactionContext> tc(tx); + return tc; +} + +std::auto_ptr<qpid::broker::TPCTransactionContext> +MSSqlProvider::begin(const std::string& xid) +{ + (void)initState(); // Ensure this thread is initialized + std::auto_ptr<DatabaseConnection> db(new DatabaseConnection); + db->open(options.connectString, options.catalogName); + std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(db, xid)); + tx->begin(); + std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx); + return tc; +} + +void +MSSqlProvider::prepare(qpid::broker::TPCTransactionContext& txn) +{ +} + +void +MSSqlProvider::commit(qpid::broker::TransactionContext& txn) +{ + (void)initState(); // Ensure this thread is initialized + AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn); + if (atxn == 0) + throw qpid::broker::InvalidTransactionContextException(); + atxn->commit(); +} + +void +MSSqlProvider::abort(qpid::broker::TransactionContext& txn) +{ + (void)initState(); // Ensure this thread is initialized + AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn); + if (atxn == 0) + throw qpid::broker::InvalidTransactionContextException(); + atxn->abort(); +} + +// @TODO Much of this recovery code is way too similar... refactor to +// a recover template method on BlobRecordset. + +void +MSSqlProvider::recoverConfigs(qpid::broker::RecoveryManager& recoverer) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsConfigs; + rsConfigs.open(db, TblConfig); + _RecordsetPtr p = (_RecordsetPtr)rsConfigs; + if (p->BOF && p->EndOfFile) + return; // Nothing to do + p->MoveFirst(); + while (!p->EndOfFile) { + uint64_t id = p->Fields->Item["persistenceId"]->Value; + long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; + BlobAdapter blob(blobSize); + blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); + // Recreate the Config instance and reset its ID. + broker::RecoverableConfig::shared_ptr config = + recoverer.recoverConfig(blob); + config->setPersistenceId(id); + p->MoveNext(); + } +} + +void +MSSqlProvider::recoverExchanges(qpid::broker::RecoveryManager& recoverer, + ExchangeMap& exchangeMap) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsExchanges; + rsExchanges.open(db, TblExchange); + _RecordsetPtr p = (_RecordsetPtr)rsExchanges; + if (p->BOF && p->EndOfFile) + return; // Nothing to do + p->MoveFirst(); + while (!p->EndOfFile) { + uint64_t id = p->Fields->Item["persistenceId"]->Value; + long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; + BlobAdapter blob(blobSize); + blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); + // Recreate the Exchange instance, reset its ID, and remember the + // ones restored for matching up when recovering bindings. + broker::RecoverableExchange::shared_ptr exchange = + recoverer.recoverExchange(blob); + exchange->setPersistenceId(id); + exchangeMap[id] = exchange; + p->MoveNext(); + } +} + +void +MSSqlProvider::recoverQueues(qpid::broker::RecoveryManager& recoverer, + QueueMap& queueMap) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsQueues; + rsQueues.open(db, TblQueue); + _RecordsetPtr p = (_RecordsetPtr)rsQueues; + if (p->BOF && p->EndOfFile) + return; // Nothing to do + p->MoveFirst(); + while (!p->EndOfFile) { + uint64_t id = p->Fields->Item["persistenceId"]->Value; + long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; + BlobAdapter blob(blobSize); + blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); + // Recreate the Queue instance and reset its ID. + broker::RecoverableQueue::shared_ptr queue = + recoverer.recoverQueue(blob); + queue->setPersistenceId(id); + queueMap[id] = queue; + p->MoveNext(); + } +} + +void +MSSqlProvider::recoverBindings(qpid::broker::RecoveryManager& recoverer, + const ExchangeMap& exchangeMap) +{ + DatabaseConnection *db = initConnection(); + BindingRecordset rsBindings; + rsBindings.open(db, TblBinding); + rsBindings.recover(recoverer, exchangeMap); +} + +void +MSSqlProvider::recoverMessages(qpid::broker::RecoveryManager& recoverer, + MessageMap& messageMap, + MessageQueueMap& messageQueueMap) +{ + DatabaseConnection *db = initConnection(); + MessageRecordset rsMessages; + rsMessages.open(db, TblMessage); + rsMessages.recover(recoverer, messageMap); + + MessageMapRecordset rsMessageMaps; + rsMessageMaps.open(db, TblMessageMap); + rsMessageMaps.recover(messageQueueMap); +} + +////////////// Internal Methods + +State * +MSSqlProvider::initState() +{ + State *state = dbState.get(); // See if thread has initialized + if (!state) { + state = new State; + dbState.reset(state); + } + return state; +} + +DatabaseConnection * +MSSqlProvider::initConnection(void) +{ + State *state = initState(); + if (state->dbConn != 0) + return state->dbConn; // And the DatabaseConnection is set up too + std::auto_ptr<DatabaseConnection> db(new DatabaseConnection); + db->open(options.connectString, options.catalogName); + state->dbConn = db.release(); + return state->dbConn; +} + +void +MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name) +{ + const std::string dbCmd = "CREATE DATABASE " + name; + const std::string useCmd = "USE " + name; + const std::string tableCmd = "CREATE TABLE "; + const std::string colSpecs = + " (persistenceId bigint PRIMARY KEY NOT NULL IDENTITY(1,1)," + " fieldTableBlob varbinary(MAX) NOT NULL)"; + const std::string bindingSpecs = + " (exchangeId bigint REFERENCES tblExchange(persistenceId) NOT NULL," + " queueName varchar(255) NOT NULL," + " routingKey varchar(255)," + " fieldTableBlob varbinary(MAX))"; + const std::string messageMapSpecs = + " (messageId bigint REFERENCES tblMessage(persistenceId) NOT NULL," + " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL)"; + _variant_t unused; + _bstr_t dbStr = dbCmd.c_str(); + conn->Execute(dbStr, &unused, adExecuteNoRecords); + _bstr_t useStr = useCmd.c_str(); + conn->Execute(useStr, &unused, adExecuteNoRecords); + std::string makeTable = tableCmd + TblQueue + colSpecs; + _bstr_t makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblExchange + colSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblConfig + colSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblMessage + colSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblBinding + bindingSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblMessageMap + messageMapSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); +} + +void +MSSqlProvider::dump() +{ + // dump all db records to qpid_log + QPID_LOG(notice, "DB Dump: (not dumping anything)"); + // rsQueues.dump(); +} + + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp b/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp new file mode 100644 index 0000000000..7072015864 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp @@ -0,0 +1,115 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/Exception.h> +#include <qpid/log/Statement.h> +#include <qpid/store/StorageProvider.h> + +#include "MessageMapRecordset.h" +#include "VariantHelper.h" + +namespace qpid { +namespace store { +namespace ms_sql { + +void +MessageMapRecordset::add(uint64_t messageId, uint64_t queueId) +{ + rs->AddNew(); + rs->Fields->GetItem("messageId")->Value = messageId; + rs->Fields->GetItem("queueId")->Value = queueId; + rs->Update(); +} + +bool +MessageMapRecordset::remove(uint64_t messageId, uint64_t queueId) +{ + // Look up all mappings for the specified message. Then scan + // for the specified queue and keep track of whether or not the + // message exists on any queue we are not looking for a well. + std::ostringstream filter; + filter << "messageId = " << messageId << std::ends; + rs->PutFilter (VariantHelper<std::string>(filter.str())); + MessageMap m; + IADORecordBinding *piAdoRecordBinding; + rs->QueryInterface(__uuidof(IADORecordBinding), + (LPVOID *)&piAdoRecordBinding); + piAdoRecordBinding->BindToRecordset(&m); + bool moreEntries = false, deleted = false; + // If the desired mapping gets deleted, and we already know there are + // other mappings for the message, don't bother finishing the scan. + while (!rs->EndOfFile && !(deleted && moreEntries)) { + if (m.queueId == queueId) { + rs->Delete(adAffectCurrent); + rs->Update(); + deleted = true; + } + else { + moreEntries = true; + } + rs->MoveNext(); + } + piAdoRecordBinding->Release(); + return moreEntries; +} + +void +MessageMapRecordset::recover(MessageQueueMap& msgMap) +{ + if (rs->BOF && rs->EndOfFile) + return; // Nothing to do + rs->MoveFirst(); + MessageMap b; + IADORecordBinding *piAdoRecordBinding; + rs->QueryInterface(__uuidof(IADORecordBinding), + (LPVOID *)&piAdoRecordBinding); + piAdoRecordBinding->BindToRecordset(&b); + while (!rs->EndOfFile) { + msgMap[b.messageId].push_back(b.queueId); + rs->MoveNext(); + } + + piAdoRecordBinding->Release(); +} + +void +MessageMapRecordset::dump() +{ + Recordset::dump(); + if (rs->EndOfFile && rs->BOF) // No records + return; + rs->MoveFirst(); + + MessageMap m; + IADORecordBinding *piAdoRecordBinding; + rs->QueryInterface(__uuidof(IADORecordBinding), + (LPVOID *)&piAdoRecordBinding); + piAdoRecordBinding->BindToRecordset(&m); + + while (!rs->EndOfFile) { + QPID_LOG(notice, "msg " << m.messageId << " on queue " << m.queueId); + rs->MoveNext(); + } + + piAdoRecordBinding->Release(); +} + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h b/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h new file mode 100644 index 0000000000..a2e91abb96 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h @@ -0,0 +1,69 @@ +#ifndef QPID_STORE_MSSQL_MESSAGEMAPRECORDSET_H +#define QPID_STORE_MSSQL_MESSAGEMAPRECORDSET_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <icrsint.h> +#include "Recordset.h" +#include <qpid/broker/RecoveryManager.h> + +namespace qpid { +namespace store { +namespace ms_sql { + +/** + * @class MessageMapRecordset + * + * Class for the message map (message -> queue) records. + */ +class MessageMapRecordset : public Recordset { + + class MessageMap : public CADORecordBinding { + BEGIN_ADO_BINDING(MessageMap) + ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, messageId, FALSE) + ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, queueId, FALSE) + END_ADO_BINDING() + + public: + uint64_t messageId; + uint64_t queueId; + }; + +public: + // Add a new mapping + void add(uint64_t messageId, uint64_t queueId); + + // Remove a specific mapping. Returns true if the message is still + // enqueued on at least one other queue; false if the message no longer + // exists on any other queues. + bool remove(uint64_t messageId, uint64_t queueId); + + // Recover the mappings of message ID -> vector<queue ID>. + void recover(MessageQueueMap& msgMap); + + // Dump table contents; useful for debugging. + void dump(); +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_MESSAGEMAPRECORDSET_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp b/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp new file mode 100644 index 0000000000..a29b97fa8a --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp @@ -0,0 +1,182 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/Exception.h> +#include <qpid/log/Statement.h> + +#include "MessageRecordset.h" +#include "BlobAdapter.h" +#include "BlobEncoder.h" +#include "VariantHelper.h" + +#include <boost/intrusive_ptr.hpp> + +class qpid::broker::PersistableMessage; + +namespace qpid { +namespace store { +namespace ms_sql { + +void +MessageRecordset::add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg) +{ + BlobEncoder blob (msg); // Marshall headers and content to a blob + rs->AddNew(); + rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob); + rs->Update(); +} + +void +MessageRecordset::append(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, + const std::string& data) +{ + // Look up the message by its Id + std::ostringstream filter; + filter << "persistenceId = " << msg->getPersistenceId() << std::ends; + rs->PutFilter (VariantHelper<std::string>(filter.str())); + if (rs->RecordCount == 0) { + throw Exception("Can't append to message not stored in database"); + } + BlobEncoder blob (data); // Marshall string data to a blob + rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob); + rs->Update(); +} + +void +MessageRecordset::remove(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg) +{ + BlobRecordset::remove(msg->getPersistenceId()); +} + +void +MessageRecordset::loadContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, + std::string& data, + uint64_t offset, + uint32_t length) +{ + // Look up the message by its Id + std::ostringstream filter; + filter << "persistenceId = " << msg->getPersistenceId() << std::ends; + rs->PutFilter (VariantHelper<std::string>(filter.str())); + if (rs->RecordCount == 0) { + throw Exception("Can't load message not stored in database"); + } + + // NOTE! If this code needs to change, please verify the encoding + // code in BlobEncoder. + long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize; + uint32_t headerSize; + const size_t headerFieldLength = sizeof(headerSize); + BlobAdapter blob(headerFieldLength); + blob = + rs->Fields->Item["fieldTableBlob"]->GetChunk((long)headerFieldLength); + headerSize = ((qpid::framing::Buffer&)blob).getLong(); + + // GetChunk always begins reading where the previous GetChunk left off, + // so we can't just tell it to ignore the header and read the data. + // So, read the header plus the offset, plus the desired data, then + // copy the desired data to the supplied string. If this ends up asking + // for more than is available in the field, reduce it to what's there. + long getSize = headerSize + offset + length; + if (getSize + (long)headerFieldLength > blobSize) { + size_t reduce = (getSize + headerFieldLength) - blobSize; + getSize -= reduce; + length -= reduce; + } + BlobAdapter header_plus(getSize); + header_plus = rs->Fields->Item["fieldTableBlob"]->GetChunk(getSize); + uint8_t *throw_away = new uint8_t[headerSize + offset]; + ((qpid::framing::Buffer&)header_plus).getRawData(throw_away, headerSize + offset); + delete throw_away; + ((qpid::framing::Buffer&)header_plus).getRawData(data, length); +} + +void +MessageRecordset::recover(qpid::broker::RecoveryManager& recoverer, + std::map<uint64_t, broker::RecoverableMessage::shared_ptr> messageMap) +{ + if (rs->BOF && rs->EndOfFile) + return; // Nothing to do + rs->MoveFirst(); + Binding b; + IADORecordBinding *piAdoRecordBinding; + rs->QueryInterface(__uuidof(IADORecordBinding), + (LPVOID *)&piAdoRecordBinding); + piAdoRecordBinding->BindToRecordset(&b); + while (!rs->EndOfFile) { + // The blob was written as normal, but with the header length + // prepended in a uint32_t. Due to message staging threshold + // limits, the header may be all that's read in; get it first, + // recover that message header, then see if the rest is needed. + // + // NOTE! If this code needs to change, please verify the encoding + // code in BlobEncoder. + long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize; + uint32_t headerSize; + const size_t headerFieldLength = sizeof(headerSize); + BlobAdapter blob(headerFieldLength); + blob = + rs->Fields->Item["fieldTableBlob"]->GetChunk((long)headerFieldLength); + headerSize = ((qpid::framing::Buffer&)blob).getLong(); + BlobAdapter header(headerSize); + header = rs->Fields->Item["fieldTableBlob"]->GetChunk(headerSize); + broker::RecoverableMessage::shared_ptr msg; + msg = recoverer.recoverMessage(header); + msg->setPersistenceId(b.messageId); + messageMap[b.messageId] = msg; + + // Now, do we need the rest of the content? + long contentLength = blobSize - headerFieldLength - headerSize; + if (msg->loadContent(contentLength)) { + BlobAdapter content(contentLength); + content = + rs->Fields->Item["fieldTableBlob"]->GetChunk(contentLength); + msg->decodeContent(content); + } + rs->MoveNext(); + } + + piAdoRecordBinding->Release(); +} + +void +MessageRecordset::dump() +{ + Recordset::dump(); + if (rs->EndOfFile && rs->BOF) // No records + return; + rs->MoveFirst(); + + Binding b; + IADORecordBinding *piAdoRecordBinding; + rs->QueryInterface(__uuidof(IADORecordBinding), + (LPVOID *)&piAdoRecordBinding); + piAdoRecordBinding->BindToRecordset(&b); + + while (VARIANT_FALSE == rs->EndOfFile) { + QPID_LOG(notice, "Msg " << b.messageId); + rs->MoveNext(); + } + + piAdoRecordBinding->Release(); +} + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h b/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h new file mode 100644 index 0000000000..f1835f6f73 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h @@ -0,0 +1,85 @@ +#ifndef QPID_STORE_MSSQL_MESSAGERECORDSET_H +#define QPID_STORE_MSSQL_MESSAGERECORDSET_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <icrsint.h> +#include "BlobRecordset.h" +#include <qpid/broker/PersistableMessage.h> +#include <qpid/broker/RecoveryManager.h> +#include <boost/intrusive_ptr.hpp> + +namespace qpid { +namespace store { +namespace ms_sql { + +/** + * @class MessageRecordset + * + * Class for storing and recovering messages. Messages are primarily blobs + * and handled similarly. However, messages larger than the staging threshold + * are not contained completely in memory; they're left mostly in the store + * and the header is held in memory. So when the message "blob" is saved, + * an additional size-of-the-header field is prepended to the blob. + * On recovery, the size-of-the-header is used to get only what's needed + * until it's determined if the entire message is to be recovered to memory. + */ +class MessageRecordset : public BlobRecordset { + class Binding : public CADORecordBinding { + BEGIN_ADO_BINDING(Binding) + ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, messageId, FALSE) + END_ADO_BINDING() + + public: + uint64_t messageId; + }; + +public: + // Store a message. Store the header size (4 bytes) then the regular + // blob comprising the message. + void add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg); + + // Append additional content to an existing message. + void append(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, + const std::string& data); + + // Remove an existing message + void remove(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg); + + // Load all or part of a stored message. This skips the header parts and + // loads content. + void loadContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, + std::string& data, + uint64_t offset, + uint32_t length); + + // Recover messages and save a map of those recovered. + void recover(qpid::broker::RecoveryManager& recoverer, + std::map<uint64_t, broker::RecoverableMessage::shared_ptr> messageMap); + + // Dump table contents; useful for debugging. + void dump(); +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_MESSAGERECORDSET_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp b/qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp new file mode 100644 index 0000000000..49232ababa --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp @@ -0,0 +1,117 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/Exception.h> +#include <qpid/log/Statement.h> + +#include "Recordset.h" +#include "BlobEncoder.h" +#include "DatabaseConnection.h" +#include "VariantHelper.h" + +namespace { +inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);}; +} + +namespace qpid { +namespace store { +namespace ms_sql { + +#if 0 +Recordset::Iterator::Iterator(Recordset& _rs) : rs(_rs) +{ + rs->MoveFirst(); + setCurrent(); +} + +std::pair<uint64_t, BlobAdapter>& +Recordset::Iterator::dereference() const +{ + return const_cast<std::pair<uint64_t, BlobAdapter> >(current); +} + +void +Recordset::Iterator::increment() +{ + rs->MoveNext(); + setCurrent(); +} + +bool +Recordset::Iterator::equal(const Iterator& x) const +{ + return current.first == x.current.first; +} + +void +Recordset::Iterator::setCurrent() +{ + if (!rs->EndOfFile) { + uint64_t id = rs->Fields->Item["persistenceId"]->Value; + long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize; + BlobAdapter blob(blobSize); + blob = rs->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); + current = std::make_pair(id, blob); + } + else { + current.first = 0; + } +} +#endif + +void +Recordset::open(DatabaseConnection* conn, const std::string& table) +{ + _ConnectionPtr p = *conn; + TESTHR(rs.CreateInstance(__uuidof(::Recordset))); + rs->Open(table.c_str(), + _variant_t((IDispatch *)p, true), + adOpenKeyset, + adLockOptimistic, + adCmdTable); + tableName = table; +} + +void +Recordset::close() +{ + if (rs && rs->State == adStateOpen) + rs->Close(); + rs = 0; +} + +void +Recordset::requery() +{ + // Restore the recordset to reflect all current records. + rs->Filter = ""; + rs->Requery(-1); +} + +void +Recordset::dump() +{ + long count = rs->RecordCount; + QPID_LOG(notice, "DB Dump: " + tableName << + ": " << count << " records"); +} + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/Recordset.h b/qpid/cpp/src/qpid/store/ms-sql/Recordset.h new file mode 100644 index 0000000000..3631838aa8 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/Recordset.h @@ -0,0 +1,101 @@ +#ifndef QPID_STORE_MSSQL_RECORDSET_H +#define QPID_STORE_MSSQL_RECORDSET_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +// Bring in ADO 2.8 (yes, I know it says "15", but that's it...) +#import "C:\Program Files\Common Files\System\ado\msado15.dll" \ + no_namespace rename("EOF", "EndOfFile") +#include <comdef.h> +#include <comutil.h> +#include <string> +#if 0 +#include <utility> +#endif + +namespace qpid { +namespace store { +namespace ms_sql { + +class DatabaseConnection; + +/** + * @class Recordset + * + * Represents an ADO Recordset, abstracting out the common operations needed + * on the common tables used that have 2 fields, persistence ID and blob. + */ +class Recordset { +protected: + _RecordsetPtr rs; + DatabaseConnection* dbConn; + std::string tableName; + +public: + +#if 0 + /** + * Iterator support for walking through the recordset. + * If I need to try this again, I'd look at Recordset cloning. + */ + class Iterator : public boost::iterator_facade< + Iterator, std::pair<uint64_t, BlobAdapter>, boost::random_access_traversal_tag> + { + public: + Iterator() : rs(0) { } + Iterator(Recordset& _rs); + + private: + friend class boost::iterator_core_access; + + std::pair<uint64_t, BlobAdapter>& dereference() const; + void increment(); + bool equal(const Iterator& x) const; + + _RecordsetPtr rs; + std::pair<uint64_t, BlobAdapter> current; + + void setCurrent(); + }; + + friend class Iterator; +#endif + + Recordset() : rs(0) {} + virtual ~Recordset() { close(); } + void open(DatabaseConnection* conn, const std::string& table); + void close(); + void requery(); + operator _RecordsetPtr () { return rs; } +#if 0 + Iterator begin() { Iterator iter(*this); return iter; } + Iterator end() { Iterator iter; return iter; } +#endif + + // Dump table contents; useful for debugging. + void dump(); +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_RECORDSET_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/State.cpp b/qpid/cpp/src/qpid/store/ms-sql/State.cpp new file mode 100644 index 0000000000..720603dd11 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/State.cpp @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "State.h" +#include "DatabaseConnection.h" +#include "Exception.h" +#include <comdef.h> + +namespace qpid { +namespace store { +namespace ms_sql { + +State::State() : dbConn(0) +{ + HRESULT hr = ::CoInitializeEx(NULL, COINIT_MULTITHREADED); + if (hr != S_OK && hr != S_FALSE) + throw Exception("Error initializing COM"); +} + +State::~State() +{ + if (dbConn) + delete dbConn; + ::CoUninitialize(); +} + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/State.h b/qpid/cpp/src/qpid/store/ms-sql/State.h new file mode 100644 index 0000000000..6350bc5bd2 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/State.h @@ -0,0 +1,52 @@ +#ifndef QPID_STORE_MSSQL_STATE_H +#define QPID_STORE_MSSQL_STATE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +namespace qpid { +namespace store { +namespace ms_sql { + +class DatabaseConnection; + +/** + * @struct State + * + * Represents a thread's state for accessing ADO and the database. + * Creating an instance of State initializes COM for this thread, and + * destroying it uninitializes COM. There's also a DatabaseConnection + * for this thread's default access to the database. More DatabaseConnections + * can always be created, but State has one that can always be used by + * the thread whose state is represented. + * + * This class is intended to be one-per-thread, so it should be accessed + * via thread-specific storage. + */ +struct State { + State(); + ~State(); + DatabaseConnection *dbConn; +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_STATE_H */ diff --git a/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp b/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp new file mode 100644 index 0000000000..786724f031 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include "VariantHelper.h" + +namespace qpid { +namespace store { +namespace ms_sql { + +template <class Wrapped> +VariantHelper<Wrapped>::VariantHelper() +{ + var.vt = VT_EMPTY; +} + +template <class Wrapped> +VariantHelper<Wrapped>::operator const _variant_t& () const +{ + return var; +} + +// Specialization for using _variant_t to wrap a std::string +VariantHelper<std::string>::VariantHelper(const std::string &init) +{ + var.SetString(init.c_str()); +} + +VariantHelper<std::string>& +VariantHelper<std::string>::operator=(const std::string &rhs) +{ + var.SetString(rhs.c_str()); + return *this; +} + +VariantHelper<std::string>::operator const _variant_t& () const +{ + return var; +} + +}}} // namespace qpid::store::ms_sql diff --git a/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.h b/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.h new file mode 100644 index 0000000000..723dbc3b76 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.h @@ -0,0 +1,61 @@ +#ifndef QPID_STORE_MSSQL_VARIANTHELPER_H +#define QPID_STORE_MSSQL_VARIANTHELPER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <comutil.h> + +namespace qpid { +namespace store { +namespace ms_sql { + +/** + * @class VariantHelper + * + * Class template to wrap the details of working with _variant_t objects. + */ +template <class Wrapped> class VariantHelper { +private: + _variant_t var; + +public: + VariantHelper(); + VariantHelper(const Wrapped &init); + + VariantHelper& operator =(const Wrapped& rhs); + operator const _variant_t& () const; +}; + +// Specialization for using _variant_t to wrap a std::string +template<> class VariantHelper<std::string> { +private: + _variant_t var; + +public: + VariantHelper(const std::string &init); + VariantHelper& operator =(const std::string& rhs); + operator const _variant_t& () const; +}; + +}}} // namespace qpid::store::ms_sql + +#endif /* QPID_STORE_MSSQL_VARIANTHELPER_H */ |