diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/store/CMakeLists.txt | 2 | ||||
-rw-r--r-- | cpp/src/qpid/store/MessageStorePlugin.cpp | 26 | ||||
-rw-r--r-- | cpp/src/qpid/store/StorageProvider.h | 14 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp | 44 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/AmqpTransaction.h | 29 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp | 21 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/DatabaseConnection.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/Exception.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp | 312 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp | 217 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/MessageMapRecordset.h | 51 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/Recordset.cpp | 57 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/Recordset.h | 40 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/SqlTransaction.cpp | 71 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/SqlTransaction.h | 67 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/TplRecordset.cpp | 128 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/TplRecordset.h | 58 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/VariantHelper.cpp | 16 |
18 files changed, 899 insertions, 262 deletions
diff --git a/cpp/src/qpid/store/CMakeLists.txt b/cpp/src/qpid/store/CMakeLists.txt index 0d25923175..5c04e825d7 100644 --- a/cpp/src/qpid/store/CMakeLists.txt +++ b/cpp/src/qpid/store/CMakeLists.txt @@ -71,7 +71,9 @@ if (BUILD_MSSQL) ms-sql/MessageMapRecordset.cpp ms-sql/MessageRecordset.cpp ms-sql/Recordset.cpp + ms-sql/SqlTransaction.cpp ms-sql/State.cpp + ms-sql/TplRecordset.cpp ms-sql/VariantHelper.cpp) target_link_libraries (mssql_store qpidbroker qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY}) install (TARGETS mssql_store # RUNTIME diff --git a/cpp/src/qpid/store/MessageStorePlugin.cpp b/cpp/src/qpid/store/MessageStorePlugin.cpp index 05b6ef4465..fdb947eef2 100644 --- a/cpp/src/qpid/store/MessageStorePlugin.cpp +++ b/cpp/src/qpid/store/MessageStorePlugin.cpp @@ -405,11 +405,14 @@ MessageStorePlugin::recover(broker::RecoveryManager& recoverer) QueueMap queues; MessageMap messages; MessageQueueMap messageQueueMap; + std::vector<std::string> xids; + PreparedTransactionMap dtxMap; provider->second->recoverConfigs(recoverer); provider->second->recoverExchanges(recoverer, exchanges); provider->second->recoverQueues(recoverer, queues); provider->second->recoverBindings(recoverer, exchanges, queues); + provider->second->recoverTransactions(recoverer, dtxMap); provider->second->recoverMessages(recoverer, messages, messageQueueMap); // Enqueue msgs where needed. for (MessageQueueMap::const_iterator i = messageQueueMap.begin(); @@ -426,22 +429,33 @@ MessageStorePlugin::recover(broker::RecoveryManager& recoverer) broker::RecoverableMessage::shared_ptr msg = iMsg->second; // Now for each queue referenced in the queue map, locate it // and re-enqueue the message. - for (std::vector<uint64_t>::const_iterator j = i->second.begin(); + for (std::vector<QueueEntry>::const_iterator j = i->second.begin(); j != i->second.end(); ++j) { // Locate the queue corresponding to the current queue Id - QueueMap::const_iterator iQ = queues.find(*j); + QueueMap::const_iterator iQ = queues.find(j->queueId); if (iQ == queues.end()) { std::ostringstream oss; oss << "No matching queue trying to re-enqueue message " - << " on queue Id " << *j; + << " on queue Id " << j->queueId; THROW_STORE_EXCEPTION(oss.str()); } - iQ->second->recover(msg); + // Messages involved in prepared transactions have their status + // updated accordingly. First, though, restore a message that + // is expected to be on a queue, including non-transacted + // messages and those pending dequeue in a dtx. + if (j->tplStatus != QueueEntry::ADDING) + iQ->second->recover(msg); + switch(j->tplStatus) { + case QueueEntry::ADDING: + dtxMap[j->xid]->enqueue(iQ->second, msg); + break; + case QueueEntry::REMOVING: + dtxMap[j->xid]->dequeue(iQ->second, msg); + break; + } } } - - // recoverTransactions() and apply correctly while re-enqueuing } }} // namespace qpid::store diff --git a/cpp/src/qpid/store/StorageProvider.h b/cpp/src/qpid/store/StorageProvider.h index 1f257e7416..fc19f089fb 100644 --- a/cpp/src/qpid/store/StorageProvider.h +++ b/cpp/src/qpid/store/StorageProvider.h @@ -44,8 +44,16 @@ typedef std::map<uint64_t, qpid::broker::RecoverableQueue::shared_ptr> QueueMap; typedef std::map<uint64_t, qpid::broker::RecoverableMessage::shared_ptr> MessageMap; -// Msg Id -> vector of queue Ids where message is queued -typedef std::map<uint64_t, std::vector<uint64_t> > MessageQueueMap; +// Msg Id -> vector of queue entries where message is queued +struct QueueEntry { + enum TplStatus { NONE = 0, ADDING = 1, REMOVING = 2 }; + uint64_t queueId; + TplStatus tplStatus; + std::string xid; +}; +typedef std::map<uint64_t, std::vector<QueueEntry> > MessageQueueMap; +typedef std::map<std::string, qpid::broker::RecoverableTransaction::shared_ptr> + PreparedTransactionMap; class MessageStorePlugin; @@ -316,6 +324,8 @@ public: virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer, MessageMap& messageMap, MessageQueueMap& messageQueueMap) = 0; + virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer, + PreparedTransactionMap& dtxMap) = 0; //@} }; diff --git a/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp b/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp index 0ecfacfb4b..095d1bf331 100644 --- a/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp +++ b/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp @@ -26,51 +26,37 @@ namespace qpid { namespace store { namespace ms_sql { -AmqpTransaction::AmqpTransaction(std::auto_ptr<DatabaseConnection>& _db) - : db(_db), transDepth(0) +AmqpTransaction::AmqpTransaction(const boost::shared_ptr<DatabaseConnection>& _db) + : db(_db), sqlTrans(_db) { } AmqpTransaction::~AmqpTransaction() { - if (transDepth > 0) - this->abort(); } void -AmqpTransaction::begin() +AmqpTransaction::sqlBegin() { - _bstr_t beginCmd("BEGIN TRANSACTION"); - _ConnectionPtr c = *db; - c->Execute(beginCmd, NULL, adExecuteNoRecords); - ++transDepth; + sqlTrans.begin(); } void -AmqpTransaction::commit() +AmqpTransaction::sqlCommit() { - if (transDepth > 0) { - _bstr_t commitCmd("COMMIT TRANSACTION"); - _ConnectionPtr c = *db; - c->Execute(commitCmd, NULL, adExecuteNoRecords); - --transDepth; - } + sqlTrans.commit(); } void -AmqpTransaction::abort() +AmqpTransaction::sqlAbort() { - if (transDepth > 0) { - _bstr_t rollbackCmd("ROLLBACK TRANSACTION"); - _ConnectionPtr c = *db; - c->Execute(rollbackCmd, NULL, adExecuteNoRecords); - transDepth = 0; - } + sqlTrans.abort(); } -AmqpTPCTransaction::AmqpTPCTransaction(std::auto_ptr<DatabaseConnection>& _db, + +AmqpTPCTransaction::AmqpTPCTransaction(const boost::shared_ptr<DatabaseConnection>& db, const std::string& _xid) - : AmqpTransaction(_db), xid(_xid) + : AmqpTransaction(db), prepared(false), xid(_xid) { } @@ -78,12 +64,4 @@ AmqpTPCTransaction::~AmqpTPCTransaction() { } -void -AmqpTPCTransaction::prepare() -{ - // Intermediate transactions should have already assured integrity of - // the content in the database; just waiting to pull the trigger on the - // outermost transaction. -} - }}} // namespace qpid::store::ms_sql diff --git a/cpp/src/qpid/store/ms-sql/AmqpTransaction.h b/cpp/src/qpid/store/ms-sql/AmqpTransaction.h index 9b87d0ae15..625fab5595 100644 --- a/cpp/src/qpid/store/ms-sql/AmqpTransaction.h +++ b/cpp/src/qpid/store/ms-sql/AmqpTransaction.h @@ -23,8 +23,10 @@ */ #include <qpid/broker/TransactionalStore.h> +#include <boost/shared_ptr.hpp> #include <string> -#include <memory> + +#include "SqlTransaction.h" namespace qpid { namespace store { @@ -41,23 +43,18 @@ class DatabaseConnection; */ class AmqpTransaction : public qpid::broker::TransactionContext { - std::auto_ptr<DatabaseConnection> db; - - // Since ADO w/ SQLOLEDB can't do nested transaction via its BeginTrans(), - // et al, nested transactions are carried out with direct SQL commands. - // To ensure the state of this is known, keep track of how deeply the - // transactions are nested. - unsigned int transDepth; + boost::shared_ptr<DatabaseConnection> db; + SqlTransaction sqlTrans; public: - AmqpTransaction(std::auto_ptr<DatabaseConnection>& _db); + AmqpTransaction(const boost::shared_ptr<DatabaseConnection>& _db); virtual ~AmqpTransaction(); DatabaseConnection *dbConn() { return db.get(); } - void begin(); - void commit(); - void abort(); + void sqlBegin(); + void sqlCommit(); + void sqlAbort(); }; /** @@ -69,14 +66,18 @@ public: */ class AmqpTPCTransaction : public AmqpTransaction, public qpid::broker::TPCTransactionContext { + bool prepared; std::string xid; public: - AmqpTPCTransaction(std::auto_ptr<DatabaseConnection>& _db, + AmqpTPCTransaction(const boost::shared_ptr<DatabaseConnection>& db, const std::string& _xid); virtual ~AmqpTPCTransaction(); - void prepare(); + void setPrepared(void) { prepared = true; } + bool isPrepared(void) const { return prepared; } + + const std::string& getXid(void) const { return xid; } }; }}} // namespace qpid::store::ms_sql diff --git a/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp b/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp index 34edec8acd..3219ea526a 100644 --- a/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp +++ b/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp @@ -67,4 +67,25 @@ DatabaseConnection::close() conn = 0; } +std::string +DatabaseConnection::getErrors() +{ + long errCount = conn->Errors->Count; + if (errCount <= 0) + return ""; + // Collection ranges from 0 to nCount -1. + std::ostringstream messages; + ErrorPtr pErr = NULL; + for (long i = 0 ; i < errCount ; i++ ) { + if (i > 0) + messages << "\n"; + messages << "[" << i << "] "; + pErr = conn->Errors->GetItem(i); + messages << "Error " << pErr->Number << ": " + << (LPCSTR)pErr->Description; + } + messages << std::ends; + return messages.str(); +} + }}} // namespace qpid::store::ms_sql diff --git a/cpp/src/qpid/store/ms-sql/DatabaseConnection.h b/cpp/src/qpid/store/ms-sql/DatabaseConnection.h index 2b8bbffa90..785d1587c5 100644 --- a/cpp/src/qpid/store/ms-sql/DatabaseConnection.h +++ b/cpp/src/qpid/store/ms-sql/DatabaseConnection.h @@ -55,6 +55,8 @@ public: void beginTransaction() { conn->BeginTrans(); } void commitTransaction() {conn->CommitTrans(); } void rollbackTransaction() { conn->RollbackTrans(); } + + std::string getErrors(); }; }}} // namespace qpid::store::ms_sql diff --git a/cpp/src/qpid/store/ms-sql/Exception.h b/cpp/src/qpid/store/ms-sql/Exception.h index 0da4b24210..65ec3388ff 100644 --- a/cpp/src/qpid/store/ms-sql/Exception.h +++ b/cpp/src/qpid/store/ms-sql/Exception.h @@ -43,7 +43,9 @@ public: class ADOException : public Exception { public: - ADOException(const std::string& _text, _com_error &e) + ADOException(const std::string& _text, + _com_error &e, + const std::string& providerErrors = "") : Exception(_text) { text += ": "; text += e.ErrorMessage(); @@ -54,6 +56,8 @@ public: text += (const char *)wmsg; i->Release(); } + if (providerErrors.length() > 0) + text += providerErrors; } }; diff --git a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp index a26df59df7..323bc94c5c 100644 --- a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp +++ b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp @@ -32,6 +32,7 @@ #include "BindingRecordset.h" #include "MessageMapRecordset.h" #include "MessageRecordset.h" +#include "TplRecordset.h" #include "DatabaseConnection.h" #include "Exception.h" #include "State.h" @@ -51,6 +52,7 @@ const std::string TblExchange("tblExchange"); const std::string TblMessage("tblMessage"); const std::string TblMessageMap("tblMessageMap"); const std::string TblQueue("tblQueue"); +const std::string TblTpl("tblTPL"); } namespace qpid { @@ -256,9 +258,7 @@ public: virtual void prepare(qpid::broker::TPCTransactionContext& txn); virtual void commit(qpid::broker::TransactionContext& txn); virtual void abort(qpid::broker::TransactionContext& txn); - - // @TODO This maybe should not be in TransactionalStore - virtual void collectPreparedXids(std::set<std::string>& xids) {} + virtual void collectPreparedXids(std::set<std::string>& xids); //@} virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer); @@ -272,6 +272,8 @@ public: virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer, MessageMap& messageMap, MessageQueueMap& messageQueueMap); + virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer, + PreparedTransactionMap& dtxMap); private: struct ProviderOptions : public qpid::Options @@ -310,7 +312,7 @@ private: State *initState(); DatabaseConnection *initConnection(void); - void createDb(_ConnectionPtr conn, const std::string &name); + void createDb(DatabaseConnection *db, const std::string &name); }; static MSSqlProvider static_instance_registers_plugin; @@ -356,7 +358,7 @@ MSSqlProvider::earlyInitialize(Plugin::Target &target) // Database doesn't exist; create it QPID_LOG(notice, "MSSQL: Creating database " + options.catalogName); - createDb(conn, options.catalogName); + createDb(db.get(), options.catalogName); } else { QPID_LOG(notice, @@ -407,8 +409,9 @@ MSSqlProvider::create(PersistableQueue& queue, db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error creating queue " + queue.getName(), e); + throw ADOException("Error creating queue " + queue.getName(), e, errs); } } @@ -418,14 +421,6 @@ MSSqlProvider::create(PersistableQueue& queue, void MSSqlProvider::destroy(PersistableQueue& queue) { - // MessageDeleter class for use with for_each, below. - class MessageDeleter { - BlobRecordset& msgs; - public: - explicit MessageDeleter(BlobRecordset& _msgs) : msgs(_msgs) {} - void operator()(uint64_t msgId) { msgs.remove(msgId); } - }; - DatabaseConnection *db = initConnection(); BlobRecordset rsQueues; BindingRecordset rsBindings; @@ -441,19 +436,18 @@ MSSqlProvider::destroy(PersistableQueue& queue) // under the references in the bindings table. Then remove the // message->queue entries for the queue, also because the queue can't // be deleted while there are references to it. If there are messages - // orphaned by removing the queue references, those messages can - // also be deleted. Lastly, the queue record can be removed. + // orphaned by removing the queue references, they're deleted by + // a trigger on the tblMessageMap table. Lastly, the queue record + // can be removed. rsBindings.removeForQueue(queue.getPersistenceId()); - std::vector<uint64_t> orphans; - rsMessageMaps.removeForQueue(queue.getPersistenceId(), orphans); - std::for_each(orphans.begin(), orphans.end(), - MessageDeleter(rsMessages)); + rsMessageMaps.removeForQueue(queue.getPersistenceId()); rsQueues.remove(queue); db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error deleting queue " + queue.getName(), e); + throw ADOException("Error deleting queue " + queue.getName(), e, errs); } } @@ -473,8 +467,11 @@ MSSqlProvider::create(const PersistableExchange& exchange, db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error creating exchange " + exchange.getName(), e); + throw ADOException("Error creating exchange " + exchange.getName(), + e, + errs); } } @@ -498,8 +495,11 @@ MSSqlProvider::destroy(const PersistableExchange& exchange) db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error deleting exchange " + exchange.getName(), e); + throw ADOException("Error deleting exchange " + exchange.getName(), + e, + errs); } } @@ -524,9 +524,12 @@ MSSqlProvider::bind(const PersistableExchange& exchange, db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); throw ADOException("Error binding exchange " + exchange.getName() + - " to queue " + queue.getName(), e); + " to queue " + queue.getName(), + e, + errs); } } @@ -551,9 +554,12 @@ MSSqlProvider::unbind(const PersistableExchange& exchange, db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); throw ADOException("Error unbinding exchange " + exchange.getName() + - " from queue " + queue.getName(), e); + " from queue " + queue.getName(), + e, + errs); } } @@ -572,8 +578,9 @@ MSSqlProvider::create(const PersistableConfig& config) db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error creating config " + config.getName(), e); + throw ADOException("Error creating config " + config.getName(), e, errs); } } @@ -592,8 +599,9 @@ MSSqlProvider::destroy(const PersistableConfig& config) db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error deleting config " + config.getName(), e); + throw ADOException("Error deleting config " + config.getName(), e, errs); } } @@ -618,8 +626,9 @@ MSSqlProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg) db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error staging message", e); + throw ADOException("Error staging message", e, errs); } } @@ -641,8 +650,9 @@ MSSqlProvider::destroy(PersistableMessage& msg) db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error deleting message", e); + throw ADOException("Error deleting message", e, errs); } } @@ -662,8 +672,9 @@ MSSqlProvider::appendContent(const boost::intrusive_ptr<const PersistableMessage db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error appending to message", e); + throw ADOException("Error appending to message", e, errs); } } @@ -691,7 +702,8 @@ MSSqlProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/, rsMessages.loadContent(msg, data, offset, length); } catch(_com_error &e) { - throw ADOException("Error loading message content", e); + std::string errs = db->getErrors(); + throw ADOException("Error loading message content", e, errs); } } @@ -714,6 +726,7 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt, // this is not in the context of a transaction, then just use the thread's // DatabaseConnection with a ADO transaction. DatabaseConnection *db = 0; + std::string xid; AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt); if (atxn == 0) { db = initConnection(); @@ -721,12 +734,16 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt, } else { (void)initState(); // Ensure this thread is initialized + // It's a transactional enqueue; if it's TPC, grab the xid. + AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt); + if (tpcTxn) + xid = tpcTxn->getXid(); db = atxn->dbConn(); try { - atxn->begin(); + atxn->sqlBegin(); } catch(_com_error &e) { - throw ADOException("Error queuing message", e); + throw ADOException("Error queuing message", e, db->getErrors()); } } @@ -738,18 +755,19 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt, rsMessages.add(msg); } rsMap.open(db, TblMessageMap); - rsMap.add(msg->getPersistenceId(), queue.getPersistenceId()); + rsMap.add(msg->getPersistenceId(), queue.getPersistenceId(), xid); if (atxn) - atxn->commit(); + atxn->sqlCommit(); else db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); if (atxn) - atxn->abort(); + atxn->sqlAbort(); else db->rollbackTransaction(); - throw ADOException("Error queuing message", e); + throw ADOException("Error queuing message", e, errs); } msg->enqueueComplete(); } @@ -773,6 +791,7 @@ MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt, // this is not in the context of a transaction, then just use the thread's // DatabaseConnection with a ADO transaction. DatabaseConnection *db = 0; + std::string xid; AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt); if (atxn == 0) { db = initConnection(); @@ -780,36 +799,54 @@ MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt, } else { (void)initState(); // Ensure this thread is initialized + // It's a transactional dequeue; if it's TPC, grab the xid. + AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt); + if (tpcTxn) + xid = tpcTxn->getXid(); db = atxn->dbConn(); try { - atxn->begin(); + atxn->sqlBegin(); } catch(_com_error &e) { - throw ADOException("Error queuing message", e); + throw ADOException("Error queuing message", e, db->getErrors()); } } MessageMapRecordset rsMap; - MessageRecordset rsMessages; try { rsMap.open(db, TblMessageMap); - bool more = rsMap.remove(msg->getPersistenceId(), - queue.getPersistenceId()); - if (!more) { - rsMessages.open(db, TblMessage); - rsMessages.remove(msg); + // TPC dequeues are just marked pending and will actually be removed + // when the transaction commits; Single-phase dequeues are removed + // now, relying on the SQL transaction to put it back if the + // transaction doesn't commit. + if (!xid.empty()) { + rsMap.pendingRemove(msg->getPersistenceId(), + queue.getPersistenceId(), + xid); + } + else { + rsMap.remove(msg->getPersistenceId(), + queue.getPersistenceId()); } if (atxn) - atxn->commit(); + atxn->sqlCommit(); else db->commitTransaction(); } + catch(ms_sql::Exception&) { + if (atxn) + atxn->sqlAbort(); + else + db->rollbackTransaction(); + throw; + } catch(_com_error &e) { + std::string errs = db->getErrors(); if (atxn) - atxn->abort(); + atxn->sqlAbort(); else db->rollbackTransaction(); - throw ADOException("Error dequeuing message", e); + throw ADOException("Error dequeuing message", e, errs); } msg->dequeueComplete(); } @@ -827,10 +864,10 @@ MSSqlProvider::begin() // it safe and handle this just like a TPC transaction, which actually // can be prepared and committed/aborted from different threads, // making it a bad idea to try using the thread-local DatabaseConnection. - std::auto_ptr<DatabaseConnection> db(new DatabaseConnection); + boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection); db->open(options.connectString, options.catalogName); std::auto_ptr<AmqpTransaction> tx(new AmqpTransaction(db)); - tx->begin(); + tx->sqlBegin(); std::auto_ptr<qpid::broker::TransactionContext> tc(tx); return tc; } @@ -839,10 +876,24 @@ std::auto_ptr<qpid::broker::TPCTransactionContext> MSSqlProvider::begin(const std::string& xid) { (void)initState(); // Ensure this thread is initialized - std::auto_ptr<DatabaseConnection> db(new DatabaseConnection); + boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection); db->open(options.connectString, options.catalogName); std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(db, xid)); - tx->begin(); + tx->sqlBegin(); + + TplRecordset rsTpl; + try { + tx->sqlBegin(); + rsTpl.open(db.get(), TblTpl); + rsTpl.add(xid); + tx->sqlCommit(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + tx->sqlAbort(); + throw ADOException("Error adding TPL record", e, errs); + } + std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx); return tc; } @@ -850,28 +901,122 @@ MSSqlProvider::begin(const std::string& xid) void MSSqlProvider::prepare(qpid::broker::TPCTransactionContext& txn) { - // The inner transactions used for the components of the TPC are done; - // nothing else to do but wait for the commit. + // Commit all the marked-up enqueue/dequeue ops and the TPL record. + // On commit/rollback the TPL will be removed and the TPL markups + // on the message map will be cleaned up as well. + (void)initState(); // Ensure this thread is initialized + AmqpTPCTransaction *atxn = dynamic_cast<AmqpTPCTransaction*> (&txn); + if (atxn == 0) + throw qpid::broker::InvalidTransactionContextException(); + try { + atxn->sqlCommit(); + } + catch(_com_error &e) { + throw ADOException("Error preparing", e, atxn->dbConn()->getErrors()); + } + atxn->setPrepared(); } void MSSqlProvider::commit(qpid::broker::TransactionContext& txn) { (void)initState(); // Ensure this thread is initialized - AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn); - if (atxn == 0) - throw qpid::broker::InvalidTransactionContextException(); - atxn->commit(); + /* + * One-phase transactions simply commit the outer SQL transaction + * that was begun on begin(). Two-phase transactions are different - + * the SQL transaction started on begin() was committed on prepare() + * so all the SQL records reflecting the enqueue/dequeue actions for + * the transaction are recorded but with xid markups on them to reflect + * that they are prepared but not committed. Now go back and remove + * the markups, deleting those marked for removal. + */ + AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn); + if (p2txn == 0) { + AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn); + if (p1txn == 0) + throw qpid::broker::InvalidTransactionContextException(); + p1txn->sqlCommit(); + return; + } + + DatabaseConnection *db(p2txn->dbConn()); + TplRecordset rsTpl; + MessageMapRecordset rsMessageMap; + try { + db->beginTransaction(); + rsTpl.open(db, TblTpl); + rsMessageMap.open(db, TblMessageMap); + rsMessageMap.commitPrepared(p2txn->getXid()); + rsTpl.remove(p2txn->getXid()); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error committing transaction", e, errs); + } } void MSSqlProvider::abort(qpid::broker::TransactionContext& txn) { (void)initState(); // Ensure this thread is initialized + /* + * One-phase and non-prepared two-phase transactions simply abort + * the outer SQL transaction that was begun on begin(). However, prepared + * two-phase transactions are different - the SQL transaction started + * on begin() was committed on prepare() so all the SQL records + * reflecting the enqueue/dequeue actions for the transaction are + * recorded but with xid markups on them to reflect that they are + * prepared but not committed. Now go back and remove the markups, + * deleting those marked for addition. + */ + AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn); + if (p2txn == 0 || !p2txn->isPrepared()) { + AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn); + if (p1txn == 0) + throw qpid::broker::InvalidTransactionContextException(); + p1txn->sqlAbort(); + return; + } + + DatabaseConnection *db(p2txn->dbConn()); + TplRecordset rsTpl; + MessageMapRecordset rsMessageMap; + try { + db->beginTransaction(); + rsTpl.open(db, TblTpl); + rsMessageMap.open(db, TblMessageMap); + rsMessageMap.abortPrepared(p2txn->getXid()); + rsTpl.remove(p2txn->getXid()); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error committing transaction", e, errs); + } + + + (void)initState(); // Ensure this thread is initialized AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn); if (atxn == 0) throw qpid::broker::InvalidTransactionContextException(); - atxn->abort(); + atxn->sqlAbort(); +} + +void +MSSqlProvider::collectPreparedXids(std::set<std::string>& xids) +{ + DatabaseConnection *db = initConnection(); + try { + TplRecordset rsTpl; + rsTpl.open(db, TblTpl); + rsTpl.recover(xids); + } + catch(_com_error &e) { + throw ADOException("Error reading TPL", e, db->getErrors()); + } } // @TODO Much of this recovery code is way too similar... refactor to @@ -977,6 +1122,40 @@ MSSqlProvider::recoverMessages(qpid::broker::RecoveryManager& recoverer, rsMessageMaps.recover(messageQueueMap); } +void +MSSqlProvider::recoverTransactions(qpid::broker::RecoveryManager& recoverer, + PreparedTransactionMap& dtxMap) +{ + DatabaseConnection *db = initConnection(); + std::set<std::string> xids; + try { + TplRecordset rsTpl; + rsTpl.open(db, TblTpl); + rsTpl.recover(xids); + } + catch(_com_error &e) { + throw ADOException("Error recovering TPL records", e, db->getErrors()); + } + + try { + // Rebuild the needed RecoverableTransactions. + for (std::set<std::string>::const_iterator iXid = xids.begin(); + iXid != xids.end(); + ++iXid) { + boost::shared_ptr<DatabaseConnection> dbX(new DatabaseConnection); + dbX->open(options.connectString, options.catalogName); + std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(dbX, + *iXid)); + tx->setPrepared(); + std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx); + dtxMap[*iXid] = recoverer.recoverTransaction(*iXid, tc); + } + } + catch(_com_error &e) { + throw ADOException("Error recreating dtx connection", e); + } +} + ////////////// Internal Methods State * @@ -1003,7 +1182,7 @@ MSSqlProvider::initConnection(void) } void -MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name) +MSSqlProvider::createDb(DatabaseConnection *db, const std::string &name) { const std::string dbCmd = "CREATE DATABASE " + name; const std::string useCmd = "USE " + name; @@ -1018,9 +1197,15 @@ MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name) " fieldTableBlob varbinary(MAX))"; const std::string messageMapSpecs = " (messageId bigint REFERENCES tblMessage(persistenceId) NOT NULL," - " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL)"; + " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL," + " prepareStatus tinyint CHECK (prepareStatus IS NULL OR " + " prepareStatus IN (1, 2))," + " xid varbinary(512) REFERENCES tblTPL(xid)" + " CONSTRAINT CK_NoDups UNIQUE NONCLUSTERED (messageId, queueId) )"; + const std::string tplSpecs = " (xid varbinary(512) PRIMARY KEY NOT NULL)"; _variant_t unused; _bstr_t dbStr = dbCmd.c_str(); + _ConnectionPtr conn(*db); try { conn->Execute(dbStr, &unused, adExecuteNoRecords); _bstr_t useStr = useCmd.c_str(); @@ -1040,12 +1225,15 @@ MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name) makeTable = tableCmd + TblBinding + bindingSpecs; makeTableStr = makeTable.c_str(); conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblTpl + tplSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); makeTable = tableCmd + TblMessageMap + messageMapSpecs; makeTableStr = makeTable.c_str(); conn->Execute(makeTableStr, &unused, adExecuteNoRecords); } catch(_com_error &e) { - throw ADOException("MSSQL can't create " + name, e); + throw ADOException("MSSQL can't create " + name, e, db->getErrors()); } } diff --git a/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp b/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp index 68e174a2b0..a2ff8aefdd 100644 --- a/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp +++ b/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp @@ -24,96 +24,197 @@ #include <qpid/store/StorageProvider.h> #include "MessageMapRecordset.h" +#include "BlobEncoder.h" +#include "DatabaseConnection.h" +#include "Exception.h" #include "VariantHelper.h" +namespace { +inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);}; +} + namespace qpid { namespace store { namespace ms_sql { void -MessageMapRecordset::add(uint64_t messageId, uint64_t queueId) +MessageMapRecordset::open(DatabaseConnection* conn, const std::string& table) { - rs->AddNew(); - rs->Fields->GetItem("messageId")->Value = messageId; - rs->Fields->GetItem("queueId")->Value = queueId; - rs->Update(); + init(conn, table); } -bool +void +MessageMapRecordset::add(uint64_t messageId, + uint64_t queueId, + const std::string& xid) +{ + std::ostringstream command; + command << "INSERT INTO " << tableName + << " (messageId, queueId"; + if (!xid.empty()) + command << ", prepareStatus, xid"; + command << ") VALUES (" << messageId << "," << queueId; + if (!xid.empty()) + command << "," << PREPARE_ADD << ",?"; + command << ")" << std::ends; + + _CommandPtr cmd = NULL; + _ParameterPtr xidVal = NULL; + TESTHR(cmd.CreateInstance(__uuidof(Command))); + _ConnectionPtr p = *dbConn; + cmd->ActiveConnection = p; + cmd->CommandText = command.str().c_str(); + cmd->CommandType = adCmdText; + if (!xid.empty()) { + TESTHR(xidVal.CreateInstance(__uuidof(Parameter))); + xidVal->Name = "@xid"; + xidVal->Type = adVarBinary; + xidVal->Size = xid.length(); + xidVal->Direction = adParamInput; + xidVal->Value = BlobEncoder(xid); + cmd->Parameters->Append(xidVal); + } + cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords); +} + +void MessageMapRecordset::remove(uint64_t messageId, uint64_t queueId) { - // Look up all mappings for the specified message. Then scan - // for the specified queue and keep track of whether or not the - // message exists on any queue we are not looking for a well. - std::ostringstream filter; - filter << "messageId = " << messageId << std::ends; - rs->PutFilter (VariantHelper<std::string>(filter.str())); - if (rs->RecordCount == 0) - return false; + std::ostringstream command; + command << "DELETE FROM " << tableName + << " WHERE queueId = " << queueId + << " AND messageId = " << messageId << std::ends; + _CommandPtr cmd = NULL; + TESTHR(cmd.CreateInstance(__uuidof(Command))); + _ConnectionPtr p = *dbConn; + cmd->ActiveConnection = p; + cmd->CommandText = command.str().c_str(); + cmd->CommandType = adCmdText; + _variant_t deletedRecords; + cmd->Execute(&deletedRecords, NULL, adCmdText | adExecuteNoRecords); + if ((long)deletedRecords == 0) + throw ms_sql::Exception("Message does not exist in queue mapping"); + // Trigger on deleting the mapping takes care of deleting orphaned + // message record from tblMessage. +} + +void +MessageMapRecordset::pendingRemove(uint64_t messageId, + uint64_t queueId, + const std::string& xid) +{ + // Look up the mapping for the specified message and queue. There + // should be only one because of the uniqueness constraint in the + // SQL table. Update it to reflect it's pending delete with + // the specified xid. + std::ostringstream command; + command << "UPDATE " << tableName + << " SET prepareStatus=" << PREPARE_REMOVE + << " , xid=?" + << " WHERE queueId = " << queueId + << " AND messageId = " << messageId << std::ends; + + _CommandPtr cmd = NULL; + _ParameterPtr xidVal = NULL; + TESTHR(cmd.CreateInstance(__uuidof(Command))); + TESTHR(xidVal.CreateInstance(__uuidof(Parameter))); + _ConnectionPtr p = *dbConn; + cmd->ActiveConnection = p; + cmd->CommandText = command.str().c_str(); + cmd->CommandType = adCmdText; + xidVal->Name = "@xid"; + xidVal->Type = adVarBinary; + xidVal->Size = xid.length(); + xidVal->Direction = adParamInput; + xidVal->Value = BlobEncoder(xid); + cmd->Parameters->Append(xidVal); + cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords); +} + +void +MessageMapRecordset::removeForQueue(uint64_t queueId) +{ + std::ostringstream command; + command << "DELETE FROM " << tableName + << " WHERE queueId = " << queueId << std::ends; + _CommandPtr cmd = NULL; + + TESTHR(cmd.CreateInstance(__uuidof(Command))); + _ConnectionPtr p = *dbConn; + cmd->ActiveConnection = p; + cmd->CommandText = command.str().c_str(); + cmd->CommandType = adCmdText; + cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords); +} + +void +MessageMapRecordset::commitPrepared(const std::string& xid) +{ + // Find all the records for the specified xid. Records marked as adding + // are now permanent so remove the xid and prepareStatus. Records marked + // as removing are removed entirely. + openRs(); MessageMap m; IADORecordBinding *piAdoRecordBinding; rs->QueryInterface(__uuidof(IADORecordBinding), (LPVOID *)&piAdoRecordBinding); piAdoRecordBinding->BindToRecordset(&m); - bool moreEntries = false, deleted = false; - rs->MoveFirst(); - // If the desired mapping gets deleted, and we already know there are - // other mappings for the message, don't bother finishing the scan. - while (!rs->EndOfFile && !(deleted && moreEntries)) { - if (m.queueId == queueId) { + for (; !rs->EndOfFile; rs->MoveNext()) { + if (m.xidStatus != adFldOK) + continue; + const std::string x(m.xid, m.xidLength); + if (x != xid) + continue; + if (m.prepareStatus == PREPARE_REMOVE) { rs->Delete(adAffectCurrent); - rs->Update(); - deleted = true; } else { - moreEntries = true; + _variant_t dbNull; + dbNull.ChangeType(VT_NULL); + rs->Fields->GetItem("prepareStatus")->Value = dbNull; + rs->Fields->GetItem("xid")->Value = dbNull; } - rs->MoveNext(); + rs->Update(); } piAdoRecordBinding->Release(); - rs->Filter = ""; - return moreEntries; } void -MessageMapRecordset::removeForQueue(uint64_t queueId, - std::vector<uint64_t>& orphaned) +MessageMapRecordset::abortPrepared(const std::string& xid) { - // Read all the messages queued on queueId and add them to the orphaned - // list. Then remove each one and learn if there are references to it - // from other queues. The ones without references are left in the - // orphaned list, others are removed. - std::ostringstream filter; - filter << "queueId = " << queueId << std::ends; - rs->PutFilter (VariantHelper<std::string>(filter.str())); + // Find all the records for the specified xid. Records marked as adding + // need to be removed while records marked as removing are put back to + // no xid and no prepareStatus. + openRs(); MessageMap m; IADORecordBinding *piAdoRecordBinding; rs->QueryInterface(__uuidof(IADORecordBinding), (LPVOID *)&piAdoRecordBinding); piAdoRecordBinding->BindToRecordset(&m); - while (!rs->EndOfFile) { - orphaned.push_back(m.messageId); - rs->MoveNext(); + for (; !rs->EndOfFile; rs->MoveNext()) { + if (m.xidStatus != adFldOK) + continue; + const std::string x(m.xid, m.xidLength); + if (x != xid) + continue; + if (m.prepareStatus == PREPARE_ADD) { + rs->Delete(adAffectCurrent); + } + else { + _variant_t dbNull; + dbNull.ChangeType(VT_NULL); + rs->Fields->GetItem("prepareStatus")->Value = dbNull; + rs->Fields->GetItem("xid")->Value = dbNull; + } + rs->Update(); } piAdoRecordBinding->Release(); - rs->Filter = ""; // Remove filter on queueId - rs->Requery(adOptionUnspecified); // Get the entire map again - - // Now delete all the messages on this queue; any message that still has - // references from other queue(s) is removed from orphaned. - for (std::vector<uint64_t>::iterator i = orphaned.begin(); - i != orphaned.end(); - ) { - if (remove(*i, queueId)) - i = orphaned.erase(i); // There are other refs to message *i - else - ++i; - } } void MessageMapRecordset::recover(MessageQueueMap& msgMap) { + openRs(); if (rs->BOF && rs->EndOfFile) return; // Nothing to do rs->MoveFirst(); @@ -123,7 +224,18 @@ MessageMapRecordset::recover(MessageQueueMap& msgMap) (LPVOID *)&piAdoRecordBinding); piAdoRecordBinding->BindToRecordset(&b); while (!rs->EndOfFile) { - msgMap[b.messageId].push_back(b.queueId); + qpid::store::QueueEntry entry; + entry.queueId = b.queueId; + if (b.xidStatus == adFldOK && b.xidLength > 0) { + entry.xid.assign(b.xid, b.xidLength); + entry.tplStatus = + b.prepareStatus == PREPARE_ADD ? QueueEntry::ADDING + : QueueEntry::REMOVING; + } + else { + entry.tplStatus = QueueEntry::NONE; + } + msgMap[b.messageId].push_back(entry); rs->MoveNext(); } @@ -133,6 +245,7 @@ MessageMapRecordset::recover(MessageQueueMap& msgMap) void MessageMapRecordset::dump() { + openRs(); Recordset::dump(); if (rs->EndOfFile && rs->BOF) // No records return; diff --git a/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h b/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h index 475137b18b..1b0c2f073e 100644 --- a/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h +++ b/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h @@ -23,7 +23,6 @@ */ #include <icrsint.h> -#include <vector> #include "Recordset.h" #include <qpid/broker/RecoveryManager.h> @@ -38,32 +37,56 @@ namespace ms_sql { */ class MessageMapRecordset : public Recordset { + // These values are defined in a constraint on the tblMessageMap table. + // the prepareStatus column can only be null, 1, or 2. + enum { PREPARE_ADD=1, PREPARE_REMOVE=2 }; + class MessageMap : public CADORecordBinding { BEGIN_ADO_BINDING(MessageMap) ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, messageId, FALSE) ADO_FIXED_LENGTH_ENTRY2(2, adBigInt, queueId, FALSE) + ADO_FIXED_LENGTH_ENTRY2(3, adTinyInt, prepareStatus, FALSE) + ADO_VARIABLE_LENGTH_ENTRY(4, adVarBinary, xid, sizeof(xid), + xidStatus, xidLength, FALSE) END_ADO_BINDING() public: uint64_t messageId; uint64_t queueId; + uint8_t prepareStatus; + char xid[512]; + int xidStatus; + uint32_t xidLength; }; + void selectOnXid(const std::string& xid); + public: + virtual void open(DatabaseConnection* conn, const std::string& table); + // Add a new mapping - void add(uint64_t messageId, uint64_t queueId); - - // Remove a specific mapping. Returns true if the message is still - // enqueued on at least one other queue; false if the message no longer - // exists on any other queues. - bool remove(uint64_t messageId, uint64_t queueId); - - // Remove mappings for all messages on a specified queue. If there are - // messages that were only on the specified queue and are, therefore, - // orphaned now, return them in the orphaned vector. The orphaned - // messages can be deleted permanently as they are not referenced on - // any other queues. - void removeForQueue(uint64_t queueId, std::vector<uint64_t>& orphaned); + void add(uint64_t messageId, + uint64_t queueId, + const std::string& xid = ""); + + // Remove a specific mapping. + void remove(uint64_t messageId, uint64_t queueId); + + // Mark the indicated message->queue entry pending removal. The entry + // for the mapping is updated to indicate pending removal with the + // specified xid. + void pendingRemove(uint64_t messageId, + uint64_t queueId, + const std::string& xid); + + // Remove mappings for all messages on a specified queue. + void removeForQueue(uint64_t queueId); + + // Commit records recorded as prepared. + void commitPrepared(const std::string& xid); + + // Abort prepared changes. + void abortPrepared(const std::string& xid); // Recover the mappings of message ID -> vector<queue ID>. void recover(MessageQueueMap& msgMap); diff --git a/cpp/src/qpid/store/ms-sql/Recordset.cpp b/cpp/src/qpid/store/ms-sql/Recordset.cpp index e1a5158c87..e706799951 100644 --- a/cpp/src/qpid/store/ms-sql/Recordset.cpp +++ b/cpp/src/qpid/store/ms-sql/Recordset.cpp @@ -35,63 +35,35 @@ namespace qpid { namespace store { namespace ms_sql { -#if 0 -Recordset::Iterator::Iterator(Recordset& _rs) : rs(_rs) -{ - rs->MoveFirst(); - setCurrent(); -} - -std::pair<uint64_t, BlobAdapter>& -Recordset::Iterator::dereference() const -{ - return const_cast<std::pair<uint64_t, BlobAdapter> >(current); -} - -void -Recordset::Iterator::increment() -{ - rs->MoveNext(); - setCurrent(); -} - -bool -Recordset::Iterator::equal(const Iterator& x) const -{ - return current.first == x.current.first; -} void -Recordset::Iterator::setCurrent() +Recordset::init(DatabaseConnection* conn, const std::string& table) { - if (!rs->EndOfFile) { - uint64_t id = rs->Fields->Item["persistenceId"]->Value; - long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize; - BlobAdapter blob(blobSize); - blob = rs->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); - current = std::make_pair(id, blob); - } - else { - current.first = 0; - } + dbConn = conn; + TESTHR(rs.CreateInstance(__uuidof(::Recordset))); + tableName = table; } -#endif void -Recordset::open(DatabaseConnection* conn, const std::string& table) +Recordset::openRs() { - _ConnectionPtr p = *conn; - TESTHR(rs.CreateInstance(__uuidof(::Recordset))); // Client-side cursors needed to get access to newly added // identity column immediately. Recordsets need this to get the // persistence ID for the broker objects. rs->CursorLocation = adUseClient; - rs->Open(table.c_str(), + _ConnectionPtr p = *dbConn; + rs->Open(tableName.c_str(), _variant_t((IDispatch *)p, true), adOpenStatic, adLockOptimistic, adCmdTable); - tableName = table; +} + +void +Recordset::open(DatabaseConnection* conn, const std::string& table) +{ + init(conn, table); + openRs(); } void @@ -99,7 +71,6 @@ Recordset::close() { if (rs && rs->State == adStateOpen) rs->Close(); - rs = 0; } void diff --git a/cpp/src/qpid/store/ms-sql/Recordset.h b/cpp/src/qpid/store/ms-sql/Recordset.h index 3631838aa8..032b2bd434 100644 --- a/cpp/src/qpid/store/ms-sql/Recordset.h +++ b/cpp/src/qpid/store/ms-sql/Recordset.h @@ -51,46 +51,20 @@ protected: DatabaseConnection* dbConn; std::string tableName; + void init(DatabaseConnection* conn, const std::string& table); + void openRs(); + public: + Recordset() : rs(0), dbConn(0) {} + virtual ~Recordset() { close(); rs = 0; dbConn = 0; } -#if 0 /** - * Iterator support for walking through the recordset. - * If I need to try this again, I'd look at Recordset cloning. + * Default open() reads all records into the recordset. */ - class Iterator : public boost::iterator_facade< - Iterator, std::pair<uint64_t, BlobAdapter>, boost::random_access_traversal_tag> - { - public: - Iterator() : rs(0) { } - Iterator(Recordset& _rs); - - private: - friend class boost::iterator_core_access; - - std::pair<uint64_t, BlobAdapter>& dereference() const; - void increment(); - bool equal(const Iterator& x) const; - - _RecordsetPtr rs; - std::pair<uint64_t, BlobAdapter> current; - - void setCurrent(); - }; - - friend class Iterator; -#endif - - Recordset() : rs(0) {} - virtual ~Recordset() { close(); } - void open(DatabaseConnection* conn, const std::string& table); + virtual void open(DatabaseConnection* conn, const std::string& table); void close(); void requery(); operator _RecordsetPtr () { return rs; } -#if 0 - Iterator begin() { Iterator iter(*this); return iter; } - Iterator end() { Iterator iter; return iter; } -#endif // Dump table contents; useful for debugging. void dump(); diff --git a/cpp/src/qpid/store/ms-sql/SqlTransaction.cpp b/cpp/src/qpid/store/ms-sql/SqlTransaction.cpp new file mode 100644 index 0000000000..6ad7725570 --- /dev/null +++ b/cpp/src/qpid/store/ms-sql/SqlTransaction.cpp @@ -0,0 +1,71 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SqlTransaction.h"
+#include "DatabaseConnection.h"
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+SqlTransaction::SqlTransaction(const boost::shared_ptr<DatabaseConnection>& _db)
+ : db(_db), transDepth(0)
+{
+}
+
+SqlTransaction::~SqlTransaction()
+{
+ if (transDepth > 0)
+ this->abort();
+}
+
+void
+SqlTransaction::begin()
+{
+ _bstr_t beginCmd("BEGIN TRANSACTION");
+ _ConnectionPtr c = *db;
+ c->Execute(beginCmd, NULL, adExecuteNoRecords);
+ ++transDepth;
+}
+
+void
+SqlTransaction::commit()
+{
+ if (transDepth > 0) {
+ _bstr_t commitCmd("COMMIT TRANSACTION");
+ _ConnectionPtr c = *db;
+ c->Execute(commitCmd, NULL, adExecuteNoRecords);
+ --transDepth;
+ }
+}
+
+void
+SqlTransaction::abort()
+{
+ if (transDepth > 0) {
+ _bstr_t rollbackCmd("ROLLBACK TRANSACTION");
+ _ConnectionPtr c = *db;
+ c->Execute(rollbackCmd, NULL, adExecuteNoRecords);
+ transDepth = 0;
+ }
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/cpp/src/qpid/store/ms-sql/SqlTransaction.h b/cpp/src/qpid/store/ms-sql/SqlTransaction.h new file mode 100644 index 0000000000..8b5239b786 --- /dev/null +++ b/cpp/src/qpid/store/ms-sql/SqlTransaction.h @@ -0,0 +1,67 @@ +#ifndef QPID_STORE_MSSQL_SQLTRANSACTION_H
+#define QPID_STORE_MSSQL_SQLTRANSACTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+#include <string>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+class DatabaseConnection;
+
+/**
+ * @class SqlTransaction
+ *
+ * Class representing an SQL transaction.
+ * Since ADO w/ SQLOLEDB can't do nested transaction via its BeginTrans(),
+ * et al, nested transactions are carried out with direct SQL commands.
+ * To ensure the state of this is known, keep track of how deeply the
+ * transactions are nested. This is more of a safety/sanity check since
+ * AMQP doesn't provide nested transactions.
+ */
+class SqlTransaction {
+
+ boost::shared_ptr<DatabaseConnection> db;
+
+ // Since ADO w/ SQLOLEDB can't do nested transaction via its BeginTrans(),
+ // et al, nested transactions are carried out with direct SQL commands.
+ // To ensure the state of this is known, keep track of how deeply the
+ // transactions are nested.
+ unsigned int transDepth;
+
+public:
+ SqlTransaction(const boost::shared_ptr<DatabaseConnection>& _db);
+ ~SqlTransaction();
+
+ DatabaseConnection *dbConn() { return db.get(); }
+
+ void begin();
+ void commit();
+ void abort();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_SQLTRANSACTION_H */
diff --git a/cpp/src/qpid/store/ms-sql/TplRecordset.cpp b/cpp/src/qpid/store/ms-sql/TplRecordset.cpp new file mode 100644 index 0000000000..1309d921a9 --- /dev/null +++ b/cpp/src/qpid/store/ms-sql/TplRecordset.cpp @@ -0,0 +1,128 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <qpid/Exception.h>
+#include <qpid/log/Statement.h>
+
+#include "TplRecordset.h"
+#include "BlobEncoder.h"
+#include "DatabaseConnection.h"
+#include "VariantHelper.h"
+
+namespace {
+inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};
+}
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+void
+TplRecordset::open(DatabaseConnection* conn, const std::string& table)
+{
+ init(conn, table);
+ // Don't actually open until we know what to do. It's far easier and more
+ // efficient to simply do most of these TPL/xid ops in a single statement.
+}
+
+void
+TplRecordset::add(const std::string& xid)
+{
+ const std::string command =
+ "INSERT INTO " + tableName + " ( xid ) VALUES ( ? )";
+ _CommandPtr cmd = NULL;
+ _ParameterPtr xidVal = NULL;
+
+ TESTHR(cmd.CreateInstance(__uuidof(Command)));
+ TESTHR(xidVal.CreateInstance(__uuidof(Parameter)));
+ _ConnectionPtr p = *dbConn;
+ cmd->ActiveConnection = p;
+ cmd->CommandText = command.c_str();
+ cmd->CommandType = adCmdText;
+ xidVal->Name = "@xid";
+ xidVal->Type = adVarBinary;
+ xidVal->Size = xid.length();
+ xidVal->Direction = adParamInput;
+ xidVal->Value = BlobEncoder(xid);
+ cmd->Parameters->Append(xidVal);
+ cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords);
+}
+
+void
+TplRecordset::remove(const std::string& xid)
+{
+ // Look up the item by its xid
+ const std::string command =
+ "DELETE FROM " + tableName + " WHERE xid = ?";
+ _CommandPtr cmd = NULL;
+ _ParameterPtr xidVal = NULL;
+
+ TESTHR(cmd.CreateInstance(__uuidof(Command)));
+ TESTHR(xidVal.CreateInstance(__uuidof(Parameter)));
+ _ConnectionPtr p = *dbConn;
+ cmd->ActiveConnection = p;
+ cmd->CommandText = command.c_str();
+ cmd->CommandType = adCmdText;
+ xidVal->Name = "@xid";
+ xidVal->Type = adVarBinary;
+ xidVal->Size = xid.length();
+ xidVal->Direction = adParamInput;
+ xidVal->Value = BlobEncoder(xid);
+ cmd->Parameters->Append(xidVal);
+ _variant_t deletedRecords;
+ cmd->Execute(&deletedRecords, NULL, adCmdText | adExecuteNoRecords);
+}
+
+void
+TplRecordset::recover(std::set<std::string>& xids)
+{
+ openRs();
+ if (rs->BOF && rs->EndOfFile)
+ return; // Nothing to do
+ rs->MoveFirst();
+ while (!rs->EndOfFile) {
+ _variant_t wxid = rs->Fields->Item["xid"]->Value;
+ char *xidBytes;
+ SafeArrayAccessData(wxid.parray, (void **)&xidBytes);
+ std::string xid(xidBytes, rs->Fields->Item["xid"]->ActualSize);
+ xids.insert(xid);
+ SafeArrayUnaccessData(wxid.parray);
+ rs->MoveNext();
+ }
+}
+
+void
+TplRecordset::dump()
+{
+ Recordset::dump();
+ if (rs->EndOfFile && rs->BOF) // No records
+ return;
+
+ rs->MoveFirst();
+ while (!rs->EndOfFile) {
+ _bstr_t wxid = rs->Fields->Item["xid"]->Value;
+ QPID_LOG(notice, " -> " << (const char *)wxid);
+ rs->MoveNext();
+ }
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/cpp/src/qpid/store/ms-sql/TplRecordset.h b/cpp/src/qpid/store/ms-sql/TplRecordset.h new file mode 100644 index 0000000000..fbde51738c --- /dev/null +++ b/cpp/src/qpid/store/ms-sql/TplRecordset.h @@ -0,0 +1,58 @@ +#ifndef QPID_STORE_MSSQL_TPLRECORDSET_H
+#define QPID_STORE_MSSQL_TPLRECORDSET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Recordset.h"
+#include <string>
+#include <set>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class TplRecordset
+ *
+ * Class for the TPL (Transaction Prepared List) records.
+ */
+class TplRecordset : public Recordset {
+protected:
+
+public:
+ virtual void open(DatabaseConnection* conn, const std::string& table);
+
+ void add(const std::string& xid);
+
+ // Remove a record given its xid.
+ void remove(const std::string& xid);
+
+ // Recover prepared transaction XIDs.
+ void recover(std::set<std::string>& xids);
+
+ // Dump table contents; useful for debugging.
+ void dump();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_TPLRECORDSET_H */
diff --git a/cpp/src/qpid/store/ms-sql/VariantHelper.cpp b/cpp/src/qpid/store/ms-sql/VariantHelper.cpp index 786724f031..acec95c1f9 100644 --- a/cpp/src/qpid/store/ms-sql/VariantHelper.cpp +++ b/cpp/src/qpid/store/ms-sql/VariantHelper.cpp @@ -41,13 +41,25 @@ VariantHelper<Wrapped>::operator const _variant_t& () const // Specialization for using _variant_t to wrap a std::string VariantHelper<std::string>::VariantHelper(const std::string &init) { - var.SetString(init.c_str()); + if (init.empty() || init.length() == 0) { + var.vt = VT_BSTR; + var.bstrVal = NULL; + } + else { + var.SetString(init.c_str()); + } } VariantHelper<std::string>& VariantHelper<std::string>::operator=(const std::string &rhs) { - var.SetString(rhs.c_str()); + if (rhs.empty() || rhs.length() == 0) { + var.vt = VT_BSTR; + var.bstrVal = NULL; + } + else { + var.SetString(rhs.c_str()); + } return *this; } |