diff options
Diffstat (limited to 'qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp | 1305 |
1 files changed, 1305 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp new file mode 100644 index 0000000000..7f22db3d02 --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp @@ -0,0 +1,1305 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <stdlib.h> +#include <string> +#include <windows.h> +#include <qpid/broker/RecoverableQueue.h> +#include <qpid/log/Statement.h> +#include <qpid/store/MessageStorePlugin.h> +#include <qpid/store/StorageProvider.h> +#include "AmqpTransaction.h" +#include "BlobAdapter.h" +#include "BlobRecordset.h" +#include "BindingRecordset.h" +#include "MessageMapRecordset.h" +#include "MessageRecordset.h" +#include "TplRecordset.h" +#include "DatabaseConnection.h" +#include "Exception.h" +#include "State.h" +#include "VariantHelper.h" + +// Bring in ADO 2.8 (yes, I know it says "15", but that's it...) +#import "C:\Program Files\Common Files\System\ado\msado15.dll" \ + no_namespace rename("EOF", "EndOfFile") +#include <comdef.h> +namespace { +inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);}; + +// Table names +const std::string TblBinding("tblBinding"); +const std::string TblConfig("tblConfig"); +const std::string TblExchange("tblExchange"); +const std::string TblMessage("tblMessage"); +const std::string TblMessageMap("tblMessageMap"); +const std::string TblQueue("tblQueue"); +const std::string TblTpl("tblTPL"); +} + +namespace qpid { +namespace store { +namespace ms_sql { + +/** + * @class MSSqlProvider + * + * Implements a qpid::store::StorageProvider that uses Microsoft SQL Server as + * the backend data store for Qpid. + */ +class MSSqlProvider : public qpid::store::StorageProvider +{ +protected: + void finalizeMe(); + + void dump(); + +public: + MSSqlProvider(); + ~MSSqlProvider(); + + virtual qpid::Options* getOptions() { return &options; } + + virtual void earlyInitialize (Plugin::Target& target); + virtual void initialize(Plugin::Target& target); + + /** + * Receive notification that this provider is the one that will actively + * handle provider storage for the target. If the provider is to be used, + * this method will be called after earlyInitialize() and before any + * recovery operations (recovery, in turn, precedes call to initialize()). + */ + virtual void activate(MessageStorePlugin &store); + + /** + * @name Methods inherited from qpid::broker::MessageStore + */ + //@{ + /** + * If called after init() but before recovery, will discard the database + * and reinitialize using an empty store dir. If @a pushDownStoreFiles + * is true, the content of the store dir will be moved to a backup dir + * inside the store dir. This is used when cluster nodes recover and must + * get thier content from a cluster sync rather than directly fromt the + * store. + * + * @param pushDownStoreFiles If true, will move content of the store dir + * into a subdir, leaving the store dir + * otherwise empty. + */ + virtual void truncateInit(const bool pushDownStoreFiles = false); + + /** + * Record the existence of a durable queue + */ + virtual void create(PersistableQueue& queue, + const qpid::framing::FieldTable& args); + /** + * Destroy a durable queue + */ + virtual void destroy(PersistableQueue& queue); + + /** + * Record the existence of a durable exchange + */ + virtual void create(const PersistableExchange& exchange, + const qpid::framing::FieldTable& args); + /** + * Destroy a durable exchange + */ + virtual void destroy(const PersistableExchange& exchange); + + /** + * Record a binding + */ + virtual void bind(const PersistableExchange& exchange, + const PersistableQueue& queue, + const std::string& key, + const qpid::framing::FieldTable& args); + + /** + * Forget a binding + */ + virtual void unbind(const PersistableExchange& exchange, + const PersistableQueue& queue, + const std::string& key, + const qpid::framing::FieldTable& args); + + /** + * Record generic durable configuration + */ + virtual void create(const PersistableConfig& config); + + /** + * Destroy generic durable configuration + */ + virtual void destroy(const PersistableConfig& config); + + /** + * Stores a messages before it has been enqueued + * (enqueueing automatically stores the message so this is + * only required if storage is required prior to that + * point). If the message has not yet been stored it will + * store the headers as well as any content passed in. A + * persistence id will be set on the message which can be + * used to load the content or to append to it. + */ + virtual void stage(const boost::intrusive_ptr<PersistableMessage>& msg); + + /** + * Destroys a previously staged message. This only needs + * to be called if the message is never enqueued. (Once + * enqueued, deletion will be automatic when the message + * is dequeued from all queues it was enqueued onto). + */ + virtual void destroy(PersistableMessage& msg); + + /** + * Appends content to a previously staged message + */ + virtual void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg, + const std::string& data); + + /** + * Loads (a section) of content data for the specified + * message (previously stored through a call to stage or + * enqueue) into data. The offset refers to the content + * only (i.e. an offset of 0 implies that the start of the + * content should be loaded, not the headers or related + * meta-data). + */ + virtual void loadContent(const qpid::broker::PersistableQueue& queue, + const boost::intrusive_ptr<const PersistableMessage>& msg, + std::string& data, + uint64_t offset, + uint32_t length); + + /** + * Enqueues a message, storing the message if it has not + * been previously stored and recording that the given + * message is on the given queue. + * + * Note: that this is async so the return of the function does + * not mean the opperation is complete. + * + * @param msg the message to enqueue + * @param queue the name of the queue onto which it is to be enqueued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void enqueue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue); + + /** + * Dequeues a message, recording that the given message is + * no longer on the given queue and deleting the message + * if it is no longer on any other queue. + * + * Note: that this is async so the return of the function does + * not mean the opperation is complete. + * + * @param msg the message to dequeue + * @param queue the name of the queue from which it is to be dequeued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void dequeue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue); + + /** + * Flushes all async messages to disk for the specified queue + * + * Note: this is a no-op for this provider. + * + * @param queue the name of the queue from which it is to be dequeued + */ + virtual void flush(const PersistableQueue& queue) {}; + + /** + * Returns the number of outstanding AIO's for a given queue + * + * If 0, than all the enqueue / dequeues have been stored + * to disk + * + * @param queue the name of the queue to check for outstanding AIO + */ + virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue) + {return 0;} + //@} + + /** + * @name Methods inherited from qpid::broker::TransactionalStore + */ + //@{ + virtual std::auto_ptr<qpid::broker::TransactionContext> begin(); + virtual std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid); + virtual void prepare(qpid::broker::TPCTransactionContext& txn); + virtual void commit(qpid::broker::TransactionContext& txn); + virtual void abort(qpid::broker::TransactionContext& txn); + virtual void collectPreparedXids(std::set<std::string>& xids); + //@} + + virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer); + virtual void recoverExchanges(qpid::broker::RecoveryManager& recoverer, + ExchangeMap& exchangeMap); + virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer, + QueueMap& queueMap); + virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer, + const ExchangeMap& exchangeMap, + const QueueMap& queueMap); + virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer, + MessageMap& messageMap, + MessageQueueMap& messageQueueMap); + virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer, + PreparedTransactionMap& dtxMap); + +private: + struct ProviderOptions : public qpid::Options + { + std::string connectString; + std::string catalogName; + + ProviderOptions(const std::string &name) + : qpid::Options(name), + catalogName("QpidStore") + { + const enum { NAMELEN = MAX_COMPUTERNAME_LENGTH + 1 }; + TCHAR myName[NAMELEN]; + DWORD myNameLen = NAMELEN; + GetComputerName(myName, &myNameLen); + connectString = "Data Source="; + connectString += myName; + connectString += "\\SQLEXPRESS;Integrated Security=SSPI"; + addOptions() + ("connect", + qpid::optValue(connectString, "STRING"), + "Connection string for the database to use. Will prepend " + "Provider=SQLOLEDB;") + ("catalog", + qpid::optValue(catalogName, "DB NAME"), + "Catalog (database) name") + ; + } + }; + ProviderOptions options; + + // Each thread has a separate connection to the database and also needs + // to manage its COM initialize/finalize individually. This is done by + // keeping a thread-specific State. + boost::thread_specific_ptr<State> dbState; + + State *initState(); + DatabaseConnection *initConnection(void); + void createDb(DatabaseConnection *db, const std::string &name); +}; + +static MSSqlProvider static_instance_registers_plugin; + +void +MSSqlProvider::finalizeMe() +{ + dbState.reset(); +} + +MSSqlProvider::MSSqlProvider() + : options("MS SQL Provider options") +{ +} + +MSSqlProvider::~MSSqlProvider() +{ +} + +void +MSSqlProvider::earlyInitialize(Plugin::Target &target) +{ + MessageStorePlugin *store = dynamic_cast<MessageStorePlugin *>(&target); + if (store) { + // If the database init fails, report it and don't register; give + // the rest of the broker a chance to run. + // + // Don't try to initConnection() since that will fail if the + // database doesn't exist. Instead, try to open a connection without + // a database name, then search for the database. There's still a + // chance this provider won't be selected for the store too, so be + // be sure to close the database connection before return to avoid + // leaving a connection up that will not be used. + try { + initState(); // This initializes COM + std::auto_ptr<DatabaseConnection> db(new DatabaseConnection()); + db->open(options.connectString, ""); + _ConnectionPtr conn(*db); + _RecordsetPtr pCatalogs = NULL; + VariantHelper<std::string> catalogName(options.catalogName); + pCatalogs = conn->OpenSchema(adSchemaCatalogs, catalogName); + if (pCatalogs->EndOfFile) { + // Database doesn't exist; create it + QPID_LOG(notice, + "MSSQL: Creating database " + options.catalogName); + createDb(db.get(), options.catalogName); + } + else { + QPID_LOG(notice, + "MSSQL: Database located: " + options.catalogName); + } + if (pCatalogs) { + if (pCatalogs->State == adStateOpen) + pCatalogs->Close(); + pCatalogs = 0; + } + db->close(); + store->providerAvailable("MSSQL", this); + } + catch (qpid::Exception &e) { + QPID_LOG(error, e.what()); + return; + } + store->addFinalizer(boost::bind(&MSSqlProvider::finalizeMe, this)); + } +} + +void +MSSqlProvider::initialize(Plugin::Target& target) +{ +} + +void +MSSqlProvider::activate(MessageStorePlugin &store) +{ + QPID_LOG(info, "MS SQL Provider is up"); +} + +void +MSSqlProvider::truncateInit(const bool pushDownStoreFiles) +{ +} + +void +MSSqlProvider::create(PersistableQueue& queue, + const qpid::framing::FieldTable& /*args needed for jrnl*/) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsQueues; + try { + db->beginTransaction(); + rsQueues.open(db, TblQueue); + rsQueues.add(queue); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error creating queue " + queue.getName(), e, errs); + } +} + +/** + * Destroy a durable queue + */ +void +MSSqlProvider::destroy(PersistableQueue& queue) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsQueues; + BindingRecordset rsBindings; + MessageRecordset rsMessages; + MessageMapRecordset rsMessageMaps; + try { + db->beginTransaction(); + rsQueues.open(db, TblQueue); + rsBindings.open(db, TblBinding); + rsMessages.open(db, TblMessage); + rsMessageMaps.open(db, TblMessageMap); + // Remove bindings first; the queue IDs can't be ripped out from + // under the references in the bindings table. Then remove the + // message->queue entries for the queue, also because the queue can't + // be deleted while there are references to it. If there are messages + // orphaned by removing the queue references, they're deleted by + // a trigger on the tblMessageMap table. Lastly, the queue record + // can be removed. + rsBindings.removeForQueue(queue.getPersistenceId()); + rsMessageMaps.removeForQueue(queue.getPersistenceId()); + rsQueues.remove(queue); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error deleting queue " + queue.getName(), e, errs); + } +} + +/** + * Record the existence of a durable exchange + */ +void +MSSqlProvider::create(const PersistableExchange& exchange, + const qpid::framing::FieldTable& args) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsExchanges; + try { + db->beginTransaction(); + rsExchanges.open(db, TblExchange); + rsExchanges.add(exchange); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error creating exchange " + exchange.getName(), + e, + errs); + } +} + +/** + * Destroy a durable exchange + */ +void +MSSqlProvider::destroy(const PersistableExchange& exchange) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsExchanges; + BindingRecordset rsBindings; + try { + db->beginTransaction(); + rsExchanges.open(db, TblExchange); + rsBindings.open(db, TblBinding); + // Remove bindings first; the exchange IDs can't be ripped out from + // under the references in the bindings table. + rsBindings.removeForExchange(exchange.getPersistenceId()); + rsExchanges.remove(exchange); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error deleting exchange " + exchange.getName(), + e, + errs); + } +} + +/** + * Record a binding + */ +void +MSSqlProvider::bind(const PersistableExchange& exchange, + const PersistableQueue& queue, + const std::string& key, + const qpid::framing::FieldTable& args) +{ + DatabaseConnection *db = initConnection(); + BindingRecordset rsBindings; + try { + db->beginTransaction(); + rsBindings.open(db, TblBinding); + rsBindings.add(exchange.getPersistenceId(), + queue.getPersistenceId(), + key, + args); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error binding exchange " + exchange.getName() + + " to queue " + queue.getName(), + e, + errs); + } +} + +/** + * Forget a binding + */ +void +MSSqlProvider::unbind(const PersistableExchange& exchange, + const PersistableQueue& queue, + const std::string& key, + const qpid::framing::FieldTable& args) +{ + DatabaseConnection *db = initConnection(); + BindingRecordset rsBindings; + try { + db->beginTransaction(); + rsBindings.open(db, TblBinding); + rsBindings.remove(exchange.getPersistenceId(), + queue.getPersistenceId(), + key, + args); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error unbinding exchange " + exchange.getName() + + " from queue " + queue.getName(), + e, + errs); + } +} + +/** + * Record generic durable configuration + */ +void +MSSqlProvider::create(const PersistableConfig& config) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsConfigs; + try { + db->beginTransaction(); + rsConfigs.open(db, TblConfig); + rsConfigs.add(config); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error creating config " + config.getName(), e, errs); + } +} + +/** + * Destroy generic durable configuration + */ +void +MSSqlProvider::destroy(const PersistableConfig& config) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsConfigs; + try { + db->beginTransaction(); + rsConfigs.open(db, TblConfig); + rsConfigs.remove(config); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error deleting config " + config.getName(), e, errs); + } +} + +/** + * Stores a messages before it has been enqueued + * (enqueueing automatically stores the message so this is + * only required if storage is required prior to that + * point). If the message has not yet been stored it will + * store the headers as well as any content passed in. A + * persistence id will be set on the message which can be + * used to load the content or to append to it. + */ +void +MSSqlProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg) +{ + DatabaseConnection *db = initConnection(); + MessageRecordset rsMessages; + try { + db->beginTransaction(); + rsMessages.open(db, TblMessage); + rsMessages.add(msg); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error staging message", e, errs); + } +} + +/** + * Destroys a previously staged message. This only needs + * to be called if the message is never enqueued. (Once + * enqueued, deletion will be automatic when the message + * is dequeued from all queues it was enqueued onto). + */ +void +MSSqlProvider::destroy(PersistableMessage& msg) +{ + DatabaseConnection *db = initConnection(); + BlobRecordset rsMessages; + try { + db->beginTransaction(); + rsMessages.open(db, TblMessage); + rsMessages.remove(msg); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error deleting message", e, errs); + } +} + +/** + * Appends content to a previously staged message + */ +void +MSSqlProvider::appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg, + const std::string& data) +{ + DatabaseConnection *db = initConnection(); + MessageRecordset rsMessages; + try { + db->beginTransaction(); + rsMessages.open(db, TblMessage); + rsMessages.append(msg, data); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error appending to message", e, errs); + } +} + +/** + * Loads (a section) of content data for the specified + * message (previously stored through a call to stage or + * enqueue) into data. The offset refers to the content + * only (i.e. an offset of 0 implies that the start of the + * content should be loaded, not the headers or related + * meta-data). + */ +void +MSSqlProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/, + const boost::intrusive_ptr<const PersistableMessage>& msg, + std::string& data, + uint64_t offset, + uint32_t length) +{ + // SQL store keeps all messages in one table, so we don't need the + // queue reference. + DatabaseConnection *db = initConnection(); + MessageRecordset rsMessages; + try { + rsMessages.open(db, TblMessage); + rsMessages.loadContent(msg, data, offset, length); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + throw ADOException("Error loading message content", e, errs); + } +} + +/** + * Enqueues a message, storing the message if it has not + * been previously stored and recording that the given + * message is on the given queue. + * + * @param ctxt The transaction context under which this enqueue happens. + * @param msg The message to enqueue + * @param queue the name of the queue onto which it is to be enqueued + */ +void +MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) +{ + // If this enqueue is in the context of a transaction, use the specified + // transaction to nest a new transaction for this operation. However, if + // this is not in the context of a transaction, then just use the thread's + // DatabaseConnection with a ADO transaction. + DatabaseConnection *db = 0; + std::string xid; + AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt); + if (atxn == 0) { + db = initConnection(); + db->beginTransaction(); + } + else { + (void)initState(); // Ensure this thread is initialized + // It's a transactional enqueue; if it's TPC, grab the xid. + AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt); + if (tpcTxn) + xid = tpcTxn->getXid(); + db = atxn->dbConn(); + try { + atxn->sqlBegin(); + } + catch(_com_error &e) { + throw ADOException("Error queuing message", e, db->getErrors()); + } + } + + MessageRecordset rsMessages; + MessageMapRecordset rsMap; + try { + if (msg->getPersistenceId() == 0) { // Message itself not yet saved + rsMessages.open(db, TblMessage); + rsMessages.add(msg); + } + rsMap.open(db, TblMessageMap); + rsMap.add(msg->getPersistenceId(), queue.getPersistenceId(), xid); + if (atxn) + atxn->sqlCommit(); + else + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + if (atxn) + atxn->sqlAbort(); + else + db->rollbackTransaction(); + throw ADOException("Error queuing message", e, errs); + } + msg->enqueueComplete(); +} + +/** + * Dequeues a message, recording that the given message is + * no longer on the given queue and deleting the message + * if it is no longer on any other queue. + * + * @param ctxt The transaction context under which this dequeue happens. + * @param msg The message to dequeue + * @param queue The queue from which it is to be dequeued + */ +void +MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) +{ + // If this dequeue is in the context of a transaction, use the specified + // transaction to nest a new transaction for this operation. However, if + // this is not in the context of a transaction, then just use the thread's + // DatabaseConnection with a ADO transaction. + DatabaseConnection *db = 0; + std::string xid; + AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt); + if (atxn == 0) { + db = initConnection(); + db->beginTransaction(); + } + else { + (void)initState(); // Ensure this thread is initialized + // It's a transactional dequeue; if it's TPC, grab the xid. + AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt); + if (tpcTxn) + xid = tpcTxn->getXid(); + db = atxn->dbConn(); + try { + atxn->sqlBegin(); + } + catch(_com_error &e) { + throw ADOException("Error queuing message", e, db->getErrors()); + } + } + + MessageMapRecordset rsMap; + try { + rsMap.open(db, TblMessageMap); + // TPC dequeues are just marked pending and will actually be removed + // when the transaction commits; Single-phase dequeues are removed + // now, relying on the SQL transaction to put it back if the + // transaction doesn't commit. + if (!xid.empty()) { + rsMap.pendingRemove(msg->getPersistenceId(), + queue.getPersistenceId(), + xid); + } + else { + rsMap.remove(msg->getPersistenceId(), + queue.getPersistenceId()); + } + if (atxn) + atxn->sqlCommit(); + else + db->commitTransaction(); + } + catch(ms_sql::Exception&) { + if (atxn) + atxn->sqlAbort(); + else + db->rollbackTransaction(); + throw; + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + if (atxn) + atxn->sqlAbort(); + else + db->rollbackTransaction(); + throw ADOException("Error dequeuing message", e, errs); + } + msg->dequeueComplete(); +} + +std::auto_ptr<qpid::broker::TransactionContext> +MSSqlProvider::begin() +{ + (void)initState(); // Ensure this thread is initialized + + // Transactions are associated with the Connection, so this transaction + // context needs its own connection. At the time of writing, single-phase + // transactions are dealt with completely on one thread, so we really + // could just use the thread-specific DatabaseConnection for this. + // However, that would introduce an ugly, hidden coupling, so play + // it safe and handle this just like a TPC transaction, which actually + // can be prepared and committed/aborted from different threads, + // making it a bad idea to try using the thread-local DatabaseConnection. + boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection); + db->open(options.connectString, options.catalogName); + std::auto_ptr<AmqpTransaction> tx(new AmqpTransaction(db)); + tx->sqlBegin(); + std::auto_ptr<qpid::broker::TransactionContext> tc(tx); + return tc; +} + +std::auto_ptr<qpid::broker::TPCTransactionContext> +MSSqlProvider::begin(const std::string& xid) +{ + (void)initState(); // Ensure this thread is initialized + boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection); + db->open(options.connectString, options.catalogName); + std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(db, xid)); + tx->sqlBegin(); + + TplRecordset rsTpl; + try { + tx->sqlBegin(); + rsTpl.open(db.get(), TblTpl); + rsTpl.add(xid); + tx->sqlCommit(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + tx->sqlAbort(); + throw ADOException("Error adding TPL record", e, errs); + } + + std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx); + return tc; +} + +void +MSSqlProvider::prepare(qpid::broker::TPCTransactionContext& txn) +{ + // Commit all the marked-up enqueue/dequeue ops and the TPL record. + // On commit/rollback the TPL will be removed and the TPL markups + // on the message map will be cleaned up as well. + (void)initState(); // Ensure this thread is initialized + AmqpTPCTransaction *atxn = dynamic_cast<AmqpTPCTransaction*> (&txn); + if (atxn == 0) + throw qpid::broker::InvalidTransactionContextException(); + try { + atxn->sqlCommit(); + } + catch(_com_error &e) { + throw ADOException("Error preparing", e, atxn->dbConn()->getErrors()); + } + atxn->setPrepared(); +} + +void +MSSqlProvider::commit(qpid::broker::TransactionContext& txn) +{ + (void)initState(); // Ensure this thread is initialized + /* + * One-phase transactions simply commit the outer SQL transaction + * that was begun on begin(). Two-phase transactions are different - + * the SQL transaction started on begin() was committed on prepare() + * so all the SQL records reflecting the enqueue/dequeue actions for + * the transaction are recorded but with xid markups on them to reflect + * that they are prepared but not committed. Now go back and remove + * the markups, deleting those marked for removal. + */ + AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn); + if (p2txn == 0) { + AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn); + if (p1txn == 0) + throw qpid::broker::InvalidTransactionContextException(); + p1txn->sqlCommit(); + return; + } + + DatabaseConnection *db(p2txn->dbConn()); + TplRecordset rsTpl; + MessageMapRecordset rsMessageMap; + try { + db->beginTransaction(); + rsTpl.open(db, TblTpl); + rsMessageMap.open(db, TblMessageMap); + rsMessageMap.commitPrepared(p2txn->getXid()); + rsTpl.remove(p2txn->getXid()); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error committing transaction", e, errs); + } +} + +void +MSSqlProvider::abort(qpid::broker::TransactionContext& txn) +{ + (void)initState(); // Ensure this thread is initialized + /* + * One-phase and non-prepared two-phase transactions simply abort + * the outer SQL transaction that was begun on begin(). However, prepared + * two-phase transactions are different - the SQL transaction started + * on begin() was committed on prepare() so all the SQL records + * reflecting the enqueue/dequeue actions for the transaction are + * recorded but with xid markups on them to reflect that they are + * prepared but not committed. Now go back and remove the markups, + * deleting those marked for addition. + */ + AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn); + if (p2txn == 0 || !p2txn->isPrepared()) { + AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn); + if (p1txn == 0) + throw qpid::broker::InvalidTransactionContextException(); + p1txn->sqlAbort(); + return; + } + + DatabaseConnection *db(p2txn->dbConn()); + TplRecordset rsTpl; + MessageMapRecordset rsMessageMap; + try { + db->beginTransaction(); + rsTpl.open(db, TblTpl); + rsMessageMap.open(db, TblMessageMap); + rsMessageMap.abortPrepared(p2txn->getXid()); + rsTpl.remove(p2txn->getXid()); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error committing transaction", e, errs); + } + + + (void)initState(); // Ensure this thread is initialized + AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn); + if (atxn == 0) + throw qpid::broker::InvalidTransactionContextException(); + atxn->sqlAbort(); +} + +void +MSSqlProvider::collectPreparedXids(std::set<std::string>& xids) +{ + DatabaseConnection *db = initConnection(); + try { + TplRecordset rsTpl; + rsTpl.open(db, TblTpl); + rsTpl.recover(xids); + } + catch(_com_error &e) { + throw ADOException("Error reading TPL", e, db->getErrors()); + } +} + +// @TODO Much of this recovery code is way too similar... refactor to +// a recover template method on BlobRecordset. + +void +MSSqlProvider::recoverConfigs(qpid::broker::RecoveryManager& recoverer) +{ + DatabaseConnection *db = 0; + try { + db = initConnection(); + BlobRecordset rsConfigs; + rsConfigs.open(db, TblConfig); + _RecordsetPtr p = (_RecordsetPtr)rsConfigs; + if (p->BOF && p->EndOfFile) + return; // Nothing to do + p->MoveFirst(); + while (!p->EndOfFile) { + uint64_t id = p->Fields->Item["persistenceId"]->Value; + long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; + BlobAdapter blob(blobSize); + blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); + // Recreate the Config instance and reset its ID. + broker::RecoverableConfig::shared_ptr config = + recoverer.recoverConfig(blob); + config->setPersistenceId(id); + p->MoveNext(); + } + } + catch(_com_error &e) { + throw ADOException("Error recovering configs", + e, + db ? db->getErrors() : ""); + } +} + +void +MSSqlProvider::recoverExchanges(qpid::broker::RecoveryManager& recoverer, + ExchangeMap& exchangeMap) +{ + DatabaseConnection *db = 0; + try { + db = initConnection(); + BlobRecordset rsExchanges; + rsExchanges.open(db, TblExchange); + _RecordsetPtr p = (_RecordsetPtr)rsExchanges; + if (p->BOF && p->EndOfFile) + return; // Nothing to do + p->MoveFirst(); + while (!p->EndOfFile) { + uint64_t id = p->Fields->Item["persistenceId"]->Value; + long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; + BlobAdapter blob(blobSize); + blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); + // Recreate the Exchange instance, reset its ID, and remember the + // ones restored for matching up when recovering bindings. + broker::RecoverableExchange::shared_ptr exchange = + recoverer.recoverExchange(blob); + exchange->setPersistenceId(id); + exchangeMap[id] = exchange; + p->MoveNext(); + } + } + catch(_com_error &e) { + throw ADOException("Error recovering exchanges", + e, + db ? db->getErrors() : ""); + } +} + +void +MSSqlProvider::recoverQueues(qpid::broker::RecoveryManager& recoverer, + QueueMap& queueMap) +{ + DatabaseConnection *db = 0; + try { + db = initConnection(); + BlobRecordset rsQueues; + rsQueues.open(db, TblQueue); + _RecordsetPtr p = (_RecordsetPtr)rsQueues; + if (p->BOF && p->EndOfFile) + return; // Nothing to do + p->MoveFirst(); + while (!p->EndOfFile) { + uint64_t id = p->Fields->Item["persistenceId"]->Value; + long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; + BlobAdapter blob(blobSize); + blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); + // Recreate the Queue instance and reset its ID. + broker::RecoverableQueue::shared_ptr queue = + recoverer.recoverQueue(blob); + queue->setPersistenceId(id); + queueMap[id] = queue; + p->MoveNext(); + } + } + catch(_com_error &e) { + throw ADOException("Error recovering queues", + e, + db ? db->getErrors() : ""); + } +} + +void +MSSqlProvider::recoverBindings(qpid::broker::RecoveryManager& recoverer, + const ExchangeMap& exchangeMap, + const QueueMap& queueMap) +{ + DatabaseConnection *db = 0; + try { + db = initConnection(); + BindingRecordset rsBindings; + rsBindings.open(db, TblBinding); + rsBindings.recover(recoverer, exchangeMap, queueMap); + } + catch(_com_error &e) { + throw ADOException("Error recovering bindings", + e, + db ? db->getErrors() : ""); + } +} + +void +MSSqlProvider::recoverMessages(qpid::broker::RecoveryManager& recoverer, + MessageMap& messageMap, + MessageQueueMap& messageQueueMap) +{ + DatabaseConnection *db = 0; + try { + db = initConnection(); + MessageRecordset rsMessages; + rsMessages.open(db, TblMessage); + rsMessages.recover(recoverer, messageMap); + + MessageMapRecordset rsMessageMaps; + rsMessageMaps.open(db, TblMessageMap); + rsMessageMaps.recover(messageQueueMap); + } + catch(_com_error &e) { + throw ADOException("Error recovering messages", + e, + db ? db->getErrors() : ""); + } +} + +void +MSSqlProvider::recoverTransactions(qpid::broker::RecoveryManager& recoverer, + PreparedTransactionMap& dtxMap) +{ + DatabaseConnection *db = initConnection(); + std::set<std::string> xids; + try { + TplRecordset rsTpl; + rsTpl.open(db, TblTpl); + rsTpl.recover(xids); + } + catch(_com_error &e) { + throw ADOException("Error recovering TPL records", e, db->getErrors()); + } + + try { + // Rebuild the needed RecoverableTransactions. + for (std::set<std::string>::const_iterator iXid = xids.begin(); + iXid != xids.end(); + ++iXid) { + boost::shared_ptr<DatabaseConnection> dbX(new DatabaseConnection); + dbX->open(options.connectString, options.catalogName); + std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(dbX, + *iXid)); + tx->setPrepared(); + std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx); + dtxMap[*iXid] = recoverer.recoverTransaction(*iXid, tc); + } + } + catch(_com_error &e) { + throw ADOException("Error recreating dtx connection", e); + } +} + +////////////// Internal Methods + +State * +MSSqlProvider::initState() +{ + State *state = dbState.get(); // See if thread has initialized + if (!state) { + state = new State; + dbState.reset(state); + } + return state; +} + +DatabaseConnection * +MSSqlProvider::initConnection(void) +{ + State *state = initState(); + if (state->dbConn != 0) + return state->dbConn; // And the DatabaseConnection is set up too + std::auto_ptr<DatabaseConnection> db(new DatabaseConnection); + db->open(options.connectString, options.catalogName); + state->dbConn = db.release(); + return state->dbConn; +} + +void +MSSqlProvider::createDb(DatabaseConnection *db, const std::string &name) +{ + const std::string dbCmd = "CREATE DATABASE " + name; + const std::string useCmd = "USE " + name; + const std::string tableCmd = "CREATE TABLE "; + const std::string colSpecs = + " (persistenceId bigint PRIMARY KEY NOT NULL IDENTITY(1,1)," + " fieldTableBlob varbinary(MAX) NOT NULL)"; + const std::string bindingSpecs = + " (exchangeId bigint REFERENCES tblExchange(persistenceId) NOT NULL," + " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL," + " routingKey varchar(255)," + " fieldTableBlob varbinary(MAX))"; + const std::string messageMapSpecs = + " (messageId bigint REFERENCES tblMessage(persistenceId) NOT NULL," + " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL," + " prepareStatus tinyint CHECK (prepareStatus IS NULL OR " + " prepareStatus IN (1, 2))," + " xid varbinary(512) REFERENCES tblTPL(xid)" + " CONSTRAINT CK_NoDups UNIQUE NONCLUSTERED (messageId, queueId) )"; + const std::string tplSpecs = " (xid varbinary(512) PRIMARY KEY NOT NULL)"; + // SET NOCOUNT ON added to prevent extra result sets from + // interfering with SELECT statements. (Added by SQL Management) + const std::string removeUnrefMsgsTrigger = + "CREATE TRIGGER dbo.RemoveUnreferencedMessages " + "ON tblMessageMap AFTER DELETE AS BEGIN " + "SET NOCOUNT ON; " + "DELETE FROM tblMessage " + "WHERE tblMessage.persistenceId IN " + " (SELECT messageId FROM deleted) AND" + " NOT EXISTS(SELECT * FROM tblMessageMap" + " WHERE tblMessageMap.messageId IN" + " (SELECT messageId FROM deleted)) " + "END"; + + _variant_t unused; + _bstr_t dbStr = dbCmd.c_str(); + _ConnectionPtr conn(*db); + try { + conn->Execute(dbStr, &unused, adExecuteNoRecords); + _bstr_t useStr = useCmd.c_str(); + conn->Execute(useStr, &unused, adExecuteNoRecords); + std::string makeTable = tableCmd + TblQueue + colSpecs; + _bstr_t makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblExchange + colSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblConfig + colSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblMessage + colSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblBinding + bindingSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblTpl + tplSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblMessageMap + messageMapSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + _bstr_t addTriggerStr = removeUnrefMsgsTrigger.c_str(); + conn->Execute(addTriggerStr, &unused, adExecuteNoRecords); + } + catch(_com_error &e) { + throw ADOException("MSSQL can't create " + name, e, db->getErrors()); + } +} + +void +MSSqlProvider::dump() +{ + // dump all db records to qpid_log + QPID_LOG(notice, "DB Dump: (not dumping anything)"); + // rsQueues.dump(); +} + + +}}} // namespace qpid::store::ms_sql |