diff options
Diffstat (limited to 'cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp')
-rw-r--r-- | cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp | 312 |
1 files changed, 250 insertions, 62 deletions
diff --git a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp index a26df59df7..323bc94c5c 100644 --- a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp +++ b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp @@ -32,6 +32,7 @@ #include "BindingRecordset.h" #include "MessageMapRecordset.h" #include "MessageRecordset.h" +#include "TplRecordset.h" #include "DatabaseConnection.h" #include "Exception.h" #include "State.h" @@ -51,6 +52,7 @@ const std::string TblExchange("tblExchange"); const std::string TblMessage("tblMessage"); const std::string TblMessageMap("tblMessageMap"); const std::string TblQueue("tblQueue"); +const std::string TblTpl("tblTPL"); } namespace qpid { @@ -256,9 +258,7 @@ public: virtual void prepare(qpid::broker::TPCTransactionContext& txn); virtual void commit(qpid::broker::TransactionContext& txn); virtual void abort(qpid::broker::TransactionContext& txn); - - // @TODO This maybe should not be in TransactionalStore - virtual void collectPreparedXids(std::set<std::string>& xids) {} + virtual void collectPreparedXids(std::set<std::string>& xids); //@} virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer); @@ -272,6 +272,8 @@ public: virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer, MessageMap& messageMap, MessageQueueMap& messageQueueMap); + virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer, + PreparedTransactionMap& dtxMap); private: struct ProviderOptions : public qpid::Options @@ -310,7 +312,7 @@ private: State *initState(); DatabaseConnection *initConnection(void); - void createDb(_ConnectionPtr conn, const std::string &name); + void createDb(DatabaseConnection *db, const std::string &name); }; static MSSqlProvider static_instance_registers_plugin; @@ -356,7 +358,7 @@ MSSqlProvider::earlyInitialize(Plugin::Target &target) // Database doesn't exist; create it QPID_LOG(notice, "MSSQL: Creating database " + options.catalogName); - createDb(conn, options.catalogName); + createDb(db.get(), options.catalogName); } else { QPID_LOG(notice, @@ -407,8 +409,9 @@ MSSqlProvider::create(PersistableQueue& queue, db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error creating queue " + queue.getName(), e); + throw ADOException("Error creating queue " + queue.getName(), e, errs); } } @@ -418,14 +421,6 @@ MSSqlProvider::create(PersistableQueue& queue, void MSSqlProvider::destroy(PersistableQueue& queue) { - // MessageDeleter class for use with for_each, below. - class MessageDeleter { - BlobRecordset& msgs; - public: - explicit MessageDeleter(BlobRecordset& _msgs) : msgs(_msgs) {} - void operator()(uint64_t msgId) { msgs.remove(msgId); } - }; - DatabaseConnection *db = initConnection(); BlobRecordset rsQueues; BindingRecordset rsBindings; @@ -441,19 +436,18 @@ MSSqlProvider::destroy(PersistableQueue& queue) // under the references in the bindings table. Then remove the // message->queue entries for the queue, also because the queue can't // be deleted while there are references to it. If there are messages - // orphaned by removing the queue references, those messages can - // also be deleted. Lastly, the queue record can be removed. + // orphaned by removing the queue references, they're deleted by + // a trigger on the tblMessageMap table. Lastly, the queue record + // can be removed. rsBindings.removeForQueue(queue.getPersistenceId()); - std::vector<uint64_t> orphans; - rsMessageMaps.removeForQueue(queue.getPersistenceId(), orphans); - std::for_each(orphans.begin(), orphans.end(), - MessageDeleter(rsMessages)); + rsMessageMaps.removeForQueue(queue.getPersistenceId()); rsQueues.remove(queue); db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error deleting queue " + queue.getName(), e); + throw ADOException("Error deleting queue " + queue.getName(), e, errs); } } @@ -473,8 +467,11 @@ MSSqlProvider::create(const PersistableExchange& exchange, db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error creating exchange " + exchange.getName(), e); + throw ADOException("Error creating exchange " + exchange.getName(), + e, + errs); } } @@ -498,8 +495,11 @@ MSSqlProvider::destroy(const PersistableExchange& exchange) db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error deleting exchange " + exchange.getName(), e); + throw ADOException("Error deleting exchange " + exchange.getName(), + e, + errs); } } @@ -524,9 +524,12 @@ MSSqlProvider::bind(const PersistableExchange& exchange, db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); throw ADOException("Error binding exchange " + exchange.getName() + - " to queue " + queue.getName(), e); + " to queue " + queue.getName(), + e, + errs); } } @@ -551,9 +554,12 @@ MSSqlProvider::unbind(const PersistableExchange& exchange, db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); throw ADOException("Error unbinding exchange " + exchange.getName() + - " from queue " + queue.getName(), e); + " from queue " + queue.getName(), + e, + errs); } } @@ -572,8 +578,9 @@ MSSqlProvider::create(const PersistableConfig& config) db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error creating config " + config.getName(), e); + throw ADOException("Error creating config " + config.getName(), e, errs); } } @@ -592,8 +599,9 @@ MSSqlProvider::destroy(const PersistableConfig& config) db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error deleting config " + config.getName(), e); + throw ADOException("Error deleting config " + config.getName(), e, errs); } } @@ -618,8 +626,9 @@ MSSqlProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg) db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error staging message", e); + throw ADOException("Error staging message", e, errs); } } @@ -641,8 +650,9 @@ MSSqlProvider::destroy(PersistableMessage& msg) db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error deleting message", e); + throw ADOException("Error deleting message", e, errs); } } @@ -662,8 +672,9 @@ MSSqlProvider::appendContent(const boost::intrusive_ptr<const PersistableMessage db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error appending to message", e); + throw ADOException("Error appending to message", e, errs); } } @@ -691,7 +702,8 @@ MSSqlProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/, rsMessages.loadContent(msg, data, offset, length); } catch(_com_error &e) { - throw ADOException("Error loading message content", e); + std::string errs = db->getErrors(); + throw ADOException("Error loading message content", e, errs); } } @@ -714,6 +726,7 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt, // this is not in the context of a transaction, then just use the thread's // DatabaseConnection with a ADO transaction. DatabaseConnection *db = 0; + std::string xid; AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt); if (atxn == 0) { db = initConnection(); @@ -721,12 +734,16 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt, } else { (void)initState(); // Ensure this thread is initialized + // It's a transactional enqueue; if it's TPC, grab the xid. + AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt); + if (tpcTxn) + xid = tpcTxn->getXid(); db = atxn->dbConn(); try { - atxn->begin(); + atxn->sqlBegin(); } catch(_com_error &e) { - throw ADOException("Error queuing message", e); + throw ADOException("Error queuing message", e, db->getErrors()); } } @@ -738,18 +755,19 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt, rsMessages.add(msg); } rsMap.open(db, TblMessageMap); - rsMap.add(msg->getPersistenceId(), queue.getPersistenceId()); + rsMap.add(msg->getPersistenceId(), queue.getPersistenceId(), xid); if (atxn) - atxn->commit(); + atxn->sqlCommit(); else db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); if (atxn) - atxn->abort(); + atxn->sqlAbort(); else db->rollbackTransaction(); - throw ADOException("Error queuing message", e); + throw ADOException("Error queuing message", e, errs); } msg->enqueueComplete(); } @@ -773,6 +791,7 @@ MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt, // this is not in the context of a transaction, then just use the thread's // DatabaseConnection with a ADO transaction. DatabaseConnection *db = 0; + std::string xid; AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt); if (atxn == 0) { db = initConnection(); @@ -780,36 +799,54 @@ MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt, } else { (void)initState(); // Ensure this thread is initialized + // It's a transactional dequeue; if it's TPC, grab the xid. + AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt); + if (tpcTxn) + xid = tpcTxn->getXid(); db = atxn->dbConn(); try { - atxn->begin(); + atxn->sqlBegin(); } catch(_com_error &e) { - throw ADOException("Error queuing message", e); + throw ADOException("Error queuing message", e, db->getErrors()); } } MessageMapRecordset rsMap; - MessageRecordset rsMessages; try { rsMap.open(db, TblMessageMap); - bool more = rsMap.remove(msg->getPersistenceId(), - queue.getPersistenceId()); - if (!more) { - rsMessages.open(db, TblMessage); - rsMessages.remove(msg); + // TPC dequeues are just marked pending and will actually be removed + // when the transaction commits; Single-phase dequeues are removed + // now, relying on the SQL transaction to put it back if the + // transaction doesn't commit. + if (!xid.empty()) { + rsMap.pendingRemove(msg->getPersistenceId(), + queue.getPersistenceId(), + xid); + } + else { + rsMap.remove(msg->getPersistenceId(), + queue.getPersistenceId()); } if (atxn) - atxn->commit(); + atxn->sqlCommit(); else db->commitTransaction(); } + catch(ms_sql::Exception&) { + if (atxn) + atxn->sqlAbort(); + else + db->rollbackTransaction(); + throw; + } catch(_com_error &e) { + std::string errs = db->getErrors(); if (atxn) - atxn->abort(); + atxn->sqlAbort(); else db->rollbackTransaction(); - throw ADOException("Error dequeuing message", e); + throw ADOException("Error dequeuing message", e, errs); } msg->dequeueComplete(); } @@ -827,10 +864,10 @@ MSSqlProvider::begin() // it safe and handle this just like a TPC transaction, which actually // can be prepared and committed/aborted from different threads, // making it a bad idea to try using the thread-local DatabaseConnection. - std::auto_ptr<DatabaseConnection> db(new DatabaseConnection); + boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection); db->open(options.connectString, options.catalogName); std::auto_ptr<AmqpTransaction> tx(new AmqpTransaction(db)); - tx->begin(); + tx->sqlBegin(); std::auto_ptr<qpid::broker::TransactionContext> tc(tx); return tc; } @@ -839,10 +876,24 @@ std::auto_ptr<qpid::broker::TPCTransactionContext> MSSqlProvider::begin(const std::string& xid) { (void)initState(); // Ensure this thread is initialized - std::auto_ptr<DatabaseConnection> db(new DatabaseConnection); + boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection); db->open(options.connectString, options.catalogName); std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(db, xid)); - tx->begin(); + tx->sqlBegin(); + + TplRecordset rsTpl; + try { + tx->sqlBegin(); + rsTpl.open(db.get(), TblTpl); + rsTpl.add(xid); + tx->sqlCommit(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + tx->sqlAbort(); + throw ADOException("Error adding TPL record", e, errs); + } + std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx); return tc; } @@ -850,28 +901,122 @@ MSSqlProvider::begin(const std::string& xid) void MSSqlProvider::prepare(qpid::broker::TPCTransactionContext& txn) { - // The inner transactions used for the components of the TPC are done; - // nothing else to do but wait for the commit. + // Commit all the marked-up enqueue/dequeue ops and the TPL record. + // On commit/rollback the TPL will be removed and the TPL markups + // on the message map will be cleaned up as well. + (void)initState(); // Ensure this thread is initialized + AmqpTPCTransaction *atxn = dynamic_cast<AmqpTPCTransaction*> (&txn); + if (atxn == 0) + throw qpid::broker::InvalidTransactionContextException(); + try { + atxn->sqlCommit(); + } + catch(_com_error &e) { + throw ADOException("Error preparing", e, atxn->dbConn()->getErrors()); + } + atxn->setPrepared(); } void MSSqlProvider::commit(qpid::broker::TransactionContext& txn) { (void)initState(); // Ensure this thread is initialized - AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn); - if (atxn == 0) - throw qpid::broker::InvalidTransactionContextException(); - atxn->commit(); + /* + * One-phase transactions simply commit the outer SQL transaction + * that was begun on begin(). Two-phase transactions are different - + * the SQL transaction started on begin() was committed on prepare() + * so all the SQL records reflecting the enqueue/dequeue actions for + * the transaction are recorded but with xid markups on them to reflect + * that they are prepared but not committed. Now go back and remove + * the markups, deleting those marked for removal. + */ + AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn); + if (p2txn == 0) { + AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn); + if (p1txn == 0) + throw qpid::broker::InvalidTransactionContextException(); + p1txn->sqlCommit(); + return; + } + + DatabaseConnection *db(p2txn->dbConn()); + TplRecordset rsTpl; + MessageMapRecordset rsMessageMap; + try { + db->beginTransaction(); + rsTpl.open(db, TblTpl); + rsMessageMap.open(db, TblMessageMap); + rsMessageMap.commitPrepared(p2txn->getXid()); + rsTpl.remove(p2txn->getXid()); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error committing transaction", e, errs); + } } void MSSqlProvider::abort(qpid::broker::TransactionContext& txn) { (void)initState(); // Ensure this thread is initialized + /* + * One-phase and non-prepared two-phase transactions simply abort + * the outer SQL transaction that was begun on begin(). However, prepared + * two-phase transactions are different - the SQL transaction started + * on begin() was committed on prepare() so all the SQL records + * reflecting the enqueue/dequeue actions for the transaction are + * recorded but with xid markups on them to reflect that they are + * prepared but not committed. Now go back and remove the markups, + * deleting those marked for addition. + */ + AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn); + if (p2txn == 0 || !p2txn->isPrepared()) { + AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn); + if (p1txn == 0) + throw qpid::broker::InvalidTransactionContextException(); + p1txn->sqlAbort(); + return; + } + + DatabaseConnection *db(p2txn->dbConn()); + TplRecordset rsTpl; + MessageMapRecordset rsMessageMap; + try { + db->beginTransaction(); + rsTpl.open(db, TblTpl); + rsMessageMap.open(db, TblMessageMap); + rsMessageMap.abortPrepared(p2txn->getXid()); + rsTpl.remove(p2txn->getXid()); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error committing transaction", e, errs); + } + + + (void)initState(); // Ensure this thread is initialized AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn); if (atxn == 0) throw qpid::broker::InvalidTransactionContextException(); - atxn->abort(); + atxn->sqlAbort(); +} + +void +MSSqlProvider::collectPreparedXids(std::set<std::string>& xids) +{ + DatabaseConnection *db = initConnection(); + try { + TplRecordset rsTpl; + rsTpl.open(db, TblTpl); + rsTpl.recover(xids); + } + catch(_com_error &e) { + throw ADOException("Error reading TPL", e, db->getErrors()); + } } // @TODO Much of this recovery code is way too similar... refactor to @@ -977,6 +1122,40 @@ MSSqlProvider::recoverMessages(qpid::broker::RecoveryManager& recoverer, rsMessageMaps.recover(messageQueueMap); } +void +MSSqlProvider::recoverTransactions(qpid::broker::RecoveryManager& recoverer, + PreparedTransactionMap& dtxMap) +{ + DatabaseConnection *db = initConnection(); + std::set<std::string> xids; + try { + TplRecordset rsTpl; + rsTpl.open(db, TblTpl); + rsTpl.recover(xids); + } + catch(_com_error &e) { + throw ADOException("Error recovering TPL records", e, db->getErrors()); + } + + try { + // Rebuild the needed RecoverableTransactions. + for (std::set<std::string>::const_iterator iXid = xids.begin(); + iXid != xids.end(); + ++iXid) { + boost::shared_ptr<DatabaseConnection> dbX(new DatabaseConnection); + dbX->open(options.connectString, options.catalogName); + std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(dbX, + *iXid)); + tx->setPrepared(); + std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx); + dtxMap[*iXid] = recoverer.recoverTransaction(*iXid, tc); + } + } + catch(_com_error &e) { + throw ADOException("Error recreating dtx connection", e); + } +} + ////////////// Internal Methods State * @@ -1003,7 +1182,7 @@ MSSqlProvider::initConnection(void) } void -MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name) +MSSqlProvider::createDb(DatabaseConnection *db, const std::string &name) { const std::string dbCmd = "CREATE DATABASE " + name; const std::string useCmd = "USE " + name; @@ -1018,9 +1197,15 @@ MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name) " fieldTableBlob varbinary(MAX))"; const std::string messageMapSpecs = " (messageId bigint REFERENCES tblMessage(persistenceId) NOT NULL," - " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL)"; + " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL," + " prepareStatus tinyint CHECK (prepareStatus IS NULL OR " + " prepareStatus IN (1, 2))," + " xid varbinary(512) REFERENCES tblTPL(xid)" + " CONSTRAINT CK_NoDups UNIQUE NONCLUSTERED (messageId, queueId) )"; + const std::string tplSpecs = " (xid varbinary(512) PRIMARY KEY NOT NULL)"; _variant_t unused; _bstr_t dbStr = dbCmd.c_str(); + _ConnectionPtr conn(*db); try { conn->Execute(dbStr, &unused, adExecuteNoRecords); _bstr_t useStr = useCmd.c_str(); @@ -1040,12 +1225,15 @@ MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name) makeTable = tableCmd + TblBinding + bindingSpecs; makeTableStr = makeTable.c_str(); conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblTpl + tplSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); makeTable = tableCmd + TblMessageMap + messageMapSpecs; makeTableStr = makeTable.c_str(); conn->Execute(makeTableStr, &unused, adExecuteNoRecords); } catch(_com_error &e) { - throw ADOException("MSSQL can't create " + name, e); + throw ADOException("MSSQL can't create " + name, e, db->getErrors()); } } |