diff options
author | Stephen D. Huston <shuston@apache.org> | 2010-04-16 20:12:55 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2010-04-16 20:12:55 +0000 |
commit | a2313ce0fc34fbe4864445595e1db1955c4918a1 (patch) | |
tree | 4757029ffe7a3970cf38d32f8775a14bef7ea259 /cpp/src/qpid/store | |
parent | eb56a638b2aea7e56c55e49b267cfe2f673afe51 (diff) | |
download | qpid-python-a2313ce0fc34fbe4864445595e1db1955c4918a1.tar.gz |
Fix for QPID-2420 to correctly handle restoring and commit/abort prepared transactions.
The basic approach is documented in QPID-2420. This also makes improvements in the way changes are done to the tblMessageMap table which should perform much better, avoiding pulling the whole table into the broker just to add or edit or delete a single record. Also, some of the consistency checks and enforcements are moved into the database itself from the C++ code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@935068 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/store')
-rw-r--r-- | cpp/src/qpid/store/CMakeLists.txt | 2 | ||||
-rw-r--r-- | cpp/src/qpid/store/MessageStorePlugin.cpp | 26 | ||||
-rw-r--r-- | cpp/src/qpid/store/StorageProvider.h | 14 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp | 44 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/AmqpTransaction.h | 29 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp | 21 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/DatabaseConnection.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/Exception.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp | 312 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp | 217 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/MessageMapRecordset.h | 51 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/Recordset.cpp | 57 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/Recordset.h | 40 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/SqlTransaction.cpp | 71 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/SqlTransaction.h | 67 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/TplRecordset.cpp | 128 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/TplRecordset.h | 58 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/VariantHelper.cpp | 16 |
18 files changed, 899 insertions, 262 deletions
diff --git a/cpp/src/qpid/store/CMakeLists.txt b/cpp/src/qpid/store/CMakeLists.txt index 0d25923175..5c04e825d7 100644 --- a/cpp/src/qpid/store/CMakeLists.txt +++ b/cpp/src/qpid/store/CMakeLists.txt @@ -71,7 +71,9 @@ if (BUILD_MSSQL) ms-sql/MessageMapRecordset.cpp ms-sql/MessageRecordset.cpp ms-sql/Recordset.cpp + ms-sql/SqlTransaction.cpp ms-sql/State.cpp + ms-sql/TplRecordset.cpp ms-sql/VariantHelper.cpp) target_link_libraries (mssql_store qpidbroker qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY}) install (TARGETS mssql_store # RUNTIME diff --git a/cpp/src/qpid/store/MessageStorePlugin.cpp b/cpp/src/qpid/store/MessageStorePlugin.cpp index 05b6ef4465..fdb947eef2 100644 --- a/cpp/src/qpid/store/MessageStorePlugin.cpp +++ b/cpp/src/qpid/store/MessageStorePlugin.cpp @@ -405,11 +405,14 @@ MessageStorePlugin::recover(broker::RecoveryManager& recoverer) QueueMap queues; MessageMap messages; MessageQueueMap messageQueueMap; + std::vector<std::string> xids; + PreparedTransactionMap dtxMap; provider->second->recoverConfigs(recoverer); provider->second->recoverExchanges(recoverer, exchanges); provider->second->recoverQueues(recoverer, queues); provider->second->recoverBindings(recoverer, exchanges, queues); + provider->second->recoverTransactions(recoverer, dtxMap); provider->second->recoverMessages(recoverer, messages, messageQueueMap); // Enqueue msgs where needed. for (MessageQueueMap::const_iterator i = messageQueueMap.begin(); @@ -426,22 +429,33 @@ MessageStorePlugin::recover(broker::RecoveryManager& recoverer) broker::RecoverableMessage::shared_ptr msg = iMsg->second; // Now for each queue referenced in the queue map, locate it // and re-enqueue the message. - for (std::vector<uint64_t>::const_iterator j = i->second.begin(); + for (std::vector<QueueEntry>::const_iterator j = i->second.begin(); j != i->second.end(); ++j) { // Locate the queue corresponding to the current queue Id - QueueMap::const_iterator iQ = queues.find(*j); + QueueMap::const_iterator iQ = queues.find(j->queueId); if (iQ == queues.end()) { std::ostringstream oss; oss << "No matching queue trying to re-enqueue message " - << " on queue Id " << *j; + << " on queue Id " << j->queueId; THROW_STORE_EXCEPTION(oss.str()); } - iQ->second->recover(msg); + // Messages involved in prepared transactions have their status + // updated accordingly. First, though, restore a message that + // is expected to be on a queue, including non-transacted + // messages and those pending dequeue in a dtx. + if (j->tplStatus != QueueEntry::ADDING) + iQ->second->recover(msg); + switch(j->tplStatus) { + case QueueEntry::ADDING: + dtxMap[j->xid]->enqueue(iQ->second, msg); + break; + case QueueEntry::REMOVING: + dtxMap[j->xid]->dequeue(iQ->second, msg); + break; + } } } - - // recoverTransactions() and apply correctly while re-enqueuing } }} // namespace qpid::store diff --git a/cpp/src/qpid/store/StorageProvider.h b/cpp/src/qpid/store/StorageProvider.h index 1f257e7416..fc19f089fb 100644 --- a/cpp/src/qpid/store/StorageProvider.h +++ b/cpp/src/qpid/store/StorageProvider.h @@ -44,8 +44,16 @@ typedef std::map<uint64_t, qpid::broker::RecoverableQueue::shared_ptr> QueueMap; typedef std::map<uint64_t, qpid::broker::RecoverableMessage::shared_ptr> MessageMap; -// Msg Id -> vector of queue Ids where message is queued -typedef std::map<uint64_t, std::vector<uint64_t> > MessageQueueMap; +// Msg Id -> vector of queue entries where message is queued +struct QueueEntry { + enum TplStatus { NONE = 0, ADDING = 1, REMOVING = 2 }; + uint64_t queueId; + TplStatus tplStatus; + std::string xid; +}; +typedef std::map<uint64_t, std::vector<QueueEntry> > MessageQueueMap; +typedef std::map<std::string, qpid::broker::RecoverableTransaction::shared_ptr> + PreparedTransactionMap; class MessageStorePlugin; @@ -316,6 +324,8 @@ public: virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer, MessageMap& messageMap, MessageQueueMap& messageQueueMap) = 0; + virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer, + PreparedTransactionMap& dtxMap) = 0; //@} }; diff --git a/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp b/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp index 0ecfacfb4b..095d1bf331 100644 --- a/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp +++ b/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp @@ -26,51 +26,37 @@ namespace qpid { namespace store { namespace ms_sql { -AmqpTransaction::AmqpTransaction(std::auto_ptr<DatabaseConnection>& _db) - : db(_db), transDepth(0) +AmqpTransaction::AmqpTransaction(const boost::shared_ptr<DatabaseConnection>& _db) + : db(_db), sqlTrans(_db) { } AmqpTransaction::~AmqpTransaction() { - if (transDepth > 0) - this->abort(); } void -AmqpTransaction::begin() +AmqpTransaction::sqlBegin() { - _bstr_t beginCmd("BEGIN TRANSACTION"); - _ConnectionPtr c = *db; - c->Execute(beginCmd, NULL, adExecuteNoRecords); - ++transDepth; + sqlTrans.begin(); } void -AmqpTransaction::commit() +AmqpTransaction::sqlCommit() { - if (transDepth > 0) { - _bstr_t commitCmd("COMMIT TRANSACTION"); - _ConnectionPtr c = *db; - c->Execute(commitCmd, NULL, adExecuteNoRecords); - --transDepth; - } + sqlTrans.commit(); } void -AmqpTransaction::abort() +AmqpTransaction::sqlAbort() { - if (transDepth > 0) { - _bstr_t rollbackCmd("ROLLBACK TRANSACTION"); - _ConnectionPtr c = *db; - c->Execute(rollbackCmd, NULL, adExecuteNoRecords); - transDepth = 0; - } + sqlTrans.abort(); } -AmqpTPCTransaction::AmqpTPCTransaction(std::auto_ptr<DatabaseConnection>& _db, + +AmqpTPCTransaction::AmqpTPCTransaction(const boost::shared_ptr<DatabaseConnection>& db, const std::string& _xid) - : AmqpTransaction(_db), xid(_xid) + : AmqpTransaction(db), prepared(false), xid(_xid) { } @@ -78,12 +64,4 @@ AmqpTPCTransaction::~AmqpTPCTransaction() { } -void -AmqpTPCTransaction::prepare() -{ - // Intermediate transactions should have already assured integrity of - // the content in the database; just waiting to pull the trigger on the - // outermost transaction. -} - }}} // namespace qpid::store::ms_sql diff --git a/cpp/src/qpid/store/ms-sql/AmqpTransaction.h b/cpp/src/qpid/store/ms-sql/AmqpTransaction.h index 9b87d0ae15..625fab5595 100644 --- a/cpp/src/qpid/store/ms-sql/AmqpTransaction.h +++ b/cpp/src/qpid/store/ms-sql/AmqpTransaction.h @@ -23,8 +23,10 @@ */ #include <qpid/broker/TransactionalStore.h> +#include <boost/shared_ptr.hpp> #include <string> -#include <memory> + +#include "SqlTransaction.h" namespace qpid { namespace store { @@ -41,23 +43,18 @@ class DatabaseConnection; */ class AmqpTransaction : public qpid::broker::TransactionContext { - std::auto_ptr<DatabaseConnection> db; - - // Since ADO w/ SQLOLEDB can't do nested transaction via its BeginTrans(), - // et al, nested transactions are carried out with direct SQL commands. - // To ensure the state of this is known, keep track of how deeply the - // transactions are nested. - unsigned int transDepth; + boost::shared_ptr<DatabaseConnection> db; + SqlTransaction sqlTrans; public: - AmqpTransaction(std::auto_ptr<DatabaseConnection>& _db); + AmqpTransaction(const boost::shared_ptr<DatabaseConnection>& _db); virtual ~AmqpTransaction(); DatabaseConnection *dbConn() { return db.get(); } - void begin(); - void commit(); - void abort(); + void sqlBegin(); + void sqlCommit(); + void sqlAbort(); }; /** @@ -69,14 +66,18 @@ public: */ class AmqpTPCTransaction : public AmqpTransaction, public qpid::broker::TPCTransactionContext { + bool prepared; std::string xid; public: - AmqpTPCTransaction(std::auto_ptr<DatabaseConnection>& _db, + AmqpTPCTransaction(const boost::shared_ptr<DatabaseConnection>& db, const std::string& _xid); virtual ~AmqpTPCTransaction(); - void prepare(); + void setPrepared(void) { prepared = true; } + bool isPrepared(void) const { return prepared; } + + const std::string& getXid(void) const { return xid; } }; }}} // namespace qpid::store::ms_sql diff --git a/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp b/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp index 34edec8acd..3219ea526a 100644 --- a/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp +++ b/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp @@ -67,4 +67,25 @@ DatabaseConnection::close() conn = 0; } +std::string +DatabaseConnection::getErrors() +{ + long errCount = conn->Errors->Count; + if (errCount <= 0) + return ""; + // Collection ranges from 0 to nCount -1. + std::ostringstream messages; + ErrorPtr pErr = NULL; + for (long i = 0 ; i < errCount ; i++ ) { + if (i > 0) + messages << "\n"; + messages << "[" << i << "] "; + pErr = conn->Errors->GetItem(i); + messages << "Error " << pErr->Number << ": " + << (LPCSTR)pErr->Description; + } + messages << std::ends; + return messages.str(); +} + }}} // namespace qpid::store::ms_sql diff --git a/cpp/src/qpid/store/ms-sql/DatabaseConnection.h b/cpp/src/qpid/store/ms-sql/DatabaseConnection.h index 2b8bbffa90..785d1587c5 100644 --- a/cpp/src/qpid/store/ms-sql/DatabaseConnection.h +++ b/cpp/src/qpid/store/ms-sql/DatabaseConnection.h @@ -55,6 +55,8 @@ public: void beginTransaction() { conn->BeginTrans(); } void commitTransaction() {conn->CommitTrans(); } void rollbackTransaction() { conn->RollbackTrans(); } + + std::string getErrors(); }; }}} // namespace qpid::store::ms_sql diff --git a/cpp/src/qpid/store/ms-sql/Exception.h b/cpp/src/qpid/store/ms-sql/Exception.h index 0da4b24210..65ec3388ff 100644 --- a/cpp/src/qpid/store/ms-sql/Exception.h +++ b/cpp/src/qpid/store/ms-sql/Exception.h @@ -43,7 +43,9 @@ public: class ADOException : public Exception { public: - ADOException(const std::string& _text, _com_error &e) + ADOException(const std::string& _text, + _com_error &e, + const std::string& providerErrors = "") : Exception(_text) { text += ": "; text += e.ErrorMessage(); @@ -54,6 +56,8 @@ public: text += (const char *)wmsg; i->Release(); } + if (providerErrors.length() > 0) + text += providerErrors; } }; diff --git a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp index a26df59df7..323bc94c5c 100644 --- a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp +++ b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp @@ -32,6 +32,7 @@ #include "BindingRecordset.h" #include "MessageMapRecordset.h" #include "MessageRecordset.h" +#include "TplRecordset.h" #include "DatabaseConnection.h" #include "Exception.h" #include "State.h" @@ -51,6 +52,7 @@ const std::string TblExchange("tblExchange"); const std::string TblMessage("tblMessage"); const std::string TblMessageMap("tblMessageMap"); const std::string TblQueue("tblQueue"); +const std::string TblTpl("tblTPL"); } namespace qpid { @@ -256,9 +258,7 @@ public: virtual void prepare(qpid::broker::TPCTransactionContext& txn); virtual void commit(qpid::broker::TransactionContext& txn); virtual void abort(qpid::broker::TransactionContext& txn); - - // @TODO This maybe should not be in TransactionalStore - virtual void collectPreparedXids(std::set<std::string>& xids) {} + virtual void collectPreparedXids(std::set<std::string>& xids); //@} virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer); @@ -272,6 +272,8 @@ public: virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer, MessageMap& messageMap, MessageQueueMap& messageQueueMap); + virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer, + PreparedTransactionMap& dtxMap); private: struct ProviderOptions : public qpid::Options @@ -310,7 +312,7 @@ private: State *initState(); DatabaseConnection *initConnection(void); - void createDb(_ConnectionPtr conn, const std::string &name); + void createDb(DatabaseConnection *db, const std::string &name); }; static MSSqlProvider static_instance_registers_plugin; @@ -356,7 +358,7 @@ MSSqlProvider::earlyInitialize(Plugin::Target &target) // Database doesn't exist; create it QPID_LOG(notice, "MSSQL: Creating database " + options.catalogName); - createDb(conn, options.catalogName); + createDb(db.get(), options.catalogName); } else { QPID_LOG(notice, @@ -407,8 +409,9 @@ MSSqlProvider::create(PersistableQueue& queue, db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error creating queue " + queue.getName(), e); + throw ADOException("Error creating queue " + queue.getName(), e, errs); } } @@ -418,14 +421,6 @@ MSSqlProvider::create(PersistableQueue& queue, void MSSqlProvider::destroy(PersistableQueue& queue) { - // MessageDeleter class for use with for_each, below. - class MessageDeleter { - BlobRecordset& msgs; - public: - explicit MessageDeleter(BlobRecordset& _msgs) : msgs(_msgs) {} - void operator()(uint64_t msgId) { msgs.remove(msgId); } - }; - DatabaseConnection *db = initConnection(); BlobRecordset rsQueues; BindingRecordset rsBindings; @@ -441,19 +436,18 @@ MSSqlProvider::destroy(PersistableQueue& queue) // under the references in the bindings table. Then remove the // message->queue entries for the queue, also because the queue can't // be deleted while there are references to it. If there are messages - // orphaned by removing the queue references, those messages can - // also be deleted. Lastly, the queue record can be removed. + // orphaned by removing the queue references, they're deleted by + // a trigger on the tblMessageMap table. Lastly, the queue record + // can be removed. rsBindings.removeForQueue(queue.getPersistenceId()); - std::vector<uint64_t> orphans; - rsMessageMaps.removeForQueue(queue.getPersistenceId(), orphans); - std::for_each(orphans.begin(), orphans.end(), - MessageDeleter(rsMessages)); + rsMessageMaps.removeForQueue(queue.getPersistenceId()); rsQueues.remove(queue); db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error deleting queue " + queue.getName(), e); + throw ADOException("Error deleting queue " + queue.getName(), e, errs); } } @@ -473,8 +467,11 @@ MSSqlProvider::create(const PersistableExchange& exchange, db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error creating exchange " + exchange.getName(), e); + throw ADOException("Error creating exchange " + exchange.getName(), + e, + errs); } } @@ -498,8 +495,11 @@ MSSqlProvider::destroy(const PersistableExchange& exchange) db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error deleting exchange " + exchange.getName(), e); + throw ADOException("Error deleting exchange " + exchange.getName(), + e, + errs); } } @@ -524,9 +524,12 @@ MSSqlProvider::bind(const PersistableExchange& exchange, db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); throw ADOException("Error binding exchange " + exchange.getName() + - " to queue " + queue.getName(), e); + " to queue " + queue.getName(), + e, + errs); } } @@ -551,9 +554,12 @@ MSSqlProvider::unbind(const PersistableExchange& exchange, db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); throw ADOException("Error unbinding exchange " + exchange.getName() + - " from queue " + queue.getName(), e); + " from queue " + queue.getName(), + e, + errs); } } @@ -572,8 +578,9 @@ MSSqlProvider::create(const PersistableConfig& config) db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error creating config " + config.getName(), e); + throw ADOException("Error creating config " + config.getName(), e, errs); } } @@ -592,8 +599,9 @@ MSSqlProvider::destroy(const PersistableConfig& config) db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error deleting config " + config.getName(), e); + throw ADOException("Error deleting config " + config.getName(), e, errs); } } @@ -618,8 +626,9 @@ MSSqlProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg) db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error staging message", e); + throw ADOException("Error staging message", e, errs); } } @@ -641,8 +650,9 @@ MSSqlProvider::destroy(PersistableMessage& msg) db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error deleting message", e); + throw ADOException("Error deleting message", e, errs); } } @@ -662,8 +672,9 @@ MSSqlProvider::appendContent(const boost::intrusive_ptr<const PersistableMessage db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error appending to message", e); + throw ADOException("Error appending to message", e, errs); } } @@ -691,7 +702,8 @@ MSSqlProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/, rsMessages.loadContent(msg, data, offset, length); } catch(_com_error &e) { - throw ADOException("Error loading message content", e); + std::string errs = db->getErrors(); + throw ADOException("Error loading message content", e, errs); } } @@ -714,6 +726,7 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt, // this is not in the context of a transaction, then just use the thread's // DatabaseConnection with a ADO transaction. DatabaseConnection *db = 0; + std::string xid; AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt); if (atxn == 0) { db = initConnection(); @@ -721,12 +734,16 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt, } else { (void)initState(); // Ensure this thread is initialized + // It's a transactional enqueue; if it's TPC, grab the xid. + AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt); + if (tpcTxn) + xid = tpcTxn->getXid(); db = atxn->dbConn(); try { - atxn->begin(); + atxn->sqlBegin(); } catch(_com_error &e) { - throw ADOException("Error queuing message", e); + throw ADOException("Error queuing message", e, db->getErrors()); } } @@ -738,18 +755,19 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt, rsMessages.add(msg); } rsMap.open(db, TblMessageMap); - rsMap.add(msg->getPersistenceId(), queue.getPersistenceId()); + rsMap.add(msg->getPersistenceId(), queue.getPersistenceId(), xid); if (atxn) - atxn->commit(); + atxn->sqlCommit(); else db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); if (atxn) - atxn->abort(); + atxn->sqlAbort(); else db->rollbackTransaction(); - throw ADOException("Error queuing message", e); + throw ADOException("Error queuing message", e, errs); } msg->enqueueComplete(); } @@ -773,6 +791,7 @@ MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt, // this is not in the context of a transaction, then just use the thread's // DatabaseConnection with a ADO transaction. DatabaseConnection *db = 0; + std::string xid; AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt); if (atxn == 0) { db = initConnection(); @@ -780,36 +799,54 @@ MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt, } else { (void)initState(); // Ensure this thread is initialized + // It's a transactional dequeue; if it's TPC, grab the xid. + AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt); + if (tpcTxn) + xid = tpcTxn->getXid(); db = atxn->dbConn(); try { - atxn->begin(); + atxn->sqlBegin(); } catch(_com_error &e) { - throw ADOException("Error queuing message", e); + throw ADOException("Error queuing message", e, db->getErrors()); } } MessageMapRecordset rsMap; - MessageRecordset rsMessages; try { rsMap.open(db, TblMessageMap); - bool more = rsMap.remove(msg->getPersistenceId(), - queue.getPersistenceId()); - if (!more) { - rsMessages.open(db, TblMessage); - rsMessages.remove(msg); + // TPC dequeues are just marked pending and will actually be removed + // when the transaction commits; Single-phase dequeues are removed + // now, relying on the SQL transaction to put it back if the + // transaction doesn't commit. + if (!xid.empty()) { + rsMap.pendingRemove(msg->getPersistenceId(), + queue.getPersistenceId(), + xid); + } + else { + rsMap.remove(msg->getPersistenceId(), + queue.getPersistenceId()); } if (atxn) - atxn->commit(); + atxn->sqlCommit(); else db->commitTransaction(); } + catch(ms_sql::Exception&) { + if (atxn) + atxn->sqlAbort(); + else + db->rollbackTransaction(); + throw; + } catch(_com_error &e) { + std::string errs = db->getErrors(); if (atxn) - atxn->abort(); + atxn->sqlAbort(); else db->rollbackTransaction(); - throw ADOException("Error dequeuing message", e); + throw ADOException("Error dequeuing message", e, errs); } msg->dequeueComplete(); } @@ -827,10 +864,10 @@ MSSqlProvider::begin() // it safe and handle this just like a TPC transaction, which actually // can be prepared and committed/aborted from different threads, // making it a bad idea to try using the thread-local DatabaseConnection. - std::auto_ptr<DatabaseConnection> db(new DatabaseConnection); + boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection); db->open(options.connectString, options.catalogName); std::auto_ptr<AmqpTransaction> tx(new AmqpTransaction(db)); - tx->begin(); + tx->sqlBegin(); std::auto_ptr<qpid::broker::TransactionContext> tc(tx); return tc; } @@ -839,10 +876,24 @@ std::auto_ptr<qpid::broker::TPCTransactionContext> MSSqlProvider::begin(const std::string& xid) { (void)initState(); // Ensure this thread is initialized - std::auto_ptr<DatabaseConnection> db(new DatabaseConnection); + boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection); db->open(options.connectString, options.catalogName); std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(db, xid)); - tx->begin(); + tx->sqlBegin(); + + TplRecordset rsTpl; + try { + tx->sqlBegin(); + rsTpl.open(db.get(), TblTpl); + rsTpl.add(xid); + tx->sqlCommit(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + tx->sqlAbort(); + throw ADOException("Error adding TPL record", e, errs); + } + std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx); return tc; } @@ -850,28 +901,122 @@ MSSqlProvider::begin(const std::string& xid) void MSSqlProvider::prepare(qpid::broker::TPCTransactionContext& txn) { - // The inner transactions used for the components of the TPC are done; - // nothing else to do but wait for the commit. + // Commit all the marked-up enqueue/dequeue ops and the TPL record. + // On commit/rollback the TPL will be removed and the TPL markups + // on the message map will be cleaned up as well. + (void)initState(); // Ensure this thread is initialized + AmqpTPCTransaction *atxn = dynamic_cast<AmqpTPCTransaction*> (&txn); + if (atxn == 0) + throw qpid::broker::InvalidTransactionContextException(); + try { + atxn->sqlCommit(); + } + catch(_com_error &e) { + throw ADOException("Error preparing", e, atxn->dbConn()->getErrors()); + } + atxn->setPrepared(); } void MSSqlProvider::commit(qpid::broker::TransactionContext& txn) { (void)initState(); // Ensure this thread is initialized - AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn); - if (atxn == 0) - throw qpid::broker::InvalidTransactionContextException(); - atxn->commit(); + /* + * One-phase transactions simply commit the outer SQL transaction + * that was begun on begin(). Two-phase transactions are different - + * the SQL transaction started on begin() was committed on prepare() + * so all the SQL records reflecting the enqueue/dequeue actions for + * the transaction are recorded but with xid markups on them to reflect + * that they are prepared but not committed. Now go back and remove + * the markups, deleting those marked for removal. + */ + AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn); + if (p2txn == 0) { + AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn); + if (p1txn == 0) + throw qpid::broker::InvalidTransactionContextException(); + p1txn->sqlCommit(); + return; + } + + DatabaseConnection *db(p2txn->dbConn()); + TplRecordset rsTpl; + MessageMapRecordset rsMessageMap; + try { + db->beginTransaction(); + rsTpl.open(db, TblTpl); + rsMessageMap.open(db, TblMessageMap); + rsMessageMap.commitPrepared(p2txn->getXid()); + rsTpl.remove(p2txn->getXid()); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error committing transaction", e, errs); + } } void MSSqlProvider::abort(qpid::broker::TransactionContext& txn) { (void)initState(); // Ensure this thread is initialized + /* + * One-phase and non-prepared two-phase transactions simply abort + * the outer SQL transaction that was begun on begin(). However, prepared + * two-phase transactions are different - the SQL transaction started + * on begin() was committed on prepare() so all the SQL records + * reflecting the enqueue/dequeue actions for the transaction are + * recorded but with xid markups on them to reflect that they are + * prepared but not committed. Now go back and remove the markups, + * deleting those marked for addition. + */ + AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn); + if (p2txn == 0 || !p2txn->isPrepared()) { + AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn); + if (p1txn == 0) + throw qpid::broker::InvalidTransactionContextException(); + p1txn->sqlAbort(); + return; + } + + DatabaseConnection *db(p2txn->dbConn()); + TplRecordset rsTpl; + MessageMapRecordset rsMessageMap; + try { + db->beginTransaction(); + rsTpl.open(db, TblTpl); + rsMessageMap.open(db, TblMessageMap); + rsMessageMap.abortPrepared(p2txn->getXid()); + rsTpl.remove(p2txn->getXid()); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error committing transaction", e, errs); + } + + + (void)initState(); // Ensure this thread is initialized AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn); if (atxn == 0) throw qpid::broker::InvalidTransactionContextException(); - atxn->abort(); + atxn->sqlAbort(); +} + +void +MSSqlProvider::collectPreparedXids(std::set<std::string>& xids) +{ + DatabaseConnection *db = initConnection(); + try { + TplRecordset rsTpl; + rsTpl.open(db, TblTpl); + rsTpl.recover(xids); + } + catch(_com_error &e) { + throw ADOException("Error reading TPL", e, db->getErrors()); + } } // @TODO Much of this recovery code is way too similar... refactor to @@ -977,6 +1122,40 @@ MSSqlProvider::recoverMessages(qpid::broker::RecoveryManager& recoverer, rsMessageMaps.recover(messageQueueMap); } +void +MSSqlProvider::recoverTransactions(qpid::broker::RecoveryManager& recoverer, + PreparedTransactionMap& dtxMap) +{ + DatabaseConnection *db = initConnection(); + std::set<std::string> xids; + try { + TplRecordset rsTpl; + rsTpl.open(db, TblTpl); + rsTpl.recover(xids); + } + catch(_com_error &e) { + throw ADOException("Error recovering TPL records", e, db->getErrors()); + } + + try { + // Rebuild the needed RecoverableTransactions. + for (std::set<std::string>::const_iterator iXid = xids.begin(); + iXid != xids.end(); + ++iXid) { + boost::shared_ptr<DatabaseConnection> dbX(new DatabaseConnection); + dbX->open(options.connectString, options.catalogName); + std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(dbX, + *iXid)); + tx->setPrepared(); + std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx); + dtxMap[*iXid] = recoverer.recoverTransaction(*iXid, tc); + } + } + catch(_com_error &e) { + throw ADOException("Error recreating dtx connection", e); + } +} + ////////////// Internal Methods State * @@ -1003,7 +1182,7 @@ MSSqlProvider::initConnection(void) } void -MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name) +MSSqlProvider::createDb(DatabaseConnection *db, const std::string &name) { const std::string dbCmd = "CREATE DATABASE " + name; const std::string useCmd = "USE " + name; @@ -1018,9 +1197,15 @@ MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name) " fieldTableBlob varbinary(MAX))"; const std::string messageMapSpecs = " (messageId bigint REFERENCES tblMessage(persistenceId) NOT NULL," - " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL)"; + " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL," + " prepareStatus tinyint CHECK (prepareStatus IS NULL OR " + " prepareStatus IN (1, 2))," + " xid varbinary(512) REFERENCES tblTPL(xid)" + " CONSTRAINT CK_NoDups UNIQUE NONCLUSTERED (messageId, queueId) )"; + const std::string tplSpecs = " (xid varbinary(512) PRIMARY KEY NOT NULL)"; _variant_t unused; _bstr_t dbStr = dbCmd.c_str(); + _ConnectionPtr conn(*db); try { conn->Execute(dbStr, &unused, adExecuteNoRecords); _bstr_t useStr = useCmd.c_str(); @@ -1040,12 +1225,15 @@ MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name) makeTable = tableCmd + TblBinding + bindingSpecs; makeTableStr = makeTable.c_str(); conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblTpl + tplSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); makeTable = tableCmd + TblMessageMap + messageMapSpecs; makeTableStr = makeTable.c_str(); conn->Execute(makeTableStr, &unused, adExecuteNoRecords); } catch(_com_error &e) { - throw ADOException("MSSQL can't create " + name, e); + throw ADOException("MSSQL can't create " + name, e, db->getErrors()); } } diff --git a/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp b/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp index 68e174a2b0..a2ff8aefdd 100644 --- a/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp +++ b/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp @@ -24,96 +24,197 @@ #include <qpid/store/StorageProvider.h> #include "MessageMapRecordset.h" +#include "BlobEncoder.h" +#include "DatabaseConnection.h" +#include "Exception.h" #include "VariantHelper.h" +namespace { +inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);}; +} + namespace qpid { namespace store { namespace ms_sql { void -MessageMapRecordset::add(uint64_t messageId, uint64_t queueId) +MessageMapRecordset::open(DatabaseConnection* conn, const std::string& table) { - rs->AddNew(); - rs->Fields->GetItem("messageId")->Value = messageId; - rs->Fields->GetItem("queueId")->Value = queueId; - rs->Update(); + init(conn, table); } -bool +void +MessageMapRecordset::add(uint64_t messageId, + uint64_t queueId, + const std::string& xid) +{ + std::ostringstream command; + command << "INSERT INTO " << tableName + << " (messageId, queueId"; + if (!xid.empty()) + command << ", prepareStatus, xid"; + command << ") VALUES (" << messageId << "," << queueId; + if (!xid.empty()) + command << "," << PREPARE_ADD << ",?"; + command << ")" << std::ends; + + _CommandPtr cmd = NULL; + _ParameterPtr xidVal = NULL; + TESTHR(cmd.CreateInstance(__uuidof(Command))); + _ConnectionPtr p = *dbConn; + cmd->ActiveConnection = p; + cmd->CommandText = command.str().c_str(); + cmd->CommandType = adCmdText; + if (!xid.empty()) { + TESTHR(xidVal.CreateInstance(__uuidof(Parameter))); + xidVal->Name = "@xid"; + xidVal->Type = adVarBinary; + xidVal->Size = xid.length(); + xidVal->Direction = adParamInput; + xidVal->Value = BlobEncoder(xid); + cmd->Parameters->Append(xidVal); + } + cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords); +} + +void MessageMapRecordset::remove(uint64_t messageId, uint64_t queueId) { - // Look up all mappings for the specified message. Then scan - // for the specified queue and keep track of whether or not the - // message exists on any queue we are not looking for a well. - std::ostringstream filter; - filter << "messageId = " << messageId << std::ends; - rs->PutFilter (VariantHelper<std::string>(filter.str())); - if (rs->RecordCount == 0) - return false; + std::ostringstream command; + command << "DELETE FROM " << tableName + << " WHERE queueId = " << queueId + << " AND messageId = " << messageId << std::ends; + _CommandPtr cmd = NULL; + TESTHR(cmd.CreateInstance(__uuidof(Command))); + _ConnectionPtr p = *dbConn; + cmd->ActiveConnection = p; + cmd->CommandText = command.str().c_str(); + cmd->CommandType = adCmdText; + _variant_t deletedRecords; + cmd->Execute(&deletedRecords, NULL, adCmdText | adExecuteNoRecords); + if ((long)deletedRecords == 0) + throw ms_sql::Exception("Message does not exist in queue mapping"); + // Trigger on deleting the mapping takes care of deleting orphaned + // message record from tblMessage. +} + +void +MessageMapRecordset::pendingRemove(uint64_t messageId, + uint64_t queueId, + const std::string& xid) +{ + // Look up the mapping for the specified message and queue. There + // should be only one because of the uniqueness constraint in the + // SQL table. Update it to reflect it's pending delete with + // the specified xid. + std::ostringstream command; + command << "UPDATE " << tableName + << " SET prepareStatus=" << PREPARE_REMOVE + << " , xid=?" + << " WHERE queueId = " << queueId + << " AND messageId = " << messageId << std::ends; + + _CommandPtr cmd = NULL; + _ParameterPtr xidVal = NULL; + TESTHR(cmd.CreateInstance(__uuidof(Command))); + TESTHR(xidVal.CreateInstance(__uuidof(Parameter))); + _ConnectionPtr p = *dbConn; + cmd->ActiveConnection = p; + cmd->CommandText = command.str().c_str(); + cmd->CommandType = adCmdText; + xidVal->Name = "@xid"; + xidVal->Type = adVarBinary; + xidVal->Size = xid.length(); + xidVal->Direction = adParamInput; + xidVal->Value = BlobEncoder(xid); + cmd->Parameters->Append(xidVal); + cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords); +} + +void +MessageMapRecordset::removeForQueue(uint64_t queueId) +{ + std::ostringstream command; + command << "DELETE FROM " << tableName + << " WHERE queueId = " << queueId << std::ends; + _CommandPtr cmd = NULL; + + TESTHR(cmd.CreateInstance(__uuidof(Command))); + _ConnectionPtr p = *dbConn; + cmd->ActiveConnection = p; + cmd->CommandText = command.str().c_str(); + cmd->CommandType = adCmdText; + cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords); +} + +void +MessageMapRecordset::commitPrepared(const std::string& xid) +{ + // Find all the records for the specified xid. Records marked as adding + // are now permanent so remove the xid and prepareStatus. Records marked + // as removing are removed entirely. + openRs(); MessageMap m; IADORecordBinding *piAdoRecordBinding; rs->QueryInterface(__uuidof(IADORecordBinding), (LPVOID *)&piAdoRecordBinding); piAdoRecordBinding->BindToRecordset(&m); - bool moreEntries = false, deleted = false; - rs->MoveFirst(); - // If the desired mapping gets deleted, and we already know there are - // other mappings for the message, don't bother finishing the scan. - while (!rs->EndOfFile && !(deleted && moreEntries)) { - if (m.queueId == queueId) { + for (; !rs->EndOfFile; rs->MoveNext()) { + if (m.xidStatus != adFldOK) + continue; + const std::string x(m.xid, m.xidLength); + if (x != xid) + continue; + if (m.prepareStatus == PREPARE_REMOVE) { rs->Delete(adAffectCurrent); - rs->Update(); - deleted = true; } else { - moreEntries = true; + _variant_t dbNull; + dbNull.ChangeType(VT_NULL); + rs->Fields->GetItem("prepareStatus")->Value = dbNull; + rs->Fields->GetItem("xid")->Value = dbNull; } - rs->MoveNext(); + rs->Update(); } piAdoRecordBinding->Release(); - rs->Filter = ""; - return moreEntries; } void -MessageMapRecordset::removeForQueue(uint64_t queueId, - std::vector<uint64_t>& orphaned) +MessageMapRecordset::abortPrepared(const std::string& xid) { - // Read all the messages queued on queueId and add them to the orphaned - // list. Then remove each one and learn if there are references to it - // from other queues. The ones without references are left in the - // orphaned list, others are removed. - std::ostringstream filter; - filter << "queueId = " << queueId << std::ends; - rs->PutFilter (VariantHelper<std::string>(filter.str())); + // Find all the records for the specified xid. Records marked as adding + // need to be removed while records marked as removing are put back to + // no xid and no prepareStatus. + openRs(); MessageMap m; IADORecordBinding *piAdoRecordBinding; rs->QueryInterface(__uuidof(IADORecordBinding), (LPVOID *)&piAdoRecordBinding); piAdoRecordBinding->BindToRecordset(&m); - while (!rs->EndOfFile) { - orphaned.push_back(m.messageId); - rs->MoveNext(); + for (; !rs->EndOfFile; rs->MoveNext()) { + if (m.xidStatus != adFldOK) + continue; + const std::string x(m.xid, m.xidLength); + if (x != xid) + continue; + if (m.prepareStatus == PREPARE_ADD) { + rs->Delete(adAffectCurrent); + } + else { + _variant_t dbNull; + dbNull.ChangeType(VT_NULL); + rs->Fields->GetItem("prepareStatus")->Value = dbNull; + rs->Fields->GetItem("xid")->Value = dbNull; + } + rs->Update(); } piAdoRecordBinding->Release(); - rs->Filter = ""; // Remove filter on queueId - rs->Requery(adOptionUnspecified); // Get the entire map again - - // Now delete all the messages on this queue; any message that still has - // references from other queue(s) is removed from orphaned. - for (std::vector<uint64_t>::iterator i = orphaned.begin(); - i != orphaned.end(); - ) { - if (remove(*i, queueId)) - i = orphaned.erase(i); // There are other refs to message *i - else - ++i; - } } void MessageMapRecordset::recover(MessageQueueMap& msgMap) { + openRs(); if (rs->BOF && rs->EndOfFile) return; // Nothing to do rs->MoveFirst(); @@ -123,7 +224,18 @@ MessageMapRecordset::recover(MessageQueueMap& msgMap) (LPVOID *)&piAdoRecordBinding); piAdoRecordBinding->BindToRecordset(&b); while (!rs->EndOfFile) { - msgMap[b.messageId].push_back(b.queueId); + qpid::store::QueueEntry entry; + entry.queueId = b.queueId; + if (b.xidStatus == adFldOK && b.xidLength > 0) { + entry.xid.assign(b.xid, b.xidLength); + entry.tplStatus = + b.prepareStatus == PREPARE_ADD ? QueueEntry::ADDING + : QueueEntry::REMOVING; + } + else { + entry.tplStatus = QueueEntry::NONE; + } + msgMap[b.messageId].push_back(entry); rs->MoveNext(); } @@ -133,6 +245,7 @@ MessageMapRecordset::recover(MessageQueueMap& msgMap) void MessageMapRecordset::dump() { + openRs(); Recordset::dump(); if (rs->EndOfFile && rs->BOF) // No records return; diff --git a/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h b/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h index 475137b18b..1b0c2f073e 100644 --- a/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h +++ b/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h @@ -23,7 +23,6 @@ */ #include <icrsint.h> -#include <vector> #include "Recordset.h" #include <qpid/broker/RecoveryManager.h> @@ -38,32 +37,56 @@ namespace ms_sql { */ class MessageMapRecordset : public Recordset { + // These values are defined in a constraint on the tblMessageMap table. + // the prepareStatus column can only be null, 1, or 2. + enum { PREPARE_ADD=1, PREPARE_REMOVE=2 }; + class MessageMap : public CADORecordBinding { BEGIN_ADO_BINDING(MessageMap) ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, messageId, FALSE) ADO_FIXED_LENGTH_ENTRY2(2, adBigInt, queueId, FALSE) + ADO_FIXED_LENGTH_ENTRY2(3, adTinyInt, prepareStatus, FALSE) + ADO_VARIABLE_LENGTH_ENTRY(4, adVarBinary, xid, sizeof(xid), + xidStatus, xidLength, FALSE) END_ADO_BINDING() public: uint64_t messageId; uint64_t queueId; + uint8_t prepareStatus; + char xid[512]; + int xidStatus; + uint32_t xidLength; }; + void selectOnXid(const std::string& xid); + public: + virtual void open(DatabaseConnection* conn, const std::string& table); + // Add a new mapping - void add(uint64_t messageId, uint64_t queueId); - - // Remove a specific mapping. Returns true if the message is still - // enqueued on at least one other queue; false if the message no longer - // exists on any other queues. - bool remove(uint64_t messageId, uint64_t queueId); - - // Remove mappings for all messages on a specified queue. If there are - // messages that were only on the specified queue and are, therefore, - // orphaned now, return them in the orphaned vector. The orphaned - // messages can be deleted permanently as they are not referenced on - // any other queues. - void removeForQueue(uint64_t queueId, std::vector<uint64_t>& orphaned); + void add(uint64_t messageId, + uint64_t queueId, + const std::string& xid = ""); + + // Remove a specific mapping. + void remove(uint64_t messageId, uint64_t queueId); + + // Mark the indicated message->queue entry pending removal. The entry + // for the mapping is updated to indicate pending removal with the + // specified xid. + void pendingRemove(uint64_t messageId, + uint64_t queueId, + const std::string& xid); + + // Remove mappings for all messages on a specified queue. + void removeForQueue(uint64_t queueId); + + // Commit records recorded as prepared. + void commitPrepared(const std::string& xid); + + // Abort prepared changes. + void abortPrepared(const std::string& xid); // Recover the mappings of message ID -> vector<queue ID>. void recover(MessageQueueMap& msgMap); diff --git a/cpp/src/qpid/store/ms-sql/Recordset.cpp b/cpp/src/qpid/store/ms-sql/Recordset.cpp index e1a5158c87..e706799951 100644 --- a/cpp/src/qpid/store/ms-sql/Recordset.cpp +++ b/cpp/src/qpid/store/ms-sql/Recordset.cpp @@ -35,63 +35,35 @@ namespace qpid { namespace store { namespace ms_sql { -#if 0 -Recordset::Iterator::Iterator(Recordset& _rs) : rs(_rs) -{ - rs->MoveFirst(); - setCurrent(); -} - -std::pair<uint64_t, BlobAdapter>& -Recordset::Iterator::dereference() const -{ - return const_cast<std::pair<uint64_t, BlobAdapter> >(current); -} - -void -Recordset::Iterator::increment() -{ - rs->MoveNext(); - setCurrent(); -} - -bool -Recordset::Iterator::equal(const Iterator& x) const -{ - return current.first == x.current.first; -} void -Recordset::Iterator::setCurrent() +Recordset::init(DatabaseConnection* conn, const std::string& table) { - if (!rs->EndOfFile) { - uint64_t id = rs->Fields->Item["persistenceId"]->Value; - long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize; - BlobAdapter blob(blobSize); - blob = rs->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); - current = std::make_pair(id, blob); - } - else { - current.first = 0; - } + dbConn = conn; + TESTHR(rs.CreateInstance(__uuidof(::Recordset))); + tableName = table; } -#endif void -Recordset::open(DatabaseConnection* conn, const std::string& table) +Recordset::openRs() { - _ConnectionPtr p = *conn; - TESTHR(rs.CreateInstance(__uuidof(::Recordset))); // Client-side cursors needed to get access to newly added // identity column immediately. Recordsets need this to get the // persistence ID for the broker objects. rs->CursorLocation = adUseClient; - rs->Open(table.c_str(), + _ConnectionPtr p = *dbConn; + rs->Open(tableName.c_str(), _variant_t((IDispatch *)p, true), adOpenStatic, adLockOptimistic, adCmdTable); - tableName = table; +} + +void +Recordset::open(DatabaseConnection* conn, const std::string& table) +{ + init(conn, table); + openRs(); } void @@ -99,7 +71,6 @@ Recordset::close() { if (rs && rs->State == adStateOpen) rs->Close(); - rs = 0; } void diff --git a/cpp/src/qpid/store/ms-sql/Recordset.h b/cpp/src/qpid/store/ms-sql/Recordset.h index 3631838aa8..032b2bd434 100644 --- a/cpp/src/qpid/store/ms-sql/Recordset.h +++ b/cpp/src/qpid/store/ms-sql/Recordset.h @@ -51,46 +51,20 @@ protected: DatabaseConnection* dbConn; std::string tableName; + void init(DatabaseConnection* conn, const std::string& table); + void openRs(); + public: + Recordset() : rs(0), dbConn(0) {} + virtual ~Recordset() { close(); rs = 0; dbConn = 0; } -#if 0 /** - * Iterator support for walking through the recordset. - * If I need to try this again, I'd look at Recordset cloning. + * Default open() reads all records into the recordset. */ - class Iterator : public boost::iterator_facade< - Iterator, std::pair<uint64_t, BlobAdapter>, boost::random_access_traversal_tag> - { - public: - Iterator() : rs(0) { } - Iterator(Recordset& _rs); - - private: - friend class boost::iterator_core_access; - - std::pair<uint64_t, BlobAdapter>& dereference() const; - void increment(); - bool equal(const Iterator& x) const; - - _RecordsetPtr rs; - std::pair<uint64_t, BlobAdapter> current; - - void setCurrent(); - }; - - friend class Iterator; -#endif - - Recordset() : rs(0) {} - virtual ~Recordset() { close(); } - void open(DatabaseConnection* conn, const std::string& table); + virtual void open(DatabaseConnection* conn, const std::string& table); void close(); void requery(); operator _RecordsetPtr () { return rs; } -#if 0 - Iterator begin() { Iterator iter(*this); return iter; } - Iterator end() { Iterator iter; return iter; } -#endif // Dump table contents; useful for debugging. void dump(); diff --git a/cpp/src/qpid/store/ms-sql/SqlTransaction.cpp b/cpp/src/qpid/store/ms-sql/SqlTransaction.cpp new file mode 100644 index 0000000000..6ad7725570 --- /dev/null +++ b/cpp/src/qpid/store/ms-sql/SqlTransaction.cpp @@ -0,0 +1,71 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SqlTransaction.h"
+#include "DatabaseConnection.h"
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+SqlTransaction::SqlTransaction(const boost::shared_ptr<DatabaseConnection>& _db)
+ : db(_db), transDepth(0)
+{
+}
+
+SqlTransaction::~SqlTransaction()
+{
+ if (transDepth > 0)
+ this->abort();
+}
+
+void
+SqlTransaction::begin()
+{
+ _bstr_t beginCmd("BEGIN TRANSACTION");
+ _ConnectionPtr c = *db;
+ c->Execute(beginCmd, NULL, adExecuteNoRecords);
+ ++transDepth;
+}
+
+void
+SqlTransaction::commit()
+{
+ if (transDepth > 0) {
+ _bstr_t commitCmd("COMMIT TRANSACTION");
+ _ConnectionPtr c = *db;
+ c->Execute(commitCmd, NULL, adExecuteNoRecords);
+ --transDepth;
+ }
+}
+
+void
+SqlTransaction::abort()
+{
+ if (transDepth > 0) {
+ _bstr_t rollbackCmd("ROLLBACK TRANSACTION");
+ _ConnectionPtr c = *db;
+ c->Execute(rollbackCmd, NULL, adExecuteNoRecords);
+ transDepth = 0;
+ }
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/cpp/src/qpid/store/ms-sql/SqlTransaction.h b/cpp/src/qpid/store/ms-sql/SqlTransaction.h new file mode 100644 index 0000000000..8b5239b786 --- /dev/null +++ b/cpp/src/qpid/store/ms-sql/SqlTransaction.h @@ -0,0 +1,67 @@ +#ifndef QPID_STORE_MSSQL_SQLTRANSACTION_H
+#define QPID_STORE_MSSQL_SQLTRANSACTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+#include <string>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+class DatabaseConnection;
+
+/**
+ * @class SqlTransaction
+ *
+ * Class representing an SQL transaction.
+ * Since ADO w/ SQLOLEDB can't do nested transaction via its BeginTrans(),
+ * et al, nested transactions are carried out with direct SQL commands.
+ * To ensure the state of this is known, keep track of how deeply the
+ * transactions are nested. This is more of a safety/sanity check since
+ * AMQP doesn't provide nested transactions.
+ */
+class SqlTransaction {
+
+ boost::shared_ptr<DatabaseConnection> db;
+
+ // Since ADO w/ SQLOLEDB can't do nested transaction via its BeginTrans(),
+ // et al, nested transactions are carried out with direct SQL commands.
+ // To ensure the state of this is known, keep track of how deeply the
+ // transactions are nested.
+ unsigned int transDepth;
+
+public:
+ SqlTransaction(const boost::shared_ptr<DatabaseConnection>& _db);
+ ~SqlTransaction();
+
+ DatabaseConnection *dbConn() { return db.get(); }
+
+ void begin();
+ void commit();
+ void abort();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_SQLTRANSACTION_H */
diff --git a/cpp/src/qpid/store/ms-sql/TplRecordset.cpp b/cpp/src/qpid/store/ms-sql/TplRecordset.cpp new file mode 100644 index 0000000000..1309d921a9 --- /dev/null +++ b/cpp/src/qpid/store/ms-sql/TplRecordset.cpp @@ -0,0 +1,128 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <qpid/Exception.h>
+#include <qpid/log/Statement.h>
+
+#include "TplRecordset.h"
+#include "BlobEncoder.h"
+#include "DatabaseConnection.h"
+#include "VariantHelper.h"
+
+namespace {
+inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};
+}
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+void
+TplRecordset::open(DatabaseConnection* conn, const std::string& table)
+{
+ init(conn, table);
+ // Don't actually open until we know what to do. It's far easier and more
+ // efficient to simply do most of these TPL/xid ops in a single statement.
+}
+
+void
+TplRecordset::add(const std::string& xid)
+{
+ const std::string command =
+ "INSERT INTO " + tableName + " ( xid ) VALUES ( ? )";
+ _CommandPtr cmd = NULL;
+ _ParameterPtr xidVal = NULL;
+
+ TESTHR(cmd.CreateInstance(__uuidof(Command)));
+ TESTHR(xidVal.CreateInstance(__uuidof(Parameter)));
+ _ConnectionPtr p = *dbConn;
+ cmd->ActiveConnection = p;
+ cmd->CommandText = command.c_str();
+ cmd->CommandType = adCmdText;
+ xidVal->Name = "@xid";
+ xidVal->Type = adVarBinary;
+ xidVal->Size = xid.length();
+ xidVal->Direction = adParamInput;
+ xidVal->Value = BlobEncoder(xid);
+ cmd->Parameters->Append(xidVal);
+ cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords);
+}
+
+void
+TplRecordset::remove(const std::string& xid)
+{
+ // Look up the item by its xid
+ const std::string command =
+ "DELETE FROM " + tableName + " WHERE xid = ?";
+ _CommandPtr cmd = NULL;
+ _ParameterPtr xidVal = NULL;
+
+ TESTHR(cmd.CreateInstance(__uuidof(Command)));
+ TESTHR(xidVal.CreateInstance(__uuidof(Parameter)));
+ _ConnectionPtr p = *dbConn;
+ cmd->ActiveConnection = p;
+ cmd->CommandText = command.c_str();
+ cmd->CommandType = adCmdText;
+ xidVal->Name = "@xid";
+ xidVal->Type = adVarBinary;
+ xidVal->Size = xid.length();
+ xidVal->Direction = adParamInput;
+ xidVal->Value = BlobEncoder(xid);
+ cmd->Parameters->Append(xidVal);
+ _variant_t deletedRecords;
+ cmd->Execute(&deletedRecords, NULL, adCmdText | adExecuteNoRecords);
+}
+
+void
+TplRecordset::recover(std::set<std::string>& xids)
+{
+ openRs();
+ if (rs->BOF && rs->EndOfFile)
+ return; // Nothing to do
+ rs->MoveFirst();
+ while (!rs->EndOfFile) {
+ _variant_t wxid = rs->Fields->Item["xid"]->Value;
+ char *xidBytes;
+ SafeArrayAccessData(wxid.parray, (void **)&xidBytes);
+ std::string xid(xidBytes, rs->Fields->Item["xid"]->ActualSize);
+ xids.insert(xid);
+ SafeArrayUnaccessData(wxid.parray);
+ rs->MoveNext();
+ }
+}
+
+void
+TplRecordset::dump()
+{
+ Recordset::dump();
+ if (rs->EndOfFile && rs->BOF) // No records
+ return;
+
+ rs->MoveFirst();
+ while (!rs->EndOfFile) {
+ _bstr_t wxid = rs->Fields->Item["xid"]->Value;
+ QPID_LOG(notice, " -> " << (const char *)wxid);
+ rs->MoveNext();
+ }
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/cpp/src/qpid/store/ms-sql/TplRecordset.h b/cpp/src/qpid/store/ms-sql/TplRecordset.h new file mode 100644 index 0000000000..fbde51738c --- /dev/null +++ b/cpp/src/qpid/store/ms-sql/TplRecordset.h @@ -0,0 +1,58 @@ +#ifndef QPID_STORE_MSSQL_TPLRECORDSET_H
+#define QPID_STORE_MSSQL_TPLRECORDSET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Recordset.h"
+#include <string>
+#include <set>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class TplRecordset
+ *
+ * Class for the TPL (Transaction Prepared List) records.
+ */
+class TplRecordset : public Recordset {
+protected:
+
+public:
+ virtual void open(DatabaseConnection* conn, const std::string& table);
+
+ void add(const std::string& xid);
+
+ // Remove a record given its xid.
+ void remove(const std::string& xid);
+
+ // Recover prepared transaction XIDs.
+ void recover(std::set<std::string>& xids);
+
+ // Dump table contents; useful for debugging.
+ void dump();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_TPLRECORDSET_H */
diff --git a/cpp/src/qpid/store/ms-sql/VariantHelper.cpp b/cpp/src/qpid/store/ms-sql/VariantHelper.cpp index 786724f031..acec95c1f9 100644 --- a/cpp/src/qpid/store/ms-sql/VariantHelper.cpp +++ b/cpp/src/qpid/store/ms-sql/VariantHelper.cpp @@ -41,13 +41,25 @@ VariantHelper<Wrapped>::operator const _variant_t& () const // Specialization for using _variant_t to wrap a std::string VariantHelper<std::string>::VariantHelper(const std::string &init) { - var.SetString(init.c_str()); + if (init.empty() || init.length() == 0) { + var.vt = VT_BSTR; + var.bstrVal = NULL; + } + else { + var.SetString(init.c_str()); + } } VariantHelper<std::string>& VariantHelper<std::string>::operator=(const std::string &rhs) { - var.SetString(rhs.c_str()); + if (rhs.empty() || rhs.length() == 0) { + var.vt = VT_BSTR; + var.bstrVal = NULL; + } + else { + var.SetString(rhs.c_str()); + } return *this; } |