/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include #include #include #include #include #include #include #include "AmqpTransaction.h" #include "BlobAdapter.h" #include "BlobRecordset.h" #include "BindingRecordset.h" #include "MessageMapRecordset.h" #include "MessageRecordset.h" #include "TplRecordset.h" #include "DatabaseConnection.h" #include "Exception.h" #include "State.h" #include "VariantHelper.h" // Bring in ADO 2.8 (yes, I know it says "15", but that's it...) #import "C:\Program Files\Common Files\System\ado\msado15.dll" \ no_namespace rename("EOF", "EndOfFile") #include namespace { inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);}; // Table names const std::string TblBinding("tblBinding"); const std::string TblConfig("tblConfig"); const std::string TblExchange("tblExchange"); const std::string TblMessage("tblMessage"); const std::string TblMessageMap("tblMessageMap"); const std::string TblQueue("tblQueue"); const std::string TblTpl("tblTPL"); } namespace qpid { namespace store { namespace ms_sql { /** * @class MSSqlProvider * * Implements a qpid::store::StorageProvider that uses Microsoft SQL Server as * the backend data store for Qpid. */ class MSSqlProvider : public qpid::store::StorageProvider { protected: void finalizeMe(); void dump(); public: MSSqlProvider(); ~MSSqlProvider(); virtual qpid::Options* getOptions() { return &options; } virtual void earlyInitialize (Plugin::Target& target); virtual void initialize(Plugin::Target& target); /** * Receive notification that this provider is the one that will actively * handle provider storage for the target. If the provider is to be used, * this method will be called after earlyInitialize() and before any * recovery operations (recovery, in turn, precedes call to initialize()). */ virtual void activate(MessageStorePlugin &store); /** * @name Methods inherited from qpid::broker::MessageStore */ /** * Record the existence of a durable queue */ virtual void create(PersistableQueue& queue, const qpid::framing::FieldTable& args); /** * Destroy a durable queue */ virtual void destroy(PersistableQueue& queue); /** * Record the existence of a durable exchange */ virtual void create(const PersistableExchange& exchange, const qpid::framing::FieldTable& args); /** * Destroy a durable exchange */ virtual void destroy(const PersistableExchange& exchange); /** * Record a binding */ virtual void bind(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& key, const qpid::framing::FieldTable& args); /** * Forget a binding */ virtual void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& key, const qpid::framing::FieldTable& args); /** * Record generic durable configuration */ virtual void create(const PersistableConfig& config); /** * Destroy generic durable configuration */ virtual void destroy(const PersistableConfig& config); /** * Stores a messages before it has been enqueued * (enqueueing automatically stores the message so this is * only required if storage is required prior to that * point). If the message has not yet been stored it will * store the headers as well as any content passed in. A * persistence id will be set on the message which can be * used to load the content or to append to it. */ virtual void stage(const boost::intrusive_ptr& msg); /** * Destroys a previously staged message. This only needs * to be called if the message is never enqueued. (Once * enqueued, deletion will be automatic when the message * is dequeued from all queues it was enqueued onto). */ virtual void destroy(PersistableMessage& msg); /** * Appends content to a previously staged message */ virtual void appendContent(const boost::intrusive_ptr& msg, const std::string& data); /** * Loads (a section) of content data for the specified * message (previously stored through a call to stage or * enqueue) into data. The offset refers to the content * only (i.e. an offset of 0 implies that the start of the * content should be loaded, not the headers or related * meta-data). */ virtual void loadContent(const qpid::broker::PersistableQueue& queue, const boost::intrusive_ptr& msg, std::string& data, uint64_t offset, uint32_t length); /** * Enqueues a message, storing the message if it has not * been previously stored and recording that the given * message is on the given queue. * * Note: that this is async so the return of the function does * not mean the opperation is complete. * * @param msg the message to enqueue * @param queue the name of the queue onto which it is to be enqueued * @param xid (a pointer to) an identifier of the * distributed transaction in which the operation takes * place or null for 'local' transactions */ virtual void enqueue(qpid::broker::TransactionContext* ctxt, const boost::intrusive_ptr& msg, const PersistableQueue& queue); /** * Dequeues a message, recording that the given message is * no longer on the given queue and deleting the message * if it is no longer on any other queue. * * Note: that this is async so the return of the function does * not mean the opperation is complete. * * @param msg the message to dequeue * @param queue the name of the queue from which it is to be dequeued * @param xid (a pointer to) an identifier of the * distributed transaction in which the operation takes * place or null for 'local' transactions */ virtual void dequeue(qpid::broker::TransactionContext* ctxt, const boost::intrusive_ptr& msg, const PersistableQueue& queue); /** * Flushes all async messages to disk for the specified queue * * Note: this is a no-op for this provider. * * @param queue the name of the queue from which it is to be dequeued */ virtual void flush(const PersistableQueue& queue) {}; /** * Returns the number of outstanding AIO's for a given queue * * If 0, than all the enqueue / dequeues have been stored * to disk * * @param queue the name of the queue to check for outstanding AIO */ virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue) {return 0;} //@} /** * @name Methods inherited from qpid::broker::TransactionalStore */ //@{ virtual std::auto_ptr begin(); virtual std::auto_ptr begin(const std::string& xid); virtual void prepare(qpid::broker::TPCTransactionContext& txn); virtual void commit(qpid::broker::TransactionContext& txn); virtual void abort(qpid::broker::TransactionContext& txn); virtual void collectPreparedXids(std::set& xids); //@} virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer); virtual void recoverExchanges(qpid::broker::RecoveryManager& recoverer, ExchangeMap& exchangeMap); virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer, QueueMap& queueMap); virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer, const ExchangeMap& exchangeMap, const QueueMap& queueMap); virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer, MessageMap& messageMap, MessageQueueMap& messageQueueMap); virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer, PreparedTransactionMap& dtxMap); private: struct ProviderOptions : public qpid::Options { std::string connectString; std::string catalogName; ProviderOptions(const std::string &name) : qpid::Options(name), catalogName("QpidStore") { const enum { NAMELEN = MAX_COMPUTERNAME_LENGTH + 1 }; TCHAR myName[NAMELEN]; DWORD myNameLen = NAMELEN; GetComputerName(myName, &myNameLen); connectString = "Data Source="; connectString += myName; connectString += "\\SQLEXPRESS;Integrated Security=SSPI"; addOptions() ("connect", qpid::optValue(connectString, "STRING"), "Connection string for the database to use. Will prepend " "Provider=SQLOLEDB;") ("catalog", qpid::optValue(catalogName, "DB NAME"), "Catalog (database) name") ; } }; ProviderOptions options; // Each thread has a separate connection to the database and also needs // to manage its COM initialize/finalize individually. This is done by // keeping a thread-specific State. boost::thread_specific_ptr dbState; State *initState(); DatabaseConnection *initConnection(void); void createDb(DatabaseConnection *db, const std::string &name); }; static MSSqlProvider static_instance_registers_plugin; void MSSqlProvider::finalizeMe() { dbState.reset(); } MSSqlProvider::MSSqlProvider() : options("MS SQL Provider options") { } MSSqlProvider::~MSSqlProvider() { } void MSSqlProvider::earlyInitialize(Plugin::Target &target) { MessageStorePlugin *store = dynamic_cast(&target); if (store) { // If the database init fails, report it and don't register; give // the rest of the broker a chance to run. // // Don't try to initConnection() since that will fail if the // database doesn't exist. Instead, try to open a connection without // a database name, then search for the database. There's still a // chance this provider won't be selected for the store too, so be // be sure to close the database connection before return to avoid // leaving a connection up that will not be used. try { initState(); // This initializes COM std::auto_ptr db(new DatabaseConnection()); db->open(options.connectString, ""); _ConnectionPtr conn(*db); _RecordsetPtr pCatalogs = NULL; VariantHelper catalogName(options.catalogName); pCatalogs = conn->OpenSchema(adSchemaCatalogs, catalogName); if (pCatalogs->EndOfFile) { // Database doesn't exist; create it QPID_LOG(notice, "MSSQL: Creating database " + options.catalogName); createDb(db.get(), options.catalogName); } else { QPID_LOG(notice, "MSSQL: Database located: " + options.catalogName); } if (pCatalogs) { if (pCatalogs->State == adStateOpen) pCatalogs->Close(); pCatalogs = 0; } db->close(); store->providerAvailable("MSSQL", this); } catch (qpid::Exception &e) { QPID_LOG(error, e.what()); return; } store->addFinalizer(boost::bind(&MSSqlProvider::finalizeMe, this)); } } void MSSqlProvider::initialize(Plugin::Target& target) { } void MSSqlProvider::activate(MessageStorePlugin &store) { QPID_LOG(info, "MS SQL Provider is up"); } void MSSqlProvider::create(PersistableQueue& queue, const qpid::framing::FieldTable& /*args needed for jrnl*/) { DatabaseConnection *db = initConnection(); BlobRecordset rsQueues; try { db->beginTransaction(); rsQueues.open(db, TblQueue); rsQueues.add(queue); db->commitTransaction(); } catch(_com_error &e) { std::string errs = db->getErrors(); db->rollbackTransaction(); throw ADOException("Error creating queue " + queue.getName(), e, errs); } } /** * Destroy a durable queue */ void MSSqlProvider::destroy(PersistableQueue& queue) { DatabaseConnection *db = initConnection(); BlobRecordset rsQueues; BindingRecordset rsBindings; MessageRecordset rsMessages; MessageMapRecordset rsMessageMaps; try { db->beginTransaction(); rsQueues.open(db, TblQueue); rsBindings.open(db, TblBinding); rsMessages.open(db, TblMessage); rsMessageMaps.open(db, TblMessageMap); // Remove bindings first; the queue IDs can't be ripped out from // under the references in the bindings table. Then remove the // message->queue entries for the queue, also because the queue can't // be deleted while there are references to it. If there are messages // orphaned by removing the queue references, they're deleted by // a trigger on the tblMessageMap table. Lastly, the queue record // can be removed. rsBindings.removeForQueue(queue.getPersistenceId()); rsMessageMaps.removeForQueue(queue.getPersistenceId()); rsQueues.remove(queue); db->commitTransaction(); } catch(_com_error &e) { std::string errs = db->getErrors(); db->rollbackTransaction(); throw ADOException("Error deleting queue " + queue.getName(), e, errs); } } /** * Record the existence of a durable exchange */ void MSSqlProvider::create(const PersistableExchange& exchange, const qpid::framing::FieldTable& args) { DatabaseConnection *db = initConnection(); BlobRecordset rsExchanges; try { db->beginTransaction(); rsExchanges.open(db, TblExchange); rsExchanges.add(exchange); db->commitTransaction(); } catch(_com_error &e) { std::string errs = db->getErrors(); db->rollbackTransaction(); throw ADOException("Error creating exchange " + exchange.getName(), e, errs); } } /** * Destroy a durable exchange */ void MSSqlProvider::destroy(const PersistableExchange& exchange) { DatabaseConnection *db = initConnection(); BlobRecordset rsExchanges; BindingRecordset rsBindings; try { db->beginTransaction(); rsExchanges.open(db, TblExchange); rsBindings.open(db, TblBinding); // Remove bindings first; the exchange IDs can't be ripped out from // under the references in the bindings table. rsBindings.removeForExchange(exchange.getPersistenceId()); rsExchanges.remove(exchange); db->commitTransaction(); } catch(_com_error &e) { std::string errs = db->getErrors(); db->rollbackTransaction(); throw ADOException("Error deleting exchange " + exchange.getName(), e, errs); } } /** * Record a binding */ void MSSqlProvider::bind(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& key, const qpid::framing::FieldTable& args) { DatabaseConnection *db = initConnection(); BindingRecordset rsBindings; try { db->beginTransaction(); rsBindings.open(db, TblBinding); rsBindings.add(exchange.getPersistenceId(), queue.getPersistenceId(), key, args); db->commitTransaction(); } catch(_com_error &e) { std::string errs = db->getErrors(); db->rollbackTransaction(); throw ADOException("Error binding exchange " + exchange.getName() + " to queue " + queue.getName(), e, errs); } } /** * Forget a binding */ void MSSqlProvider::unbind(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& key, const qpid::framing::FieldTable& args) { DatabaseConnection *db = initConnection(); BindingRecordset rsBindings; try { db->beginTransaction(); rsBindings.open(db, TblBinding); rsBindings.remove(exchange.getPersistenceId(), queue.getPersistenceId(), key, args); db->commitTransaction(); } catch(_com_error &e) { std::string errs = db->getErrors(); db->rollbackTransaction(); throw ADOException("Error unbinding exchange " + exchange.getName() + " from queue " + queue.getName(), e, errs); } } /** * Record generic durable configuration */ void MSSqlProvider::create(const PersistableConfig& config) { DatabaseConnection *db = initConnection(); BlobRecordset rsConfigs; try { db->beginTransaction(); rsConfigs.open(db, TblConfig); rsConfigs.add(config); db->commitTransaction(); } catch(_com_error &e) { std::string errs = db->getErrors(); db->rollbackTransaction(); throw ADOException("Error creating config " + config.getName(), e, errs); } } /** * Destroy generic durable configuration */ void MSSqlProvider::destroy(const PersistableConfig& config) { DatabaseConnection *db = initConnection(); BlobRecordset rsConfigs; try { db->beginTransaction(); rsConfigs.open(db, TblConfig); rsConfigs.remove(config); db->commitTransaction(); } catch(_com_error &e) { std::string errs = db->getErrors(); db->rollbackTransaction(); throw ADOException("Error deleting config " + config.getName(), e, errs); } } /** * Stores a messages before it has been enqueued * (enqueueing automatically stores the message so this is * only required if storage is required prior to that * point). If the message has not yet been stored it will * store the headers as well as any content passed in. A * persistence id will be set on the message which can be * used to load the content or to append to it. */ void MSSqlProvider::stage(const boost::intrusive_ptr& msg) { DatabaseConnection *db = initConnection(); MessageRecordset rsMessages; try { db->beginTransaction(); rsMessages.open(db, TblMessage); rsMessages.add(msg); db->commitTransaction(); } catch(_com_error &e) { std::string errs = db->getErrors(); db->rollbackTransaction(); throw ADOException("Error staging message", e, errs); } } /** * Destroys a previously staged message. This only needs * to be called if the message is never enqueued. (Once * enqueued, deletion will be automatic when the message * is dequeued from all queues it was enqueued onto). */ void MSSqlProvider::destroy(PersistableMessage& msg) { DatabaseConnection *db = initConnection(); BlobRecordset rsMessages; try { db->beginTransaction(); rsMessages.open(db, TblMessage); rsMessages.remove(msg); db->commitTransaction(); } catch(_com_error &e) { std::string errs = db->getErrors(); db->rollbackTransaction(); throw ADOException("Error deleting message", e, errs); } } /** * Appends content to a previously staged message */ void MSSqlProvider::appendContent(const boost::intrusive_ptr& msg, const std::string& data) { DatabaseConnection *db = initConnection(); MessageRecordset rsMessages; try { db->beginTransaction(); rsMessages.open(db, TblMessage); rsMessages.append(msg, data); db->commitTransaction(); } catch(_com_error &e) { std::string errs = db->getErrors(); db->rollbackTransaction(); throw ADOException("Error appending to message", e, errs); } } /** * Loads (a section) of content data for the specified * message (previously stored through a call to stage or * enqueue) into data. The offset refers to the content * only (i.e. an offset of 0 implies that the start of the * content should be loaded, not the headers or related * meta-data). */ void MSSqlProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/, const boost::intrusive_ptr& msg, std::string& data, uint64_t offset, uint32_t length) { // SQL store keeps all messages in one table, so we don't need the // queue reference. DatabaseConnection *db = initConnection(); MessageRecordset rsMessages; try { rsMessages.open(db, TblMessage); rsMessages.loadContent(msg, data, offset, length); } catch(_com_error &e) { std::string errs = db->getErrors(); throw ADOException("Error loading message content", e, errs); } } /** * Enqueues a message, storing the message if it has not * been previously stored and recording that the given * message is on the given queue. * * @param ctxt The transaction context under which this enqueue happens. * @param msg The message to enqueue * @param queue the name of the queue onto which it is to be enqueued */ void MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt, const boost::intrusive_ptr& msg, const PersistableQueue& queue) { // If this enqueue is in the context of a transaction, use the specified // transaction to nest a new transaction for this operation. However, if // this is not in the context of a transaction, then just use the thread's // DatabaseConnection with a ADO transaction. DatabaseConnection *db = 0; std::string xid; AmqpTransaction *atxn = dynamic_cast (ctxt); if (atxn == 0) { db = initConnection(); db->beginTransaction(); } else { (void)initState(); // Ensure this thread is initialized // It's a transactional enqueue; if it's TPC, grab the xid. AmqpTPCTransaction *tpcTxn = dynamic_cast (ctxt); if (tpcTxn) xid = tpcTxn->getXid(); db = atxn->dbConn(); try { atxn->sqlBegin(); } catch(_com_error &e) { throw ADOException("Error queuing message", e, db->getErrors()); } } MessageRecordset rsMessages; MessageMapRecordset rsMap; try { if (msg->getPersistenceId() == 0) { // Message itself not yet saved rsMessages.open(db, TblMessage); rsMessages.add(msg); } rsMap.open(db, TblMessageMap); rsMap.add(msg->getPersistenceId(), queue.getPersistenceId(), xid); if (atxn) atxn->sqlCommit(); else db->commitTransaction(); } catch(_com_error &e) { std::string errs = db->getErrors(); if (atxn) atxn->sqlAbort(); else db->rollbackTransaction(); throw ADOException("Error queuing message", e, errs); } msg->enqueueComplete(); } /** * Dequeues a message, recording that the given message is * no longer on the given queue and deleting the message * if it is no longer on any other queue. * * @param ctxt The transaction context under which this dequeue happens. * @param msg The message to dequeue * @param queue The queue from which it is to be dequeued */ void MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt, const boost::intrusive_ptr& msg, const PersistableQueue& queue) { // If this dequeue is in the context of a transaction, use the specified // transaction to nest a new transaction for this operation. However, if // this is not in the context of a transaction, then just use the thread's // DatabaseConnection with a ADO transaction. DatabaseConnection *db = 0; std::string xid; AmqpTransaction *atxn = dynamic_cast (ctxt); if (atxn == 0) { db = initConnection(); db->beginTransaction(); } else { (void)initState(); // Ensure this thread is initialized // It's a transactional dequeue; if it's TPC, grab the xid. AmqpTPCTransaction *tpcTxn = dynamic_cast (ctxt); if (tpcTxn) xid = tpcTxn->getXid(); db = atxn->dbConn(); try { atxn->sqlBegin(); } catch(_com_error &e) { throw ADOException("Error queuing message", e, db->getErrors()); } } MessageMapRecordset rsMap; try { rsMap.open(db, TblMessageMap); // TPC dequeues are just marked pending and will actually be removed // when the transaction commits; Single-phase dequeues are removed // now, relying on the SQL transaction to put it back if the // transaction doesn't commit. if (!xid.empty()) { rsMap.pendingRemove(msg->getPersistenceId(), queue.getPersistenceId(), xid); } else { rsMap.remove(msg->getPersistenceId(), queue.getPersistenceId()); } if (atxn) atxn->sqlCommit(); else db->commitTransaction(); } catch(ms_sql::Exception&) { if (atxn) atxn->sqlAbort(); else db->rollbackTransaction(); throw; } catch(_com_error &e) { std::string errs = db->getErrors(); if (atxn) atxn->sqlAbort(); else db->rollbackTransaction(); throw ADOException("Error dequeuing message", e, errs); } msg->dequeueComplete(); } std::auto_ptr MSSqlProvider::begin() { (void)initState(); // Ensure this thread is initialized // Transactions are associated with the Connection, so this transaction // context needs its own connection. At the time of writing, single-phase // transactions are dealt with completely on one thread, so we really // could just use the thread-specific DatabaseConnection for this. // However, that would introduce an ugly, hidden coupling, so play // it safe and handle this just like a TPC transaction, which actually // can be prepared and committed/aborted from different threads, // making it a bad idea to try using the thread-local DatabaseConnection. boost::shared_ptr db(new DatabaseConnection); db->open(options.connectString, options.catalogName); std::auto_ptr tx(new AmqpTransaction(db)); tx->sqlBegin(); std::auto_ptr tc(tx); return tc; } std::auto_ptr MSSqlProvider::begin(const std::string& xid) { (void)initState(); // Ensure this thread is initialized boost::shared_ptr db(new DatabaseConnection); db->open(options.connectString, options.catalogName); std::auto_ptr tx(new AmqpTPCTransaction(db, xid)); tx->sqlBegin(); TplRecordset rsTpl; try { tx->sqlBegin(); rsTpl.open(db.get(), TblTpl); rsTpl.add(xid); tx->sqlCommit(); } catch(_com_error &e) { std::string errs = db->getErrors(); tx->sqlAbort(); throw ADOException("Error adding TPL record", e, errs); } std::auto_ptr tc(tx); return tc; } void MSSqlProvider::prepare(qpid::broker::TPCTransactionContext& txn) { // Commit all the marked-up enqueue/dequeue ops and the TPL record. // On commit/rollback the TPL will be removed and the TPL markups // on the message map will be cleaned up as well. (void)initState(); // Ensure this thread is initialized AmqpTPCTransaction *atxn = dynamic_cast (&txn); if (atxn == 0) throw qpid::broker::InvalidTransactionContextException(); try { atxn->sqlCommit(); } catch(_com_error &e) { throw ADOException("Error preparing", e, atxn->dbConn()->getErrors()); } atxn->setPrepared(); } void MSSqlProvider::commit(qpid::broker::TransactionContext& txn) { (void)initState(); // Ensure this thread is initialized /* * One-phase transactions simply commit the outer SQL transaction * that was begun on begin(). Two-phase transactions are different - * the SQL transaction started on begin() was committed on prepare() * so all the SQL records reflecting the enqueue/dequeue actions for * the transaction are recorded but with xid markups on them to reflect * that they are prepared but not committed. Now go back and remove * the markups, deleting those marked for removal. */ AmqpTPCTransaction *p2txn = dynamic_cast (&txn); if (p2txn == 0) { AmqpTransaction *p1txn = dynamic_cast (&txn); if (p1txn == 0) throw qpid::broker::InvalidTransactionContextException(); p1txn->sqlCommit(); return; } DatabaseConnection *db(p2txn->dbConn()); TplRecordset rsTpl; MessageMapRecordset rsMessageMap; try { db->beginTransaction(); rsTpl.open(db, TblTpl); rsMessageMap.open(db, TblMessageMap); rsMessageMap.commitPrepared(p2txn->getXid()); rsTpl.remove(p2txn->getXid()); db->commitTransaction(); } catch(_com_error &e) { std::string errs = db->getErrors(); db->rollbackTransaction(); throw ADOException("Error committing transaction", e, errs); } } void MSSqlProvider::abort(qpid::broker::TransactionContext& txn) { (void)initState(); // Ensure this thread is initialized /* * One-phase and non-prepared two-phase transactions simply abort * the outer SQL transaction that was begun on begin(). However, prepared * two-phase transactions are different - the SQL transaction started * on begin() was committed on prepare() so all the SQL records * reflecting the enqueue/dequeue actions for the transaction are * recorded but with xid markups on them to reflect that they are * prepared but not committed. Now go back and remove the markups, * deleting those marked for addition. */ AmqpTPCTransaction *p2txn = dynamic_cast (&txn); if (p2txn == 0 || !p2txn->isPrepared()) { AmqpTransaction *p1txn = dynamic_cast (&txn); if (p1txn == 0) throw qpid::broker::InvalidTransactionContextException(); p1txn->sqlAbort(); return; } DatabaseConnection *db(p2txn->dbConn()); TplRecordset rsTpl; MessageMapRecordset rsMessageMap; try { db->beginTransaction(); rsTpl.open(db, TblTpl); rsMessageMap.open(db, TblMessageMap); rsMessageMap.abortPrepared(p2txn->getXid()); rsTpl.remove(p2txn->getXid()); db->commitTransaction(); } catch(_com_error &e) { std::string errs = db->getErrors(); db->rollbackTransaction(); throw ADOException("Error committing transaction", e, errs); } (void)initState(); // Ensure this thread is initialized AmqpTransaction *atxn = dynamic_cast (&txn); if (atxn == 0) throw qpid::broker::InvalidTransactionContextException(); atxn->sqlAbort(); } void MSSqlProvider::collectPreparedXids(std::set& xids) { DatabaseConnection *db = initConnection(); try { TplRecordset rsTpl; rsTpl.open(db, TblTpl); rsTpl.recover(xids); } catch(_com_error &e) { throw ADOException("Error reading TPL", e, db->getErrors()); } } // @TODO Much of this recovery code is way too similar... refactor to // a recover template method on BlobRecordset. void MSSqlProvider::recoverConfigs(qpid::broker::RecoveryManager& recoverer) { DatabaseConnection *db = 0; try { db = initConnection(); BlobRecordset rsConfigs; rsConfigs.open(db, TblConfig); _RecordsetPtr p = (_RecordsetPtr)rsConfigs; if (p->BOF && p->EndOfFile) return; // Nothing to do p->MoveFirst(); while (!p->EndOfFile) { uint64_t id = p->Fields->Item["persistenceId"]->Value; long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; BlobAdapter blob(blobSize); blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); // Recreate the Config instance and reset its ID. broker::RecoverableConfig::shared_ptr config = recoverer.recoverConfig(blob); config->setPersistenceId(id); p->MoveNext(); } } catch(_com_error &e) { throw ADOException("Error recovering configs", e, db ? db->getErrors() : ""); } } void MSSqlProvider::recoverExchanges(qpid::broker::RecoveryManager& recoverer, ExchangeMap& exchangeMap) { DatabaseConnection *db = 0; try { db = initConnection(); BlobRecordset rsExchanges; rsExchanges.open(db, TblExchange); _RecordsetPtr p = (_RecordsetPtr)rsExchanges; if (p->BOF && p->EndOfFile) return; // Nothing to do p->MoveFirst(); while (!p->EndOfFile) { uint64_t id = p->Fields->Item["persistenceId"]->Value; long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; BlobAdapter blob(blobSize); blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); // Recreate the Exchange instance, reset its ID, and remember the // ones restored for matching up when recovering bindings. broker::RecoverableExchange::shared_ptr exchange = recoverer.recoverExchange(blob); exchange->setPersistenceId(id); exchangeMap[id] = exchange; p->MoveNext(); } } catch(_com_error &e) { throw ADOException("Error recovering exchanges", e, db ? db->getErrors() : ""); } } void MSSqlProvider::recoverQueues(qpid::broker::RecoveryManager& recoverer, QueueMap& queueMap) { DatabaseConnection *db = 0; try { db = initConnection(); BlobRecordset rsQueues; rsQueues.open(db, TblQueue); _RecordsetPtr p = (_RecordsetPtr)rsQueues; if (p->BOF && p->EndOfFile) return; // Nothing to do p->MoveFirst(); while (!p->EndOfFile) { uint64_t id = p->Fields->Item["persistenceId"]->Value; long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; BlobAdapter blob(blobSize); blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); // Recreate the Queue instance and reset its ID. broker::RecoverableQueue::shared_ptr queue = recoverer.recoverQueue(blob); queue->setPersistenceId(id); queueMap[id] = queue; p->MoveNext(); } } catch(_com_error &e) { throw ADOException("Error recovering queues", e, db ? db->getErrors() : ""); } } void MSSqlProvider::recoverBindings(qpid::broker::RecoveryManager& recoverer, const ExchangeMap& exchangeMap, const QueueMap& queueMap) { DatabaseConnection *db = 0; try { db = initConnection(); BindingRecordset rsBindings; rsBindings.open(db, TblBinding); rsBindings.recover(recoverer, exchangeMap, queueMap); } catch(_com_error &e) { throw ADOException("Error recovering bindings", e, db ? db->getErrors() : ""); } } void MSSqlProvider::recoverMessages(qpid::broker::RecoveryManager& recoverer, MessageMap& messageMap, MessageQueueMap& messageQueueMap) { DatabaseConnection *db = 0; try { db = initConnection(); MessageRecordset rsMessages; rsMessages.open(db, TblMessage); rsMessages.recover(recoverer, messageMap); MessageMapRecordset rsMessageMaps; rsMessageMaps.open(db, TblMessageMap); rsMessageMaps.recover(messageQueueMap); } catch(_com_error &e) { throw ADOException("Error recovering messages", e, db ? db->getErrors() : ""); } } void MSSqlProvider::recoverTransactions(qpid::broker::RecoveryManager& recoverer, PreparedTransactionMap& dtxMap) { DatabaseConnection *db = initConnection(); std::set xids; try { TplRecordset rsTpl; rsTpl.open(db, TblTpl); rsTpl.recover(xids); } catch(_com_error &e) { throw ADOException("Error recovering TPL records", e, db->getErrors()); } try { // Rebuild the needed RecoverableTransactions. for (std::set::const_iterator iXid = xids.begin(); iXid != xids.end(); ++iXid) { boost::shared_ptr dbX(new DatabaseConnection); dbX->open(options.connectString, options.catalogName); std::auto_ptr tx(new AmqpTPCTransaction(dbX, *iXid)); tx->setPrepared(); std::auto_ptr tc(tx); dtxMap[*iXid] = recoverer.recoverTransaction(*iXid, tc); } } catch(_com_error &e) { throw ADOException("Error recreating dtx connection", e); } } ////////////// Internal Methods State * MSSqlProvider::initState() { State *state = dbState.get(); // See if thread has initialized if (!state) { state = new State; dbState.reset(state); } return state; } DatabaseConnection * MSSqlProvider::initConnection(void) { State *state = initState(); if (state->dbConn != 0) return state->dbConn; // And the DatabaseConnection is set up too std::auto_ptr db(new DatabaseConnection); db->open(options.connectString, options.catalogName); state->dbConn = db.release(); return state->dbConn; } void MSSqlProvider::createDb(DatabaseConnection *db, const std::string &name) { const std::string dbCmd = "CREATE DATABASE " + name; const std::string useCmd = "USE " + name; const std::string tableCmd = "CREATE TABLE "; const std::string colSpecs = " (persistenceId bigint PRIMARY KEY NOT NULL IDENTITY(1,1)," " fieldTableBlob varbinary(MAX) NOT NULL)"; const std::string bindingSpecs = " (exchangeId bigint REFERENCES tblExchange(persistenceId) NOT NULL," " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL," " routingKey varchar(255)," " fieldTableBlob varbinary(MAX))"; const std::string messageMapSpecs = " (messageId bigint REFERENCES tblMessage(persistenceId) NOT NULL," " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL," " prepareStatus tinyint CHECK (prepareStatus IS NULL OR " " prepareStatus IN (1, 2))," " xid varbinary(512) REFERENCES tblTPL(xid)" " CONSTRAINT CK_NoDups UNIQUE NONCLUSTERED (messageId, queueId) )"; const std::string tplSpecs = " (xid varbinary(512) PRIMARY KEY NOT NULL)"; // SET NOCOUNT ON added to prevent extra result sets from // interfering with SELECT statements. (Added by SQL Management) const std::string removeUnrefMsgsTrigger = "CREATE TRIGGER dbo.RemoveUnreferencedMessages " "ON tblMessageMap AFTER DELETE AS BEGIN " "SET NOCOUNT ON; " "DELETE FROM tblMessage " "WHERE tblMessage.persistenceId IN " " (SELECT messageId FROM deleted) AND" " NOT EXISTS(SELECT * FROM tblMessageMap" " WHERE tblMessageMap.messageId IN" " (SELECT messageId FROM deleted)) " "END"; _variant_t unused; _bstr_t dbStr = dbCmd.c_str(); _ConnectionPtr conn(*db); try { conn->Execute(dbStr, &unused, adExecuteNoRecords); _bstr_t useStr = useCmd.c_str(); conn->Execute(useStr, &unused, adExecuteNoRecords); std::string makeTable = tableCmd + TblQueue + colSpecs; _bstr_t makeTableStr = makeTable.c_str(); conn->Execute(makeTableStr, &unused, adExecuteNoRecords); makeTable = tableCmd + TblExchange + colSpecs; makeTableStr = makeTable.c_str(); conn->Execute(makeTableStr, &unused, adExecuteNoRecords); makeTable = tableCmd + TblConfig + colSpecs; makeTableStr = makeTable.c_str(); conn->Execute(makeTableStr, &unused, adExecuteNoRecords); makeTable = tableCmd + TblMessage + colSpecs; makeTableStr = makeTable.c_str(); conn->Execute(makeTableStr, &unused, adExecuteNoRecords); makeTable = tableCmd + TblBinding + bindingSpecs; makeTableStr = makeTable.c_str(); conn->Execute(makeTableStr, &unused, adExecuteNoRecords); makeTable = tableCmd + TblTpl + tplSpecs; makeTableStr = makeTable.c_str(); conn->Execute(makeTableStr, &unused, adExecuteNoRecords); makeTable = tableCmd + TblMessageMap + messageMapSpecs; makeTableStr = makeTable.c_str(); conn->Execute(makeTableStr, &unused, adExecuteNoRecords); _bstr_t addTriggerStr = removeUnrefMsgsTrigger.c_str(); conn->Execute(addTriggerStr, &unused, adExecuteNoRecords); } catch(_com_error &e) { throw ADOException("MSSQL can't create " + name, e, db->getErrors()); } } void MSSqlProvider::dump() { // dump all db records to qpid_log QPID_LOG(notice, "DB Dump: (not dumping anything)"); // rsQueues.dump(); } }}} // namespace qpid::store::ms_sql