summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp')
-rw-r--r--cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp312
1 files changed, 250 insertions, 62 deletions
diff --git a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
index a26df59df7..323bc94c5c 100644
--- a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
+++ b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
@@ -32,6 +32,7 @@
#include "BindingRecordset.h"
#include "MessageMapRecordset.h"
#include "MessageRecordset.h"
+#include "TplRecordset.h"
#include "DatabaseConnection.h"
#include "Exception.h"
#include "State.h"
@@ -51,6 +52,7 @@ const std::string TblExchange("tblExchange");
const std::string TblMessage("tblMessage");
const std::string TblMessageMap("tblMessageMap");
const std::string TblQueue("tblQueue");
+const std::string TblTpl("tblTPL");
}
namespace qpid {
@@ -256,9 +258,7 @@ public:
virtual void prepare(qpid::broker::TPCTransactionContext& txn);
virtual void commit(qpid::broker::TransactionContext& txn);
virtual void abort(qpid::broker::TransactionContext& txn);
-
- // @TODO This maybe should not be in TransactionalStore
- virtual void collectPreparedXids(std::set<std::string>& xids) {}
+ virtual void collectPreparedXids(std::set<std::string>& xids);
//@}
virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer);
@@ -272,6 +272,8 @@ public:
virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer,
MessageMap& messageMap,
MessageQueueMap& messageQueueMap);
+ virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer,
+ PreparedTransactionMap& dtxMap);
private:
struct ProviderOptions : public qpid::Options
@@ -310,7 +312,7 @@ private:
State *initState();
DatabaseConnection *initConnection(void);
- void createDb(_ConnectionPtr conn, const std::string &name);
+ void createDb(DatabaseConnection *db, const std::string &name);
};
static MSSqlProvider static_instance_registers_plugin;
@@ -356,7 +358,7 @@ MSSqlProvider::earlyInitialize(Plugin::Target &target)
// Database doesn't exist; create it
QPID_LOG(notice,
"MSSQL: Creating database " + options.catalogName);
- createDb(conn, options.catalogName);
+ createDb(db.get(), options.catalogName);
}
else {
QPID_LOG(notice,
@@ -407,8 +409,9 @@ MSSqlProvider::create(PersistableQueue& queue,
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
db->rollbackTransaction();
- throw ADOException("Error creating queue " + queue.getName(), e);
+ throw ADOException("Error creating queue " + queue.getName(), e, errs);
}
}
@@ -418,14 +421,6 @@ MSSqlProvider::create(PersistableQueue& queue,
void
MSSqlProvider::destroy(PersistableQueue& queue)
{
- // MessageDeleter class for use with for_each, below.
- class MessageDeleter {
- BlobRecordset& msgs;
- public:
- explicit MessageDeleter(BlobRecordset& _msgs) : msgs(_msgs) {}
- void operator()(uint64_t msgId) { msgs.remove(msgId); }
- };
-
DatabaseConnection *db = initConnection();
BlobRecordset rsQueues;
BindingRecordset rsBindings;
@@ -441,19 +436,18 @@ MSSqlProvider::destroy(PersistableQueue& queue)
// under the references in the bindings table. Then remove the
// message->queue entries for the queue, also because the queue can't
// be deleted while there are references to it. If there are messages
- // orphaned by removing the queue references, those messages can
- // also be deleted. Lastly, the queue record can be removed.
+ // orphaned by removing the queue references, they're deleted by
+ // a trigger on the tblMessageMap table. Lastly, the queue record
+ // can be removed.
rsBindings.removeForQueue(queue.getPersistenceId());
- std::vector<uint64_t> orphans;
- rsMessageMaps.removeForQueue(queue.getPersistenceId(), orphans);
- std::for_each(orphans.begin(), orphans.end(),
- MessageDeleter(rsMessages));
+ rsMessageMaps.removeForQueue(queue.getPersistenceId());
rsQueues.remove(queue);
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
db->rollbackTransaction();
- throw ADOException("Error deleting queue " + queue.getName(), e);
+ throw ADOException("Error deleting queue " + queue.getName(), e, errs);
}
}
@@ -473,8 +467,11 @@ MSSqlProvider::create(const PersistableExchange& exchange,
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
db->rollbackTransaction();
- throw ADOException("Error creating exchange " + exchange.getName(), e);
+ throw ADOException("Error creating exchange " + exchange.getName(),
+ e,
+ errs);
}
}
@@ -498,8 +495,11 @@ MSSqlProvider::destroy(const PersistableExchange& exchange)
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
db->rollbackTransaction();
- throw ADOException("Error deleting exchange " + exchange.getName(), e);
+ throw ADOException("Error deleting exchange " + exchange.getName(),
+ e,
+ errs);
}
}
@@ -524,9 +524,12 @@ MSSqlProvider::bind(const PersistableExchange& exchange,
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
db->rollbackTransaction();
throw ADOException("Error binding exchange " + exchange.getName() +
- " to queue " + queue.getName(), e);
+ " to queue " + queue.getName(),
+ e,
+ errs);
}
}
@@ -551,9 +554,12 @@ MSSqlProvider::unbind(const PersistableExchange& exchange,
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
db->rollbackTransaction();
throw ADOException("Error unbinding exchange " + exchange.getName() +
- " from queue " + queue.getName(), e);
+ " from queue " + queue.getName(),
+ e,
+ errs);
}
}
@@ -572,8 +578,9 @@ MSSqlProvider::create(const PersistableConfig& config)
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
db->rollbackTransaction();
- throw ADOException("Error creating config " + config.getName(), e);
+ throw ADOException("Error creating config " + config.getName(), e, errs);
}
}
@@ -592,8 +599,9 @@ MSSqlProvider::destroy(const PersistableConfig& config)
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
db->rollbackTransaction();
- throw ADOException("Error deleting config " + config.getName(), e);
+ throw ADOException("Error deleting config " + config.getName(), e, errs);
}
}
@@ -618,8 +626,9 @@ MSSqlProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg)
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
db->rollbackTransaction();
- throw ADOException("Error staging message", e);
+ throw ADOException("Error staging message", e, errs);
}
}
@@ -641,8 +650,9 @@ MSSqlProvider::destroy(PersistableMessage& msg)
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
db->rollbackTransaction();
- throw ADOException("Error deleting message", e);
+ throw ADOException("Error deleting message", e, errs);
}
}
@@ -662,8 +672,9 @@ MSSqlProvider::appendContent(const boost::intrusive_ptr<const PersistableMessage
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
db->rollbackTransaction();
- throw ADOException("Error appending to message", e);
+ throw ADOException("Error appending to message", e, errs);
}
}
@@ -691,7 +702,8 @@ MSSqlProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/,
rsMessages.loadContent(msg, data, offset, length);
}
catch(_com_error &e) {
- throw ADOException("Error loading message content", e);
+ std::string errs = db->getErrors();
+ throw ADOException("Error loading message content", e, errs);
}
}
@@ -714,6 +726,7 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt,
// this is not in the context of a transaction, then just use the thread's
// DatabaseConnection with a ADO transaction.
DatabaseConnection *db = 0;
+ std::string xid;
AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt);
if (atxn == 0) {
db = initConnection();
@@ -721,12 +734,16 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt,
}
else {
(void)initState(); // Ensure this thread is initialized
+ // It's a transactional enqueue; if it's TPC, grab the xid.
+ AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt);
+ if (tpcTxn)
+ xid = tpcTxn->getXid();
db = atxn->dbConn();
try {
- atxn->begin();
+ atxn->sqlBegin();
}
catch(_com_error &e) {
- throw ADOException("Error queuing message", e);
+ throw ADOException("Error queuing message", e, db->getErrors());
}
}
@@ -738,18 +755,19 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt,
rsMessages.add(msg);
}
rsMap.open(db, TblMessageMap);
- rsMap.add(msg->getPersistenceId(), queue.getPersistenceId());
+ rsMap.add(msg->getPersistenceId(), queue.getPersistenceId(), xid);
if (atxn)
- atxn->commit();
+ atxn->sqlCommit();
else
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
if (atxn)
- atxn->abort();
+ atxn->sqlAbort();
else
db->rollbackTransaction();
- throw ADOException("Error queuing message", e);
+ throw ADOException("Error queuing message", e, errs);
}
msg->enqueueComplete();
}
@@ -773,6 +791,7 @@ MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt,
// this is not in the context of a transaction, then just use the thread's
// DatabaseConnection with a ADO transaction.
DatabaseConnection *db = 0;
+ std::string xid;
AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt);
if (atxn == 0) {
db = initConnection();
@@ -780,36 +799,54 @@ MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt,
}
else {
(void)initState(); // Ensure this thread is initialized
+ // It's a transactional dequeue; if it's TPC, grab the xid.
+ AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt);
+ if (tpcTxn)
+ xid = tpcTxn->getXid();
db = atxn->dbConn();
try {
- atxn->begin();
+ atxn->sqlBegin();
}
catch(_com_error &e) {
- throw ADOException("Error queuing message", e);
+ throw ADOException("Error queuing message", e, db->getErrors());
}
}
MessageMapRecordset rsMap;
- MessageRecordset rsMessages;
try {
rsMap.open(db, TblMessageMap);
- bool more = rsMap.remove(msg->getPersistenceId(),
- queue.getPersistenceId());
- if (!more) {
- rsMessages.open(db, TblMessage);
- rsMessages.remove(msg);
+ // TPC dequeues are just marked pending and will actually be removed
+ // when the transaction commits; Single-phase dequeues are removed
+ // now, relying on the SQL transaction to put it back if the
+ // transaction doesn't commit.
+ if (!xid.empty()) {
+ rsMap.pendingRemove(msg->getPersistenceId(),
+ queue.getPersistenceId(),
+ xid);
+ }
+ else {
+ rsMap.remove(msg->getPersistenceId(),
+ queue.getPersistenceId());
}
if (atxn)
- atxn->commit();
+ atxn->sqlCommit();
else
db->commitTransaction();
}
+ catch(ms_sql::Exception&) {
+ if (atxn)
+ atxn->sqlAbort();
+ else
+ db->rollbackTransaction();
+ throw;
+ }
catch(_com_error &e) {
+ std::string errs = db->getErrors();
if (atxn)
- atxn->abort();
+ atxn->sqlAbort();
else
db->rollbackTransaction();
- throw ADOException("Error dequeuing message", e);
+ throw ADOException("Error dequeuing message", e, errs);
}
msg->dequeueComplete();
}
@@ -827,10 +864,10 @@ MSSqlProvider::begin()
// it safe and handle this just like a TPC transaction, which actually
// can be prepared and committed/aborted from different threads,
// making it a bad idea to try using the thread-local DatabaseConnection.
- std::auto_ptr<DatabaseConnection> db(new DatabaseConnection);
+ boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection);
db->open(options.connectString, options.catalogName);
std::auto_ptr<AmqpTransaction> tx(new AmqpTransaction(db));
- tx->begin();
+ tx->sqlBegin();
std::auto_ptr<qpid::broker::TransactionContext> tc(tx);
return tc;
}
@@ -839,10 +876,24 @@ std::auto_ptr<qpid::broker::TPCTransactionContext>
MSSqlProvider::begin(const std::string& xid)
{
(void)initState(); // Ensure this thread is initialized
- std::auto_ptr<DatabaseConnection> db(new DatabaseConnection);
+ boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection);
db->open(options.connectString, options.catalogName);
std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(db, xid));
- tx->begin();
+ tx->sqlBegin();
+
+ TplRecordset rsTpl;
+ try {
+ tx->sqlBegin();
+ rsTpl.open(db.get(), TblTpl);
+ rsTpl.add(xid);
+ tx->sqlCommit();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ tx->sqlAbort();
+ throw ADOException("Error adding TPL record", e, errs);
+ }
+
std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx);
return tc;
}
@@ -850,28 +901,122 @@ MSSqlProvider::begin(const std::string& xid)
void
MSSqlProvider::prepare(qpid::broker::TPCTransactionContext& txn)
{
- // The inner transactions used for the components of the TPC are done;
- // nothing else to do but wait for the commit.
+ // Commit all the marked-up enqueue/dequeue ops and the TPL record.
+ // On commit/rollback the TPL will be removed and the TPL markups
+ // on the message map will be cleaned up as well.
+ (void)initState(); // Ensure this thread is initialized
+ AmqpTPCTransaction *atxn = dynamic_cast<AmqpTPCTransaction*> (&txn);
+ if (atxn == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ try {
+ atxn->sqlCommit();
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error preparing", e, atxn->dbConn()->getErrors());
+ }
+ atxn->setPrepared();
}
void
MSSqlProvider::commit(qpid::broker::TransactionContext& txn)
{
(void)initState(); // Ensure this thread is initialized
- AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn);
- if (atxn == 0)
- throw qpid::broker::InvalidTransactionContextException();
- atxn->commit();
+ /*
+ * One-phase transactions simply commit the outer SQL transaction
+ * that was begun on begin(). Two-phase transactions are different -
+ * the SQL transaction started on begin() was committed on prepare()
+ * so all the SQL records reflecting the enqueue/dequeue actions for
+ * the transaction are recorded but with xid markups on them to reflect
+ * that they are prepared but not committed. Now go back and remove
+ * the markups, deleting those marked for removal.
+ */
+ AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn);
+ if (p2txn == 0) {
+ AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn);
+ if (p1txn == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ p1txn->sqlCommit();
+ return;
+ }
+
+ DatabaseConnection *db(p2txn->dbConn());
+ TplRecordset rsTpl;
+ MessageMapRecordset rsMessageMap;
+ try {
+ db->beginTransaction();
+ rsTpl.open(db, TblTpl);
+ rsMessageMap.open(db, TblMessageMap);
+ rsMessageMap.commitPrepared(p2txn->getXid());
+ rsTpl.remove(p2txn->getXid());
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error committing transaction", e, errs);
+ }
}
void
MSSqlProvider::abort(qpid::broker::TransactionContext& txn)
{
(void)initState(); // Ensure this thread is initialized
+ /*
+ * One-phase and non-prepared two-phase transactions simply abort
+ * the outer SQL transaction that was begun on begin(). However, prepared
+ * two-phase transactions are different - the SQL transaction started
+ * on begin() was committed on prepare() so all the SQL records
+ * reflecting the enqueue/dequeue actions for the transaction are
+ * recorded but with xid markups on them to reflect that they are
+ * prepared but not committed. Now go back and remove the markups,
+ * deleting those marked for addition.
+ */
+ AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn);
+ if (p2txn == 0 || !p2txn->isPrepared()) {
+ AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn);
+ if (p1txn == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ p1txn->sqlAbort();
+ return;
+ }
+
+ DatabaseConnection *db(p2txn->dbConn());
+ TplRecordset rsTpl;
+ MessageMapRecordset rsMessageMap;
+ try {
+ db->beginTransaction();
+ rsTpl.open(db, TblTpl);
+ rsMessageMap.open(db, TblMessageMap);
+ rsMessageMap.abortPrepared(p2txn->getXid());
+ rsTpl.remove(p2txn->getXid());
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error committing transaction", e, errs);
+ }
+
+
+ (void)initState(); // Ensure this thread is initialized
AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn);
if (atxn == 0)
throw qpid::broker::InvalidTransactionContextException();
- atxn->abort();
+ atxn->sqlAbort();
+}
+
+void
+MSSqlProvider::collectPreparedXids(std::set<std::string>& xids)
+{
+ DatabaseConnection *db = initConnection();
+ try {
+ TplRecordset rsTpl;
+ rsTpl.open(db, TblTpl);
+ rsTpl.recover(xids);
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error reading TPL", e, db->getErrors());
+ }
}
// @TODO Much of this recovery code is way too similar... refactor to
@@ -977,6 +1122,40 @@ MSSqlProvider::recoverMessages(qpid::broker::RecoveryManager& recoverer,
rsMessageMaps.recover(messageQueueMap);
}
+void
+MSSqlProvider::recoverTransactions(qpid::broker::RecoveryManager& recoverer,
+ PreparedTransactionMap& dtxMap)
+{
+ DatabaseConnection *db = initConnection();
+ std::set<std::string> xids;
+ try {
+ TplRecordset rsTpl;
+ rsTpl.open(db, TblTpl);
+ rsTpl.recover(xids);
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error recovering TPL records", e, db->getErrors());
+ }
+
+ try {
+ // Rebuild the needed RecoverableTransactions.
+ for (std::set<std::string>::const_iterator iXid = xids.begin();
+ iXid != xids.end();
+ ++iXid) {
+ boost::shared_ptr<DatabaseConnection> dbX(new DatabaseConnection);
+ dbX->open(options.connectString, options.catalogName);
+ std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(dbX,
+ *iXid));
+ tx->setPrepared();
+ std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx);
+ dtxMap[*iXid] = recoverer.recoverTransaction(*iXid, tc);
+ }
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error recreating dtx connection", e);
+ }
+}
+
////////////// Internal Methods
State *
@@ -1003,7 +1182,7 @@ MSSqlProvider::initConnection(void)
}
void
-MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name)
+MSSqlProvider::createDb(DatabaseConnection *db, const std::string &name)
{
const std::string dbCmd = "CREATE DATABASE " + name;
const std::string useCmd = "USE " + name;
@@ -1018,9 +1197,15 @@ MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name)
" fieldTableBlob varbinary(MAX))";
const std::string messageMapSpecs =
" (messageId bigint REFERENCES tblMessage(persistenceId) NOT NULL,"
- " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL)";
+ " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL,"
+ " prepareStatus tinyint CHECK (prepareStatus IS NULL OR "
+ " prepareStatus IN (1, 2)),"
+ " xid varbinary(512) REFERENCES tblTPL(xid)"
+ " CONSTRAINT CK_NoDups UNIQUE NONCLUSTERED (messageId, queueId) )";
+ const std::string tplSpecs = " (xid varbinary(512) PRIMARY KEY NOT NULL)";
_variant_t unused;
_bstr_t dbStr = dbCmd.c_str();
+ _ConnectionPtr conn(*db);
try {
conn->Execute(dbStr, &unused, adExecuteNoRecords);
_bstr_t useStr = useCmd.c_str();
@@ -1040,12 +1225,15 @@ MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name)
makeTable = tableCmd + TblBinding + bindingSpecs;
makeTableStr = makeTable.c_str();
conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblTpl + tplSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
makeTable = tableCmd + TblMessageMap + messageMapSpecs;
makeTableStr = makeTable.c_str();
conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
}
catch(_com_error &e) {
- throw ADOException("MSSQL can't create " + name, e);
+ throw ADOException("MSSQL can't create " + name, e, db->getErrors());
}
}