summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp')
-rw-r--r--qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp1305
1 files changed, 1305 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
new file mode 100644
index 0000000000..7f22db3d02
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
@@ -0,0 +1,1305 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdlib.h>
+#include <string>
+#include <windows.h>
+#include <qpid/broker/RecoverableQueue.h>
+#include <qpid/log/Statement.h>
+#include <qpid/store/MessageStorePlugin.h>
+#include <qpid/store/StorageProvider.h>
+#include "AmqpTransaction.h"
+#include "BlobAdapter.h"
+#include "BlobRecordset.h"
+#include "BindingRecordset.h"
+#include "MessageMapRecordset.h"
+#include "MessageRecordset.h"
+#include "TplRecordset.h"
+#include "DatabaseConnection.h"
+#include "Exception.h"
+#include "State.h"
+#include "VariantHelper.h"
+
+// Bring in ADO 2.8 (yes, I know it says "15", but that's it...)
+#import "C:\Program Files\Common Files\System\ado\msado15.dll" \
+ no_namespace rename("EOF", "EndOfFile")
+#include <comdef.h>
+namespace {
+inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};
+
+// Table names
+const std::string TblBinding("tblBinding");
+const std::string TblConfig("tblConfig");
+const std::string TblExchange("tblExchange");
+const std::string TblMessage("tblMessage");
+const std::string TblMessageMap("tblMessageMap");
+const std::string TblQueue("tblQueue");
+const std::string TblTpl("tblTPL");
+}
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class MSSqlProvider
+ *
+ * Implements a qpid::store::StorageProvider that uses Microsoft SQL Server as
+ * the backend data store for Qpid.
+ */
+class MSSqlProvider : public qpid::store::StorageProvider
+{
+protected:
+ void finalizeMe();
+
+ void dump();
+
+public:
+ MSSqlProvider();
+ ~MSSqlProvider();
+
+ virtual qpid::Options* getOptions() { return &options; }
+
+ virtual void earlyInitialize (Plugin::Target& target);
+ virtual void initialize(Plugin::Target& target);
+
+ /**
+ * Receive notification that this provider is the one that will actively
+ * handle provider storage for the target. If the provider is to be used,
+ * this method will be called after earlyInitialize() and before any
+ * recovery operations (recovery, in turn, precedes call to initialize()).
+ */
+ virtual void activate(MessageStorePlugin &store);
+
+ /**
+ * @name Methods inherited from qpid::broker::MessageStore
+ */
+ //@{
+ /**
+ * If called after init() but before recovery, will discard the database
+ * and reinitialize using an empty store dir. If @a pushDownStoreFiles
+ * is true, the content of the store dir will be moved to a backup dir
+ * inside the store dir. This is used when cluster nodes recover and must
+ * get thier content from a cluster sync rather than directly fromt the
+ * store.
+ *
+ * @param pushDownStoreFiles If true, will move content of the store dir
+ * into a subdir, leaving the store dir
+ * otherwise empty.
+ */
+ virtual void truncateInit(const bool pushDownStoreFiles = false);
+
+ /**
+ * Record the existence of a durable queue
+ */
+ virtual void create(PersistableQueue& queue,
+ const qpid::framing::FieldTable& args);
+ /**
+ * Destroy a durable queue
+ */
+ virtual void destroy(PersistableQueue& queue);
+
+ /**
+ * Record the existence of a durable exchange
+ */
+ virtual void create(const PersistableExchange& exchange,
+ const qpid::framing::FieldTable& args);
+ /**
+ * Destroy a durable exchange
+ */
+ virtual void destroy(const PersistableExchange& exchange);
+
+ /**
+ * Record a binding
+ */
+ virtual void bind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args);
+
+ /**
+ * Forget a binding
+ */
+ virtual void unbind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args);
+
+ /**
+ * Record generic durable configuration
+ */
+ virtual void create(const PersistableConfig& config);
+
+ /**
+ * Destroy generic durable configuration
+ */
+ virtual void destroy(const PersistableConfig& config);
+
+ /**
+ * Stores a messages before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point). If the message has not yet been stored it will
+ * store the headers as well as any content passed in. A
+ * persistence id will be set on the message which can be
+ * used to load the content or to append to it.
+ */
+ virtual void stage(const boost::intrusive_ptr<PersistableMessage>& msg);
+
+ /**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+ virtual void destroy(PersistableMessage& msg);
+
+ /**
+ * Appends content to a previously staged message
+ */
+ virtual void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg,
+ const std::string& data);
+
+ /**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+ virtual void loadContent(const qpid::broker::PersistableQueue& queue,
+ const boost::intrusive_ptr<const PersistableMessage>& msg,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length);
+
+ /**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * Note: that this is async so the return of the function does
+ * not mean the opperation is complete.
+ *
+ * @param msg the message to enqueue
+ * @param queue the name of the queue onto which it is to be enqueued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void enqueue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue);
+
+ /**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * Note: that this is async so the return of the function does
+ * not mean the opperation is complete.
+ *
+ * @param msg the message to dequeue
+ * @param queue the name of the queue from which it is to be dequeued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void dequeue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue);
+
+ /**
+ * Flushes all async messages to disk for the specified queue
+ *
+ * Note: this is a no-op for this provider.
+ *
+ * @param queue the name of the queue from which it is to be dequeued
+ */
+ virtual void flush(const PersistableQueue& queue) {};
+
+ /**
+ * Returns the number of outstanding AIO's for a given queue
+ *
+ * If 0, than all the enqueue / dequeues have been stored
+ * to disk
+ *
+ * @param queue the name of the queue to check for outstanding AIO
+ */
+ virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue)
+ {return 0;}
+ //@}
+
+ /**
+ * @name Methods inherited from qpid::broker::TransactionalStore
+ */
+ //@{
+ virtual std::auto_ptr<qpid::broker::TransactionContext> begin();
+ virtual std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
+ virtual void prepare(qpid::broker::TPCTransactionContext& txn);
+ virtual void commit(qpid::broker::TransactionContext& txn);
+ virtual void abort(qpid::broker::TransactionContext& txn);
+ virtual void collectPreparedXids(std::set<std::string>& xids);
+ //@}
+
+ virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer);
+ virtual void recoverExchanges(qpid::broker::RecoveryManager& recoverer,
+ ExchangeMap& exchangeMap);
+ virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer,
+ QueueMap& queueMap);
+ virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer,
+ const ExchangeMap& exchangeMap,
+ const QueueMap& queueMap);
+ virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer,
+ MessageMap& messageMap,
+ MessageQueueMap& messageQueueMap);
+ virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer,
+ PreparedTransactionMap& dtxMap);
+
+private:
+ struct ProviderOptions : public qpid::Options
+ {
+ std::string connectString;
+ std::string catalogName;
+
+ ProviderOptions(const std::string &name)
+ : qpid::Options(name),
+ catalogName("QpidStore")
+ {
+ const enum { NAMELEN = MAX_COMPUTERNAME_LENGTH + 1 };
+ TCHAR myName[NAMELEN];
+ DWORD myNameLen = NAMELEN;
+ GetComputerName(myName, &myNameLen);
+ connectString = "Data Source=";
+ connectString += myName;
+ connectString += "\\SQLEXPRESS;Integrated Security=SSPI";
+ addOptions()
+ ("connect",
+ qpid::optValue(connectString, "STRING"),
+ "Connection string for the database to use. Will prepend "
+ "Provider=SQLOLEDB;")
+ ("catalog",
+ qpid::optValue(catalogName, "DB NAME"),
+ "Catalog (database) name")
+ ;
+ }
+ };
+ ProviderOptions options;
+
+ // Each thread has a separate connection to the database and also needs
+ // to manage its COM initialize/finalize individually. This is done by
+ // keeping a thread-specific State.
+ boost::thread_specific_ptr<State> dbState;
+
+ State *initState();
+ DatabaseConnection *initConnection(void);
+ void createDb(DatabaseConnection *db, const std::string &name);
+};
+
+static MSSqlProvider static_instance_registers_plugin;
+
+void
+MSSqlProvider::finalizeMe()
+{
+ dbState.reset();
+}
+
+MSSqlProvider::MSSqlProvider()
+ : options("MS SQL Provider options")
+{
+}
+
+MSSqlProvider::~MSSqlProvider()
+{
+}
+
+void
+MSSqlProvider::earlyInitialize(Plugin::Target &target)
+{
+ MessageStorePlugin *store = dynamic_cast<MessageStorePlugin *>(&target);
+ if (store) {
+ // If the database init fails, report it and don't register; give
+ // the rest of the broker a chance to run.
+ //
+ // Don't try to initConnection() since that will fail if the
+ // database doesn't exist. Instead, try to open a connection without
+ // a database name, then search for the database. There's still a
+ // chance this provider won't be selected for the store too, so be
+ // be sure to close the database connection before return to avoid
+ // leaving a connection up that will not be used.
+ try {
+ initState(); // This initializes COM
+ std::auto_ptr<DatabaseConnection> db(new DatabaseConnection());
+ db->open(options.connectString, "");
+ _ConnectionPtr conn(*db);
+ _RecordsetPtr pCatalogs = NULL;
+ VariantHelper<std::string> catalogName(options.catalogName);
+ pCatalogs = conn->OpenSchema(adSchemaCatalogs, catalogName);
+ if (pCatalogs->EndOfFile) {
+ // Database doesn't exist; create it
+ QPID_LOG(notice,
+ "MSSQL: Creating database " + options.catalogName);
+ createDb(db.get(), options.catalogName);
+ }
+ else {
+ QPID_LOG(notice,
+ "MSSQL: Database located: " + options.catalogName);
+ }
+ if (pCatalogs) {
+ if (pCatalogs->State == adStateOpen)
+ pCatalogs->Close();
+ pCatalogs = 0;
+ }
+ db->close();
+ store->providerAvailable("MSSQL", this);
+ }
+ catch (qpid::Exception &e) {
+ QPID_LOG(error, e.what());
+ return;
+ }
+ store->addFinalizer(boost::bind(&MSSqlProvider::finalizeMe, this));
+ }
+}
+
+void
+MSSqlProvider::initialize(Plugin::Target& target)
+{
+}
+
+void
+MSSqlProvider::activate(MessageStorePlugin &store)
+{
+ QPID_LOG(info, "MS SQL Provider is up");
+}
+
+void
+MSSqlProvider::truncateInit(const bool pushDownStoreFiles)
+{
+}
+
+void
+MSSqlProvider::create(PersistableQueue& queue,
+ const qpid::framing::FieldTable& /*args needed for jrnl*/)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsQueues;
+ try {
+ db->beginTransaction();
+ rsQueues.open(db, TblQueue);
+ rsQueues.add(queue);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error creating queue " + queue.getName(), e, errs);
+ }
+}
+
+/**
+ * Destroy a durable queue
+ */
+void
+MSSqlProvider::destroy(PersistableQueue& queue)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsQueues;
+ BindingRecordset rsBindings;
+ MessageRecordset rsMessages;
+ MessageMapRecordset rsMessageMaps;
+ try {
+ db->beginTransaction();
+ rsQueues.open(db, TblQueue);
+ rsBindings.open(db, TblBinding);
+ rsMessages.open(db, TblMessage);
+ rsMessageMaps.open(db, TblMessageMap);
+ // Remove bindings first; the queue IDs can't be ripped out from
+ // under the references in the bindings table. Then remove the
+ // message->queue entries for the queue, also because the queue can't
+ // be deleted while there are references to it. If there are messages
+ // orphaned by removing the queue references, they're deleted by
+ // a trigger on the tblMessageMap table. Lastly, the queue record
+ // can be removed.
+ rsBindings.removeForQueue(queue.getPersistenceId());
+ rsMessageMaps.removeForQueue(queue.getPersistenceId());
+ rsQueues.remove(queue);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error deleting queue " + queue.getName(), e, errs);
+ }
+}
+
+/**
+ * Record the existence of a durable exchange
+ */
+void
+MSSqlProvider::create(const PersistableExchange& exchange,
+ const qpid::framing::FieldTable& args)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsExchanges;
+ try {
+ db->beginTransaction();
+ rsExchanges.open(db, TblExchange);
+ rsExchanges.add(exchange);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error creating exchange " + exchange.getName(),
+ e,
+ errs);
+ }
+}
+
+/**
+ * Destroy a durable exchange
+ */
+void
+MSSqlProvider::destroy(const PersistableExchange& exchange)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsExchanges;
+ BindingRecordset rsBindings;
+ try {
+ db->beginTransaction();
+ rsExchanges.open(db, TblExchange);
+ rsBindings.open(db, TblBinding);
+ // Remove bindings first; the exchange IDs can't be ripped out from
+ // under the references in the bindings table.
+ rsBindings.removeForExchange(exchange.getPersistenceId());
+ rsExchanges.remove(exchange);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error deleting exchange " + exchange.getName(),
+ e,
+ errs);
+ }
+}
+
+/**
+ * Record a binding
+ */
+void
+MSSqlProvider::bind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args)
+{
+ DatabaseConnection *db = initConnection();
+ BindingRecordset rsBindings;
+ try {
+ db->beginTransaction();
+ rsBindings.open(db, TblBinding);
+ rsBindings.add(exchange.getPersistenceId(),
+ queue.getPersistenceId(),
+ key,
+ args);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error binding exchange " + exchange.getName() +
+ " to queue " + queue.getName(),
+ e,
+ errs);
+ }
+}
+
+/**
+ * Forget a binding
+ */
+void
+MSSqlProvider::unbind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args)
+{
+ DatabaseConnection *db = initConnection();
+ BindingRecordset rsBindings;
+ try {
+ db->beginTransaction();
+ rsBindings.open(db, TblBinding);
+ rsBindings.remove(exchange.getPersistenceId(),
+ queue.getPersistenceId(),
+ key,
+ args);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error unbinding exchange " + exchange.getName() +
+ " from queue " + queue.getName(),
+ e,
+ errs);
+ }
+}
+
+/**
+ * Record generic durable configuration
+ */
+void
+MSSqlProvider::create(const PersistableConfig& config)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsConfigs;
+ try {
+ db->beginTransaction();
+ rsConfigs.open(db, TblConfig);
+ rsConfigs.add(config);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error creating config " + config.getName(), e, errs);
+ }
+}
+
+/**
+ * Destroy generic durable configuration
+ */
+void
+MSSqlProvider::destroy(const PersistableConfig& config)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsConfigs;
+ try {
+ db->beginTransaction();
+ rsConfigs.open(db, TblConfig);
+ rsConfigs.remove(config);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error deleting config " + config.getName(), e, errs);
+ }
+}
+
+/**
+ * Stores a messages before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point). If the message has not yet been stored it will
+ * store the headers as well as any content passed in. A
+ * persistence id will be set on the message which can be
+ * used to load the content or to append to it.
+ */
+void
+MSSqlProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg)
+{
+ DatabaseConnection *db = initConnection();
+ MessageRecordset rsMessages;
+ try {
+ db->beginTransaction();
+ rsMessages.open(db, TblMessage);
+ rsMessages.add(msg);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error staging message", e, errs);
+ }
+}
+
+/**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+void
+MSSqlProvider::destroy(PersistableMessage& msg)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsMessages;
+ try {
+ db->beginTransaction();
+ rsMessages.open(db, TblMessage);
+ rsMessages.remove(msg);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error deleting message", e, errs);
+ }
+}
+
+/**
+ * Appends content to a previously staged message
+ */
+void
+MSSqlProvider::appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg,
+ const std::string& data)
+{
+ DatabaseConnection *db = initConnection();
+ MessageRecordset rsMessages;
+ try {
+ db->beginTransaction();
+ rsMessages.open(db, TblMessage);
+ rsMessages.append(msg, data);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error appending to message", e, errs);
+ }
+}
+
+/**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+void
+MSSqlProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/,
+ const boost::intrusive_ptr<const PersistableMessage>& msg,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length)
+{
+ // SQL store keeps all messages in one table, so we don't need the
+ // queue reference.
+ DatabaseConnection *db = initConnection();
+ MessageRecordset rsMessages;
+ try {
+ rsMessages.open(db, TblMessage);
+ rsMessages.loadContent(msg, data, offset, length);
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ throw ADOException("Error loading message content", e, errs);
+ }
+}
+
+/**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * @param ctxt The transaction context under which this enqueue happens.
+ * @param msg The message to enqueue
+ * @param queue the name of the queue onto which it is to be enqueued
+ */
+void
+MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue)
+{
+ // If this enqueue is in the context of a transaction, use the specified
+ // transaction to nest a new transaction for this operation. However, if
+ // this is not in the context of a transaction, then just use the thread's
+ // DatabaseConnection with a ADO transaction.
+ DatabaseConnection *db = 0;
+ std::string xid;
+ AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt);
+ if (atxn == 0) {
+ db = initConnection();
+ db->beginTransaction();
+ }
+ else {
+ (void)initState(); // Ensure this thread is initialized
+ // It's a transactional enqueue; if it's TPC, grab the xid.
+ AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt);
+ if (tpcTxn)
+ xid = tpcTxn->getXid();
+ db = atxn->dbConn();
+ try {
+ atxn->sqlBegin();
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error queuing message", e, db->getErrors());
+ }
+ }
+
+ MessageRecordset rsMessages;
+ MessageMapRecordset rsMap;
+ try {
+ if (msg->getPersistenceId() == 0) { // Message itself not yet saved
+ rsMessages.open(db, TblMessage);
+ rsMessages.add(msg);
+ }
+ rsMap.open(db, TblMessageMap);
+ rsMap.add(msg->getPersistenceId(), queue.getPersistenceId(), xid);
+ if (atxn)
+ atxn->sqlCommit();
+ else
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ if (atxn)
+ atxn->sqlAbort();
+ else
+ db->rollbackTransaction();
+ throw ADOException("Error queuing message", e, errs);
+ }
+ msg->enqueueComplete();
+}
+
+/**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * @param ctxt The transaction context under which this dequeue happens.
+ * @param msg The message to dequeue
+ * @param queue The queue from which it is to be dequeued
+ */
+void
+MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue)
+{
+ // If this dequeue is in the context of a transaction, use the specified
+ // transaction to nest a new transaction for this operation. However, if
+ // this is not in the context of a transaction, then just use the thread's
+ // DatabaseConnection with a ADO transaction.
+ DatabaseConnection *db = 0;
+ std::string xid;
+ AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt);
+ if (atxn == 0) {
+ db = initConnection();
+ db->beginTransaction();
+ }
+ else {
+ (void)initState(); // Ensure this thread is initialized
+ // It's a transactional dequeue; if it's TPC, grab the xid.
+ AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt);
+ if (tpcTxn)
+ xid = tpcTxn->getXid();
+ db = atxn->dbConn();
+ try {
+ atxn->sqlBegin();
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error queuing message", e, db->getErrors());
+ }
+ }
+
+ MessageMapRecordset rsMap;
+ try {
+ rsMap.open(db, TblMessageMap);
+ // TPC dequeues are just marked pending and will actually be removed
+ // when the transaction commits; Single-phase dequeues are removed
+ // now, relying on the SQL transaction to put it back if the
+ // transaction doesn't commit.
+ if (!xid.empty()) {
+ rsMap.pendingRemove(msg->getPersistenceId(),
+ queue.getPersistenceId(),
+ xid);
+ }
+ else {
+ rsMap.remove(msg->getPersistenceId(),
+ queue.getPersistenceId());
+ }
+ if (atxn)
+ atxn->sqlCommit();
+ else
+ db->commitTransaction();
+ }
+ catch(ms_sql::Exception&) {
+ if (atxn)
+ atxn->sqlAbort();
+ else
+ db->rollbackTransaction();
+ throw;
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ if (atxn)
+ atxn->sqlAbort();
+ else
+ db->rollbackTransaction();
+ throw ADOException("Error dequeuing message", e, errs);
+ }
+ msg->dequeueComplete();
+}
+
+std::auto_ptr<qpid::broker::TransactionContext>
+MSSqlProvider::begin()
+{
+ (void)initState(); // Ensure this thread is initialized
+
+ // Transactions are associated with the Connection, so this transaction
+ // context needs its own connection. At the time of writing, single-phase
+ // transactions are dealt with completely on one thread, so we really
+ // could just use the thread-specific DatabaseConnection for this.
+ // However, that would introduce an ugly, hidden coupling, so play
+ // it safe and handle this just like a TPC transaction, which actually
+ // can be prepared and committed/aborted from different threads,
+ // making it a bad idea to try using the thread-local DatabaseConnection.
+ boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection);
+ db->open(options.connectString, options.catalogName);
+ std::auto_ptr<AmqpTransaction> tx(new AmqpTransaction(db));
+ tx->sqlBegin();
+ std::auto_ptr<qpid::broker::TransactionContext> tc(tx);
+ return tc;
+}
+
+std::auto_ptr<qpid::broker::TPCTransactionContext>
+MSSqlProvider::begin(const std::string& xid)
+{
+ (void)initState(); // Ensure this thread is initialized
+ boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection);
+ db->open(options.connectString, options.catalogName);
+ std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(db, xid));
+ tx->sqlBegin();
+
+ TplRecordset rsTpl;
+ try {
+ tx->sqlBegin();
+ rsTpl.open(db.get(), TblTpl);
+ rsTpl.add(xid);
+ tx->sqlCommit();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ tx->sqlAbort();
+ throw ADOException("Error adding TPL record", e, errs);
+ }
+
+ std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx);
+ return tc;
+}
+
+void
+MSSqlProvider::prepare(qpid::broker::TPCTransactionContext& txn)
+{
+ // Commit all the marked-up enqueue/dequeue ops and the TPL record.
+ // On commit/rollback the TPL will be removed and the TPL markups
+ // on the message map will be cleaned up as well.
+ (void)initState(); // Ensure this thread is initialized
+ AmqpTPCTransaction *atxn = dynamic_cast<AmqpTPCTransaction*> (&txn);
+ if (atxn == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ try {
+ atxn->sqlCommit();
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error preparing", e, atxn->dbConn()->getErrors());
+ }
+ atxn->setPrepared();
+}
+
+void
+MSSqlProvider::commit(qpid::broker::TransactionContext& txn)
+{
+ (void)initState(); // Ensure this thread is initialized
+ /*
+ * One-phase transactions simply commit the outer SQL transaction
+ * that was begun on begin(). Two-phase transactions are different -
+ * the SQL transaction started on begin() was committed on prepare()
+ * so all the SQL records reflecting the enqueue/dequeue actions for
+ * the transaction are recorded but with xid markups on them to reflect
+ * that they are prepared but not committed. Now go back and remove
+ * the markups, deleting those marked for removal.
+ */
+ AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn);
+ if (p2txn == 0) {
+ AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn);
+ if (p1txn == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ p1txn->sqlCommit();
+ return;
+ }
+
+ DatabaseConnection *db(p2txn->dbConn());
+ TplRecordset rsTpl;
+ MessageMapRecordset rsMessageMap;
+ try {
+ db->beginTransaction();
+ rsTpl.open(db, TblTpl);
+ rsMessageMap.open(db, TblMessageMap);
+ rsMessageMap.commitPrepared(p2txn->getXid());
+ rsTpl.remove(p2txn->getXid());
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error committing transaction", e, errs);
+ }
+}
+
+void
+MSSqlProvider::abort(qpid::broker::TransactionContext& txn)
+{
+ (void)initState(); // Ensure this thread is initialized
+ /*
+ * One-phase and non-prepared two-phase transactions simply abort
+ * the outer SQL transaction that was begun on begin(). However, prepared
+ * two-phase transactions are different - the SQL transaction started
+ * on begin() was committed on prepare() so all the SQL records
+ * reflecting the enqueue/dequeue actions for the transaction are
+ * recorded but with xid markups on them to reflect that they are
+ * prepared but not committed. Now go back and remove the markups,
+ * deleting those marked for addition.
+ */
+ AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn);
+ if (p2txn == 0 || !p2txn->isPrepared()) {
+ AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn);
+ if (p1txn == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ p1txn->sqlAbort();
+ return;
+ }
+
+ DatabaseConnection *db(p2txn->dbConn());
+ TplRecordset rsTpl;
+ MessageMapRecordset rsMessageMap;
+ try {
+ db->beginTransaction();
+ rsTpl.open(db, TblTpl);
+ rsMessageMap.open(db, TblMessageMap);
+ rsMessageMap.abortPrepared(p2txn->getXid());
+ rsTpl.remove(p2txn->getXid());
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error committing transaction", e, errs);
+ }
+
+
+ (void)initState(); // Ensure this thread is initialized
+ AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn);
+ if (atxn == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ atxn->sqlAbort();
+}
+
+void
+MSSqlProvider::collectPreparedXids(std::set<std::string>& xids)
+{
+ DatabaseConnection *db = initConnection();
+ try {
+ TplRecordset rsTpl;
+ rsTpl.open(db, TblTpl);
+ rsTpl.recover(xids);
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error reading TPL", e, db->getErrors());
+ }
+}
+
+// @TODO Much of this recovery code is way too similar... refactor to
+// a recover template method on BlobRecordset.
+
+void
+MSSqlProvider::recoverConfigs(qpid::broker::RecoveryManager& recoverer)
+{
+ DatabaseConnection *db = 0;
+ try {
+ db = initConnection();
+ BlobRecordset rsConfigs;
+ rsConfigs.open(db, TblConfig);
+ _RecordsetPtr p = (_RecordsetPtr)rsConfigs;
+ if (p->BOF && p->EndOfFile)
+ return; // Nothing to do
+ p->MoveFirst();
+ while (!p->EndOfFile) {
+ uint64_t id = p->Fields->Item["persistenceId"]->Value;
+ long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize;
+ BlobAdapter blob(blobSize);
+ blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+ // Recreate the Config instance and reset its ID.
+ broker::RecoverableConfig::shared_ptr config =
+ recoverer.recoverConfig(blob);
+ config->setPersistenceId(id);
+ p->MoveNext();
+ }
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error recovering configs",
+ e,
+ db ? db->getErrors() : "");
+ }
+}
+
+void
+MSSqlProvider::recoverExchanges(qpid::broker::RecoveryManager& recoverer,
+ ExchangeMap& exchangeMap)
+{
+ DatabaseConnection *db = 0;
+ try {
+ db = initConnection();
+ BlobRecordset rsExchanges;
+ rsExchanges.open(db, TblExchange);
+ _RecordsetPtr p = (_RecordsetPtr)rsExchanges;
+ if (p->BOF && p->EndOfFile)
+ return; // Nothing to do
+ p->MoveFirst();
+ while (!p->EndOfFile) {
+ uint64_t id = p->Fields->Item["persistenceId"]->Value;
+ long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize;
+ BlobAdapter blob(blobSize);
+ blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+ // Recreate the Exchange instance, reset its ID, and remember the
+ // ones restored for matching up when recovering bindings.
+ broker::RecoverableExchange::shared_ptr exchange =
+ recoverer.recoverExchange(blob);
+ exchange->setPersistenceId(id);
+ exchangeMap[id] = exchange;
+ p->MoveNext();
+ }
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error recovering exchanges",
+ e,
+ db ? db->getErrors() : "");
+ }
+}
+
+void
+MSSqlProvider::recoverQueues(qpid::broker::RecoveryManager& recoverer,
+ QueueMap& queueMap)
+{
+ DatabaseConnection *db = 0;
+ try {
+ db = initConnection();
+ BlobRecordset rsQueues;
+ rsQueues.open(db, TblQueue);
+ _RecordsetPtr p = (_RecordsetPtr)rsQueues;
+ if (p->BOF && p->EndOfFile)
+ return; // Nothing to do
+ p->MoveFirst();
+ while (!p->EndOfFile) {
+ uint64_t id = p->Fields->Item["persistenceId"]->Value;
+ long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize;
+ BlobAdapter blob(blobSize);
+ blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+ // Recreate the Queue instance and reset its ID.
+ broker::RecoverableQueue::shared_ptr queue =
+ recoverer.recoverQueue(blob);
+ queue->setPersistenceId(id);
+ queueMap[id] = queue;
+ p->MoveNext();
+ }
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error recovering queues",
+ e,
+ db ? db->getErrors() : "");
+ }
+}
+
+void
+MSSqlProvider::recoverBindings(qpid::broker::RecoveryManager& recoverer,
+ const ExchangeMap& exchangeMap,
+ const QueueMap& queueMap)
+{
+ DatabaseConnection *db = 0;
+ try {
+ db = initConnection();
+ BindingRecordset rsBindings;
+ rsBindings.open(db, TblBinding);
+ rsBindings.recover(recoverer, exchangeMap, queueMap);
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error recovering bindings",
+ e,
+ db ? db->getErrors() : "");
+ }
+}
+
+void
+MSSqlProvider::recoverMessages(qpid::broker::RecoveryManager& recoverer,
+ MessageMap& messageMap,
+ MessageQueueMap& messageQueueMap)
+{
+ DatabaseConnection *db = 0;
+ try {
+ db = initConnection();
+ MessageRecordset rsMessages;
+ rsMessages.open(db, TblMessage);
+ rsMessages.recover(recoverer, messageMap);
+
+ MessageMapRecordset rsMessageMaps;
+ rsMessageMaps.open(db, TblMessageMap);
+ rsMessageMaps.recover(messageQueueMap);
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error recovering messages",
+ e,
+ db ? db->getErrors() : "");
+ }
+}
+
+void
+MSSqlProvider::recoverTransactions(qpid::broker::RecoveryManager& recoverer,
+ PreparedTransactionMap& dtxMap)
+{
+ DatabaseConnection *db = initConnection();
+ std::set<std::string> xids;
+ try {
+ TplRecordset rsTpl;
+ rsTpl.open(db, TblTpl);
+ rsTpl.recover(xids);
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error recovering TPL records", e, db->getErrors());
+ }
+
+ try {
+ // Rebuild the needed RecoverableTransactions.
+ for (std::set<std::string>::const_iterator iXid = xids.begin();
+ iXid != xids.end();
+ ++iXid) {
+ boost::shared_ptr<DatabaseConnection> dbX(new DatabaseConnection);
+ dbX->open(options.connectString, options.catalogName);
+ std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(dbX,
+ *iXid));
+ tx->setPrepared();
+ std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx);
+ dtxMap[*iXid] = recoverer.recoverTransaction(*iXid, tc);
+ }
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error recreating dtx connection", e);
+ }
+}
+
+////////////// Internal Methods
+
+State *
+MSSqlProvider::initState()
+{
+ State *state = dbState.get(); // See if thread has initialized
+ if (!state) {
+ state = new State;
+ dbState.reset(state);
+ }
+ return state;
+}
+
+DatabaseConnection *
+MSSqlProvider::initConnection(void)
+{
+ State *state = initState();
+ if (state->dbConn != 0)
+ return state->dbConn; // And the DatabaseConnection is set up too
+ std::auto_ptr<DatabaseConnection> db(new DatabaseConnection);
+ db->open(options.connectString, options.catalogName);
+ state->dbConn = db.release();
+ return state->dbConn;
+}
+
+void
+MSSqlProvider::createDb(DatabaseConnection *db, const std::string &name)
+{
+ const std::string dbCmd = "CREATE DATABASE " + name;
+ const std::string useCmd = "USE " + name;
+ const std::string tableCmd = "CREATE TABLE ";
+ const std::string colSpecs =
+ " (persistenceId bigint PRIMARY KEY NOT NULL IDENTITY(1,1),"
+ " fieldTableBlob varbinary(MAX) NOT NULL)";
+ const std::string bindingSpecs =
+ " (exchangeId bigint REFERENCES tblExchange(persistenceId) NOT NULL,"
+ " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL,"
+ " routingKey varchar(255),"
+ " fieldTableBlob varbinary(MAX))";
+ const std::string messageMapSpecs =
+ " (messageId bigint REFERENCES tblMessage(persistenceId) NOT NULL,"
+ " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL,"
+ " prepareStatus tinyint CHECK (prepareStatus IS NULL OR "
+ " prepareStatus IN (1, 2)),"
+ " xid varbinary(512) REFERENCES tblTPL(xid)"
+ " CONSTRAINT CK_NoDups UNIQUE NONCLUSTERED (messageId, queueId) )";
+ const std::string tplSpecs = " (xid varbinary(512) PRIMARY KEY NOT NULL)";
+ // SET NOCOUNT ON added to prevent extra result sets from
+ // interfering with SELECT statements. (Added by SQL Management)
+ const std::string removeUnrefMsgsTrigger =
+ "CREATE TRIGGER dbo.RemoveUnreferencedMessages "
+ "ON tblMessageMap AFTER DELETE AS BEGIN "
+ "SET NOCOUNT ON; "
+ "DELETE FROM tblMessage "
+ "WHERE tblMessage.persistenceId IN "
+ " (SELECT messageId FROM deleted) AND"
+ " NOT EXISTS(SELECT * FROM tblMessageMap"
+ " WHERE tblMessageMap.messageId IN"
+ " (SELECT messageId FROM deleted)) "
+ "END";
+
+ _variant_t unused;
+ _bstr_t dbStr = dbCmd.c_str();
+ _ConnectionPtr conn(*db);
+ try {
+ conn->Execute(dbStr, &unused, adExecuteNoRecords);
+ _bstr_t useStr = useCmd.c_str();
+ conn->Execute(useStr, &unused, adExecuteNoRecords);
+ std::string makeTable = tableCmd + TblQueue + colSpecs;
+ _bstr_t makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblExchange + colSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblConfig + colSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblMessage + colSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblBinding + bindingSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblTpl + tplSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblMessageMap + messageMapSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ _bstr_t addTriggerStr = removeUnrefMsgsTrigger.c_str();
+ conn->Execute(addTriggerStr, &unused, adExecuteNoRecords);
+ }
+ catch(_com_error &e) {
+ throw ADOException("MSSQL can't create " + name, e, db->getErrors());
+ }
+}
+
+void
+MSSqlProvider::dump()
+{
+ // dump all db records to qpid_log
+ QPID_LOG(notice, "DB Dump: (not dumping anything)");
+ // rsQueues.dump();
+}
+
+
+}}} // namespace qpid::store::ms_sql