summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/store/CMakeLists.txt2
-rw-r--r--cpp/src/qpid/store/MessageStorePlugin.cpp26
-rw-r--r--cpp/src/qpid/store/StorageProvider.h14
-rw-r--r--cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp44
-rw-r--r--cpp/src/qpid/store/ms-sql/AmqpTransaction.h29
-rw-r--r--cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp21
-rw-r--r--cpp/src/qpid/store/ms-sql/DatabaseConnection.h2
-rw-r--r--cpp/src/qpid/store/ms-sql/Exception.h6
-rw-r--r--cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp312
-rw-r--r--cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp217
-rw-r--r--cpp/src/qpid/store/ms-sql/MessageMapRecordset.h51
-rw-r--r--cpp/src/qpid/store/ms-sql/Recordset.cpp57
-rw-r--r--cpp/src/qpid/store/ms-sql/Recordset.h40
-rw-r--r--cpp/src/qpid/store/ms-sql/SqlTransaction.cpp71
-rw-r--r--cpp/src/qpid/store/ms-sql/SqlTransaction.h67
-rw-r--r--cpp/src/qpid/store/ms-sql/TplRecordset.cpp128
-rw-r--r--cpp/src/qpid/store/ms-sql/TplRecordset.h58
-rw-r--r--cpp/src/qpid/store/ms-sql/VariantHelper.cpp16
18 files changed, 899 insertions, 262 deletions
diff --git a/cpp/src/qpid/store/CMakeLists.txt b/cpp/src/qpid/store/CMakeLists.txt
index 0d25923175..5c04e825d7 100644
--- a/cpp/src/qpid/store/CMakeLists.txt
+++ b/cpp/src/qpid/store/CMakeLists.txt
@@ -71,7 +71,9 @@ if (BUILD_MSSQL)
ms-sql/MessageMapRecordset.cpp
ms-sql/MessageRecordset.cpp
ms-sql/Recordset.cpp
+ ms-sql/SqlTransaction.cpp
ms-sql/State.cpp
+ ms-sql/TplRecordset.cpp
ms-sql/VariantHelper.cpp)
target_link_libraries (mssql_store qpidbroker qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY})
install (TARGETS mssql_store # RUNTIME
diff --git a/cpp/src/qpid/store/MessageStorePlugin.cpp b/cpp/src/qpid/store/MessageStorePlugin.cpp
index 05b6ef4465..fdb947eef2 100644
--- a/cpp/src/qpid/store/MessageStorePlugin.cpp
+++ b/cpp/src/qpid/store/MessageStorePlugin.cpp
@@ -405,11 +405,14 @@ MessageStorePlugin::recover(broker::RecoveryManager& recoverer)
QueueMap queues;
MessageMap messages;
MessageQueueMap messageQueueMap;
+ std::vector<std::string> xids;
+ PreparedTransactionMap dtxMap;
provider->second->recoverConfigs(recoverer);
provider->second->recoverExchanges(recoverer, exchanges);
provider->second->recoverQueues(recoverer, queues);
provider->second->recoverBindings(recoverer, exchanges, queues);
+ provider->second->recoverTransactions(recoverer, dtxMap);
provider->second->recoverMessages(recoverer, messages, messageQueueMap);
// Enqueue msgs where needed.
for (MessageQueueMap::const_iterator i = messageQueueMap.begin();
@@ -426,22 +429,33 @@ MessageStorePlugin::recover(broker::RecoveryManager& recoverer)
broker::RecoverableMessage::shared_ptr msg = iMsg->second;
// Now for each queue referenced in the queue map, locate it
// and re-enqueue the message.
- for (std::vector<uint64_t>::const_iterator j = i->second.begin();
+ for (std::vector<QueueEntry>::const_iterator j = i->second.begin();
j != i->second.end();
++j) {
// Locate the queue corresponding to the current queue Id
- QueueMap::const_iterator iQ = queues.find(*j);
+ QueueMap::const_iterator iQ = queues.find(j->queueId);
if (iQ == queues.end()) {
std::ostringstream oss;
oss << "No matching queue trying to re-enqueue message "
- << " on queue Id " << *j;
+ << " on queue Id " << j->queueId;
THROW_STORE_EXCEPTION(oss.str());
}
- iQ->second->recover(msg);
+ // Messages involved in prepared transactions have their status
+ // updated accordingly. First, though, restore a message that
+ // is expected to be on a queue, including non-transacted
+ // messages and those pending dequeue in a dtx.
+ if (j->tplStatus != QueueEntry::ADDING)
+ iQ->second->recover(msg);
+ switch(j->tplStatus) {
+ case QueueEntry::ADDING:
+ dtxMap[j->xid]->enqueue(iQ->second, msg);
+ break;
+ case QueueEntry::REMOVING:
+ dtxMap[j->xid]->dequeue(iQ->second, msg);
+ break;
+ }
}
}
-
- // recoverTransactions() and apply correctly while re-enqueuing
}
}} // namespace qpid::store
diff --git a/cpp/src/qpid/store/StorageProvider.h b/cpp/src/qpid/store/StorageProvider.h
index 1f257e7416..fc19f089fb 100644
--- a/cpp/src/qpid/store/StorageProvider.h
+++ b/cpp/src/qpid/store/StorageProvider.h
@@ -44,8 +44,16 @@ typedef std::map<uint64_t, qpid::broker::RecoverableQueue::shared_ptr>
QueueMap;
typedef std::map<uint64_t, qpid::broker::RecoverableMessage::shared_ptr>
MessageMap;
-// Msg Id -> vector of queue Ids where message is queued
-typedef std::map<uint64_t, std::vector<uint64_t> > MessageQueueMap;
+// Msg Id -> vector of queue entries where message is queued
+struct QueueEntry {
+ enum TplStatus { NONE = 0, ADDING = 1, REMOVING = 2 };
+ uint64_t queueId;
+ TplStatus tplStatus;
+ std::string xid;
+};
+typedef std::map<uint64_t, std::vector<QueueEntry> > MessageQueueMap;
+typedef std::map<std::string, qpid::broker::RecoverableTransaction::shared_ptr>
+ PreparedTransactionMap;
class MessageStorePlugin;
@@ -316,6 +324,8 @@ public:
virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer,
MessageMap& messageMap,
MessageQueueMap& messageQueueMap) = 0;
+ virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer,
+ PreparedTransactionMap& dtxMap) = 0;
//@}
};
diff --git a/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp b/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp
index 0ecfacfb4b..095d1bf331 100644
--- a/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp
+++ b/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp
@@ -26,51 +26,37 @@ namespace qpid {
namespace store {
namespace ms_sql {
-AmqpTransaction::AmqpTransaction(std::auto_ptr<DatabaseConnection>& _db)
- : db(_db), transDepth(0)
+AmqpTransaction::AmqpTransaction(const boost::shared_ptr<DatabaseConnection>& _db)
+ : db(_db), sqlTrans(_db)
{
}
AmqpTransaction::~AmqpTransaction()
{
- if (transDepth > 0)
- this->abort();
}
void
-AmqpTransaction::begin()
+AmqpTransaction::sqlBegin()
{
- _bstr_t beginCmd("BEGIN TRANSACTION");
- _ConnectionPtr c = *db;
- c->Execute(beginCmd, NULL, adExecuteNoRecords);
- ++transDepth;
+ sqlTrans.begin();
}
void
-AmqpTransaction::commit()
+AmqpTransaction::sqlCommit()
{
- if (transDepth > 0) {
- _bstr_t commitCmd("COMMIT TRANSACTION");
- _ConnectionPtr c = *db;
- c->Execute(commitCmd, NULL, adExecuteNoRecords);
- --transDepth;
- }
+ sqlTrans.commit();
}
void
-AmqpTransaction::abort()
+AmqpTransaction::sqlAbort()
{
- if (transDepth > 0) {
- _bstr_t rollbackCmd("ROLLBACK TRANSACTION");
- _ConnectionPtr c = *db;
- c->Execute(rollbackCmd, NULL, adExecuteNoRecords);
- transDepth = 0;
- }
+ sqlTrans.abort();
}
-AmqpTPCTransaction::AmqpTPCTransaction(std::auto_ptr<DatabaseConnection>& _db,
+
+AmqpTPCTransaction::AmqpTPCTransaction(const boost::shared_ptr<DatabaseConnection>& db,
const std::string& _xid)
- : AmqpTransaction(_db), xid(_xid)
+ : AmqpTransaction(db), prepared(false), xid(_xid)
{
}
@@ -78,12 +64,4 @@ AmqpTPCTransaction::~AmqpTPCTransaction()
{
}
-void
-AmqpTPCTransaction::prepare()
-{
- // Intermediate transactions should have already assured integrity of
- // the content in the database; just waiting to pull the trigger on the
- // outermost transaction.
-}
-
}}} // namespace qpid::store::ms_sql
diff --git a/cpp/src/qpid/store/ms-sql/AmqpTransaction.h b/cpp/src/qpid/store/ms-sql/AmqpTransaction.h
index 9b87d0ae15..625fab5595 100644
--- a/cpp/src/qpid/store/ms-sql/AmqpTransaction.h
+++ b/cpp/src/qpid/store/ms-sql/AmqpTransaction.h
@@ -23,8 +23,10 @@
*/
#include <qpid/broker/TransactionalStore.h>
+#include <boost/shared_ptr.hpp>
#include <string>
-#include <memory>
+
+#include "SqlTransaction.h"
namespace qpid {
namespace store {
@@ -41,23 +43,18 @@ class DatabaseConnection;
*/
class AmqpTransaction : public qpid::broker::TransactionContext {
- std::auto_ptr<DatabaseConnection> db;
-
- // Since ADO w/ SQLOLEDB can't do nested transaction via its BeginTrans(),
- // et al, nested transactions are carried out with direct SQL commands.
- // To ensure the state of this is known, keep track of how deeply the
- // transactions are nested.
- unsigned int transDepth;
+ boost::shared_ptr<DatabaseConnection> db;
+ SqlTransaction sqlTrans;
public:
- AmqpTransaction(std::auto_ptr<DatabaseConnection>& _db);
+ AmqpTransaction(const boost::shared_ptr<DatabaseConnection>& _db);
virtual ~AmqpTransaction();
DatabaseConnection *dbConn() { return db.get(); }
- void begin();
- void commit();
- void abort();
+ void sqlBegin();
+ void sqlCommit();
+ void sqlAbort();
};
/**
@@ -69,14 +66,18 @@ public:
*/
class AmqpTPCTransaction : public AmqpTransaction,
public qpid::broker::TPCTransactionContext {
+ bool prepared;
std::string xid;
public:
- AmqpTPCTransaction(std::auto_ptr<DatabaseConnection>& _db,
+ AmqpTPCTransaction(const boost::shared_ptr<DatabaseConnection>& db,
const std::string& _xid);
virtual ~AmqpTPCTransaction();
- void prepare();
+ void setPrepared(void) { prepared = true; }
+ bool isPrepared(void) const { return prepared; }
+
+ const std::string& getXid(void) const { return xid; }
};
}}} // namespace qpid::store::ms_sql
diff --git a/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp b/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp
index 34edec8acd..3219ea526a 100644
--- a/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp
+++ b/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp
@@ -67,4 +67,25 @@ DatabaseConnection::close()
conn = 0;
}
+std::string
+DatabaseConnection::getErrors()
+{
+ long errCount = conn->Errors->Count;
+ if (errCount <= 0)
+ return "";
+ // Collection ranges from 0 to nCount -1.
+ std::ostringstream messages;
+ ErrorPtr pErr = NULL;
+ for (long i = 0 ; i < errCount ; i++ ) {
+ if (i > 0)
+ messages << "\n";
+ messages << "[" << i << "] ";
+ pErr = conn->Errors->GetItem(i);
+ messages << "Error " << pErr->Number << ": "
+ << (LPCSTR)pErr->Description;
+ }
+ messages << std::ends;
+ return messages.str();
+}
+
}}} // namespace qpid::store::ms_sql
diff --git a/cpp/src/qpid/store/ms-sql/DatabaseConnection.h b/cpp/src/qpid/store/ms-sql/DatabaseConnection.h
index 2b8bbffa90..785d1587c5 100644
--- a/cpp/src/qpid/store/ms-sql/DatabaseConnection.h
+++ b/cpp/src/qpid/store/ms-sql/DatabaseConnection.h
@@ -55,6 +55,8 @@ public:
void beginTransaction() { conn->BeginTrans(); }
void commitTransaction() {conn->CommitTrans(); }
void rollbackTransaction() { conn->RollbackTrans(); }
+
+ std::string getErrors();
};
}}} // namespace qpid::store::ms_sql
diff --git a/cpp/src/qpid/store/ms-sql/Exception.h b/cpp/src/qpid/store/ms-sql/Exception.h
index 0da4b24210..65ec3388ff 100644
--- a/cpp/src/qpid/store/ms-sql/Exception.h
+++ b/cpp/src/qpid/store/ms-sql/Exception.h
@@ -43,7 +43,9 @@ public:
class ADOException : public Exception
{
public:
- ADOException(const std::string& _text, _com_error &e)
+ ADOException(const std::string& _text,
+ _com_error &e,
+ const std::string& providerErrors = "")
: Exception(_text) {
text += ": ";
text += e.ErrorMessage();
@@ -54,6 +56,8 @@ public:
text += (const char *)wmsg;
i->Release();
}
+ if (providerErrors.length() > 0)
+ text += providerErrors;
}
};
diff --git a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
index a26df59df7..323bc94c5c 100644
--- a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
+++ b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
@@ -32,6 +32,7 @@
#include "BindingRecordset.h"
#include "MessageMapRecordset.h"
#include "MessageRecordset.h"
+#include "TplRecordset.h"
#include "DatabaseConnection.h"
#include "Exception.h"
#include "State.h"
@@ -51,6 +52,7 @@ const std::string TblExchange("tblExchange");
const std::string TblMessage("tblMessage");
const std::string TblMessageMap("tblMessageMap");
const std::string TblQueue("tblQueue");
+const std::string TblTpl("tblTPL");
}
namespace qpid {
@@ -256,9 +258,7 @@ public:
virtual void prepare(qpid::broker::TPCTransactionContext& txn);
virtual void commit(qpid::broker::TransactionContext& txn);
virtual void abort(qpid::broker::TransactionContext& txn);
-
- // @TODO This maybe should not be in TransactionalStore
- virtual void collectPreparedXids(std::set<std::string>& xids) {}
+ virtual void collectPreparedXids(std::set<std::string>& xids);
//@}
virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer);
@@ -272,6 +272,8 @@ public:
virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer,
MessageMap& messageMap,
MessageQueueMap& messageQueueMap);
+ virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer,
+ PreparedTransactionMap& dtxMap);
private:
struct ProviderOptions : public qpid::Options
@@ -310,7 +312,7 @@ private:
State *initState();
DatabaseConnection *initConnection(void);
- void createDb(_ConnectionPtr conn, const std::string &name);
+ void createDb(DatabaseConnection *db, const std::string &name);
};
static MSSqlProvider static_instance_registers_plugin;
@@ -356,7 +358,7 @@ MSSqlProvider::earlyInitialize(Plugin::Target &target)
// Database doesn't exist; create it
QPID_LOG(notice,
"MSSQL: Creating database " + options.catalogName);
- createDb(conn, options.catalogName);
+ createDb(db.get(), options.catalogName);
}
else {
QPID_LOG(notice,
@@ -407,8 +409,9 @@ MSSqlProvider::create(PersistableQueue& queue,
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
db->rollbackTransaction();
- throw ADOException("Error creating queue " + queue.getName(), e);
+ throw ADOException("Error creating queue " + queue.getName(), e, errs);
}
}
@@ -418,14 +421,6 @@ MSSqlProvider::create(PersistableQueue& queue,
void
MSSqlProvider::destroy(PersistableQueue& queue)
{
- // MessageDeleter class for use with for_each, below.
- class MessageDeleter {
- BlobRecordset& msgs;
- public:
- explicit MessageDeleter(BlobRecordset& _msgs) : msgs(_msgs) {}
- void operator()(uint64_t msgId) { msgs.remove(msgId); }
- };
-
DatabaseConnection *db = initConnection();
BlobRecordset rsQueues;
BindingRecordset rsBindings;
@@ -441,19 +436,18 @@ MSSqlProvider::destroy(PersistableQueue& queue)
// under the references in the bindings table. Then remove the
// message->queue entries for the queue, also because the queue can't
// be deleted while there are references to it. If there are messages
- // orphaned by removing the queue references, those messages can
- // also be deleted. Lastly, the queue record can be removed.
+ // orphaned by removing the queue references, they're deleted by
+ // a trigger on the tblMessageMap table. Lastly, the queue record
+ // can be removed.
rsBindings.removeForQueue(queue.getPersistenceId());
- std::vector<uint64_t> orphans;
- rsMessageMaps.removeForQueue(queue.getPersistenceId(), orphans);
- std::for_each(orphans.begin(), orphans.end(),
- MessageDeleter(rsMessages));
+ rsMessageMaps.removeForQueue(queue.getPersistenceId());
rsQueues.remove(queue);
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
db->rollbackTransaction();
- throw ADOException("Error deleting queue " + queue.getName(), e);
+ throw ADOException("Error deleting queue " + queue.getName(), e, errs);
}
}
@@ -473,8 +467,11 @@ MSSqlProvider::create(const PersistableExchange& exchange,
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
db->rollbackTransaction();
- throw ADOException("Error creating exchange " + exchange.getName(), e);
+ throw ADOException("Error creating exchange " + exchange.getName(),
+ e,
+ errs);
}
}
@@ -498,8 +495,11 @@ MSSqlProvider::destroy(const PersistableExchange& exchange)
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
db->rollbackTransaction();
- throw ADOException("Error deleting exchange " + exchange.getName(), e);
+ throw ADOException("Error deleting exchange " + exchange.getName(),
+ e,
+ errs);
}
}
@@ -524,9 +524,12 @@ MSSqlProvider::bind(const PersistableExchange& exchange,
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
db->rollbackTransaction();
throw ADOException("Error binding exchange " + exchange.getName() +
- " to queue " + queue.getName(), e);
+ " to queue " + queue.getName(),
+ e,
+ errs);
}
}
@@ -551,9 +554,12 @@ MSSqlProvider::unbind(const PersistableExchange& exchange,
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
db->rollbackTransaction();
throw ADOException("Error unbinding exchange " + exchange.getName() +
- " from queue " + queue.getName(), e);
+ " from queue " + queue.getName(),
+ e,
+ errs);
}
}
@@ -572,8 +578,9 @@ MSSqlProvider::create(const PersistableConfig& config)
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
db->rollbackTransaction();
- throw ADOException("Error creating config " + config.getName(), e);
+ throw ADOException("Error creating config " + config.getName(), e, errs);
}
}
@@ -592,8 +599,9 @@ MSSqlProvider::destroy(const PersistableConfig& config)
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
db->rollbackTransaction();
- throw ADOException("Error deleting config " + config.getName(), e);
+ throw ADOException("Error deleting config " + config.getName(), e, errs);
}
}
@@ -618,8 +626,9 @@ MSSqlProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg)
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
db->rollbackTransaction();
- throw ADOException("Error staging message", e);
+ throw ADOException("Error staging message", e, errs);
}
}
@@ -641,8 +650,9 @@ MSSqlProvider::destroy(PersistableMessage& msg)
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
db->rollbackTransaction();
- throw ADOException("Error deleting message", e);
+ throw ADOException("Error deleting message", e, errs);
}
}
@@ -662,8 +672,9 @@ MSSqlProvider::appendContent(const boost::intrusive_ptr<const PersistableMessage
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
db->rollbackTransaction();
- throw ADOException("Error appending to message", e);
+ throw ADOException("Error appending to message", e, errs);
}
}
@@ -691,7 +702,8 @@ MSSqlProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/,
rsMessages.loadContent(msg, data, offset, length);
}
catch(_com_error &e) {
- throw ADOException("Error loading message content", e);
+ std::string errs = db->getErrors();
+ throw ADOException("Error loading message content", e, errs);
}
}
@@ -714,6 +726,7 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt,
// this is not in the context of a transaction, then just use the thread's
// DatabaseConnection with a ADO transaction.
DatabaseConnection *db = 0;
+ std::string xid;
AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt);
if (atxn == 0) {
db = initConnection();
@@ -721,12 +734,16 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt,
}
else {
(void)initState(); // Ensure this thread is initialized
+ // It's a transactional enqueue; if it's TPC, grab the xid.
+ AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt);
+ if (tpcTxn)
+ xid = tpcTxn->getXid();
db = atxn->dbConn();
try {
- atxn->begin();
+ atxn->sqlBegin();
}
catch(_com_error &e) {
- throw ADOException("Error queuing message", e);
+ throw ADOException("Error queuing message", e, db->getErrors());
}
}
@@ -738,18 +755,19 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt,
rsMessages.add(msg);
}
rsMap.open(db, TblMessageMap);
- rsMap.add(msg->getPersistenceId(), queue.getPersistenceId());
+ rsMap.add(msg->getPersistenceId(), queue.getPersistenceId(), xid);
if (atxn)
- atxn->commit();
+ atxn->sqlCommit();
else
db->commitTransaction();
}
catch(_com_error &e) {
+ std::string errs = db->getErrors();
if (atxn)
- atxn->abort();
+ atxn->sqlAbort();
else
db->rollbackTransaction();
- throw ADOException("Error queuing message", e);
+ throw ADOException("Error queuing message", e, errs);
}
msg->enqueueComplete();
}
@@ -773,6 +791,7 @@ MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt,
// this is not in the context of a transaction, then just use the thread's
// DatabaseConnection with a ADO transaction.
DatabaseConnection *db = 0;
+ std::string xid;
AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt);
if (atxn == 0) {
db = initConnection();
@@ -780,36 +799,54 @@ MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt,
}
else {
(void)initState(); // Ensure this thread is initialized
+ // It's a transactional dequeue; if it's TPC, grab the xid.
+ AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt);
+ if (tpcTxn)
+ xid = tpcTxn->getXid();
db = atxn->dbConn();
try {
- atxn->begin();
+ atxn->sqlBegin();
}
catch(_com_error &e) {
- throw ADOException("Error queuing message", e);
+ throw ADOException("Error queuing message", e, db->getErrors());
}
}
MessageMapRecordset rsMap;
- MessageRecordset rsMessages;
try {
rsMap.open(db, TblMessageMap);
- bool more = rsMap.remove(msg->getPersistenceId(),
- queue.getPersistenceId());
- if (!more) {
- rsMessages.open(db, TblMessage);
- rsMessages.remove(msg);
+ // TPC dequeues are just marked pending and will actually be removed
+ // when the transaction commits; Single-phase dequeues are removed
+ // now, relying on the SQL transaction to put it back if the
+ // transaction doesn't commit.
+ if (!xid.empty()) {
+ rsMap.pendingRemove(msg->getPersistenceId(),
+ queue.getPersistenceId(),
+ xid);
+ }
+ else {
+ rsMap.remove(msg->getPersistenceId(),
+ queue.getPersistenceId());
}
if (atxn)
- atxn->commit();
+ atxn->sqlCommit();
else
db->commitTransaction();
}
+ catch(ms_sql::Exception&) {
+ if (atxn)
+ atxn->sqlAbort();
+ else
+ db->rollbackTransaction();
+ throw;
+ }
catch(_com_error &e) {
+ std::string errs = db->getErrors();
if (atxn)
- atxn->abort();
+ atxn->sqlAbort();
else
db->rollbackTransaction();
- throw ADOException("Error dequeuing message", e);
+ throw ADOException("Error dequeuing message", e, errs);
}
msg->dequeueComplete();
}
@@ -827,10 +864,10 @@ MSSqlProvider::begin()
// it safe and handle this just like a TPC transaction, which actually
// can be prepared and committed/aborted from different threads,
// making it a bad idea to try using the thread-local DatabaseConnection.
- std::auto_ptr<DatabaseConnection> db(new DatabaseConnection);
+ boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection);
db->open(options.connectString, options.catalogName);
std::auto_ptr<AmqpTransaction> tx(new AmqpTransaction(db));
- tx->begin();
+ tx->sqlBegin();
std::auto_ptr<qpid::broker::TransactionContext> tc(tx);
return tc;
}
@@ -839,10 +876,24 @@ std::auto_ptr<qpid::broker::TPCTransactionContext>
MSSqlProvider::begin(const std::string& xid)
{
(void)initState(); // Ensure this thread is initialized
- std::auto_ptr<DatabaseConnection> db(new DatabaseConnection);
+ boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection);
db->open(options.connectString, options.catalogName);
std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(db, xid));
- tx->begin();
+ tx->sqlBegin();
+
+ TplRecordset rsTpl;
+ try {
+ tx->sqlBegin();
+ rsTpl.open(db.get(), TblTpl);
+ rsTpl.add(xid);
+ tx->sqlCommit();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ tx->sqlAbort();
+ throw ADOException("Error adding TPL record", e, errs);
+ }
+
std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx);
return tc;
}
@@ -850,28 +901,122 @@ MSSqlProvider::begin(const std::string& xid)
void
MSSqlProvider::prepare(qpid::broker::TPCTransactionContext& txn)
{
- // The inner transactions used for the components of the TPC are done;
- // nothing else to do but wait for the commit.
+ // Commit all the marked-up enqueue/dequeue ops and the TPL record.
+ // On commit/rollback the TPL will be removed and the TPL markups
+ // on the message map will be cleaned up as well.
+ (void)initState(); // Ensure this thread is initialized
+ AmqpTPCTransaction *atxn = dynamic_cast<AmqpTPCTransaction*> (&txn);
+ if (atxn == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ try {
+ atxn->sqlCommit();
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error preparing", e, atxn->dbConn()->getErrors());
+ }
+ atxn->setPrepared();
}
void
MSSqlProvider::commit(qpid::broker::TransactionContext& txn)
{
(void)initState(); // Ensure this thread is initialized
- AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn);
- if (atxn == 0)
- throw qpid::broker::InvalidTransactionContextException();
- atxn->commit();
+ /*
+ * One-phase transactions simply commit the outer SQL transaction
+ * that was begun on begin(). Two-phase transactions are different -
+ * the SQL transaction started on begin() was committed on prepare()
+ * so all the SQL records reflecting the enqueue/dequeue actions for
+ * the transaction are recorded but with xid markups on them to reflect
+ * that they are prepared but not committed. Now go back and remove
+ * the markups, deleting those marked for removal.
+ */
+ AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn);
+ if (p2txn == 0) {
+ AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn);
+ if (p1txn == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ p1txn->sqlCommit();
+ return;
+ }
+
+ DatabaseConnection *db(p2txn->dbConn());
+ TplRecordset rsTpl;
+ MessageMapRecordset rsMessageMap;
+ try {
+ db->beginTransaction();
+ rsTpl.open(db, TblTpl);
+ rsMessageMap.open(db, TblMessageMap);
+ rsMessageMap.commitPrepared(p2txn->getXid());
+ rsTpl.remove(p2txn->getXid());
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error committing transaction", e, errs);
+ }
}
void
MSSqlProvider::abort(qpid::broker::TransactionContext& txn)
{
(void)initState(); // Ensure this thread is initialized
+ /*
+ * One-phase and non-prepared two-phase transactions simply abort
+ * the outer SQL transaction that was begun on begin(). However, prepared
+ * two-phase transactions are different - the SQL transaction started
+ * on begin() was committed on prepare() so all the SQL records
+ * reflecting the enqueue/dequeue actions for the transaction are
+ * recorded but with xid markups on them to reflect that they are
+ * prepared but not committed. Now go back and remove the markups,
+ * deleting those marked for addition.
+ */
+ AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn);
+ if (p2txn == 0 || !p2txn->isPrepared()) {
+ AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn);
+ if (p1txn == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ p1txn->sqlAbort();
+ return;
+ }
+
+ DatabaseConnection *db(p2txn->dbConn());
+ TplRecordset rsTpl;
+ MessageMapRecordset rsMessageMap;
+ try {
+ db->beginTransaction();
+ rsTpl.open(db, TblTpl);
+ rsMessageMap.open(db, TblMessageMap);
+ rsMessageMap.abortPrepared(p2txn->getXid());
+ rsTpl.remove(p2txn->getXid());
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error committing transaction", e, errs);
+ }
+
+
+ (void)initState(); // Ensure this thread is initialized
AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn);
if (atxn == 0)
throw qpid::broker::InvalidTransactionContextException();
- atxn->abort();
+ atxn->sqlAbort();
+}
+
+void
+MSSqlProvider::collectPreparedXids(std::set<std::string>& xids)
+{
+ DatabaseConnection *db = initConnection();
+ try {
+ TplRecordset rsTpl;
+ rsTpl.open(db, TblTpl);
+ rsTpl.recover(xids);
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error reading TPL", e, db->getErrors());
+ }
}
// @TODO Much of this recovery code is way too similar... refactor to
@@ -977,6 +1122,40 @@ MSSqlProvider::recoverMessages(qpid::broker::RecoveryManager& recoverer,
rsMessageMaps.recover(messageQueueMap);
}
+void
+MSSqlProvider::recoverTransactions(qpid::broker::RecoveryManager& recoverer,
+ PreparedTransactionMap& dtxMap)
+{
+ DatabaseConnection *db = initConnection();
+ std::set<std::string> xids;
+ try {
+ TplRecordset rsTpl;
+ rsTpl.open(db, TblTpl);
+ rsTpl.recover(xids);
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error recovering TPL records", e, db->getErrors());
+ }
+
+ try {
+ // Rebuild the needed RecoverableTransactions.
+ for (std::set<std::string>::const_iterator iXid = xids.begin();
+ iXid != xids.end();
+ ++iXid) {
+ boost::shared_ptr<DatabaseConnection> dbX(new DatabaseConnection);
+ dbX->open(options.connectString, options.catalogName);
+ std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(dbX,
+ *iXid));
+ tx->setPrepared();
+ std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx);
+ dtxMap[*iXid] = recoverer.recoverTransaction(*iXid, tc);
+ }
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error recreating dtx connection", e);
+ }
+}
+
////////////// Internal Methods
State *
@@ -1003,7 +1182,7 @@ MSSqlProvider::initConnection(void)
}
void
-MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name)
+MSSqlProvider::createDb(DatabaseConnection *db, const std::string &name)
{
const std::string dbCmd = "CREATE DATABASE " + name;
const std::string useCmd = "USE " + name;
@@ -1018,9 +1197,15 @@ MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name)
" fieldTableBlob varbinary(MAX))";
const std::string messageMapSpecs =
" (messageId bigint REFERENCES tblMessage(persistenceId) NOT NULL,"
- " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL)";
+ " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL,"
+ " prepareStatus tinyint CHECK (prepareStatus IS NULL OR "
+ " prepareStatus IN (1, 2)),"
+ " xid varbinary(512) REFERENCES tblTPL(xid)"
+ " CONSTRAINT CK_NoDups UNIQUE NONCLUSTERED (messageId, queueId) )";
+ const std::string tplSpecs = " (xid varbinary(512) PRIMARY KEY NOT NULL)";
_variant_t unused;
_bstr_t dbStr = dbCmd.c_str();
+ _ConnectionPtr conn(*db);
try {
conn->Execute(dbStr, &unused, adExecuteNoRecords);
_bstr_t useStr = useCmd.c_str();
@@ -1040,12 +1225,15 @@ MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name)
makeTable = tableCmd + TblBinding + bindingSpecs;
makeTableStr = makeTable.c_str();
conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblTpl + tplSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
makeTable = tableCmd + TblMessageMap + messageMapSpecs;
makeTableStr = makeTable.c_str();
conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
}
catch(_com_error &e) {
- throw ADOException("MSSQL can't create " + name, e);
+ throw ADOException("MSSQL can't create " + name, e, db->getErrors());
}
}
diff --git a/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp b/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp
index 68e174a2b0..a2ff8aefdd 100644
--- a/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp
+++ b/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp
@@ -24,96 +24,197 @@
#include <qpid/store/StorageProvider.h>
#include "MessageMapRecordset.h"
+#include "BlobEncoder.h"
+#include "DatabaseConnection.h"
+#include "Exception.h"
#include "VariantHelper.h"
+namespace {
+inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};
+}
+
namespace qpid {
namespace store {
namespace ms_sql {
void
-MessageMapRecordset::add(uint64_t messageId, uint64_t queueId)
+MessageMapRecordset::open(DatabaseConnection* conn, const std::string& table)
{
- rs->AddNew();
- rs->Fields->GetItem("messageId")->Value = messageId;
- rs->Fields->GetItem("queueId")->Value = queueId;
- rs->Update();
+ init(conn, table);
}
-bool
+void
+MessageMapRecordset::add(uint64_t messageId,
+ uint64_t queueId,
+ const std::string& xid)
+{
+ std::ostringstream command;
+ command << "INSERT INTO " << tableName
+ << " (messageId, queueId";
+ if (!xid.empty())
+ command << ", prepareStatus, xid";
+ command << ") VALUES (" << messageId << "," << queueId;
+ if (!xid.empty())
+ command << "," << PREPARE_ADD << ",?";
+ command << ")" << std::ends;
+
+ _CommandPtr cmd = NULL;
+ _ParameterPtr xidVal = NULL;
+ TESTHR(cmd.CreateInstance(__uuidof(Command)));
+ _ConnectionPtr p = *dbConn;
+ cmd->ActiveConnection = p;
+ cmd->CommandText = command.str().c_str();
+ cmd->CommandType = adCmdText;
+ if (!xid.empty()) {
+ TESTHR(xidVal.CreateInstance(__uuidof(Parameter)));
+ xidVal->Name = "@xid";
+ xidVal->Type = adVarBinary;
+ xidVal->Size = xid.length();
+ xidVal->Direction = adParamInput;
+ xidVal->Value = BlobEncoder(xid);
+ cmd->Parameters->Append(xidVal);
+ }
+ cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords);
+}
+
+void
MessageMapRecordset::remove(uint64_t messageId, uint64_t queueId)
{
- // Look up all mappings for the specified message. Then scan
- // for the specified queue and keep track of whether or not the
- // message exists on any queue we are not looking for a well.
- std::ostringstream filter;
- filter << "messageId = " << messageId << std::ends;
- rs->PutFilter (VariantHelper<std::string>(filter.str()));
- if (rs->RecordCount == 0)
- return false;
+ std::ostringstream command;
+ command << "DELETE FROM " << tableName
+ << " WHERE queueId = " << queueId
+ << " AND messageId = " << messageId << std::ends;
+ _CommandPtr cmd = NULL;
+ TESTHR(cmd.CreateInstance(__uuidof(Command)));
+ _ConnectionPtr p = *dbConn;
+ cmd->ActiveConnection = p;
+ cmd->CommandText = command.str().c_str();
+ cmd->CommandType = adCmdText;
+ _variant_t deletedRecords;
+ cmd->Execute(&deletedRecords, NULL, adCmdText | adExecuteNoRecords);
+ if ((long)deletedRecords == 0)
+ throw ms_sql::Exception("Message does not exist in queue mapping");
+ // Trigger on deleting the mapping takes care of deleting orphaned
+ // message record from tblMessage.
+}
+
+void
+MessageMapRecordset::pendingRemove(uint64_t messageId,
+ uint64_t queueId,
+ const std::string& xid)
+{
+ // Look up the mapping for the specified message and queue. There
+ // should be only one because of the uniqueness constraint in the
+ // SQL table. Update it to reflect it's pending delete with
+ // the specified xid.
+ std::ostringstream command;
+ command << "UPDATE " << tableName
+ << " SET prepareStatus=" << PREPARE_REMOVE
+ << " , xid=?"
+ << " WHERE queueId = " << queueId
+ << " AND messageId = " << messageId << std::ends;
+
+ _CommandPtr cmd = NULL;
+ _ParameterPtr xidVal = NULL;
+ TESTHR(cmd.CreateInstance(__uuidof(Command)));
+ TESTHR(xidVal.CreateInstance(__uuidof(Parameter)));
+ _ConnectionPtr p = *dbConn;
+ cmd->ActiveConnection = p;
+ cmd->CommandText = command.str().c_str();
+ cmd->CommandType = adCmdText;
+ xidVal->Name = "@xid";
+ xidVal->Type = adVarBinary;
+ xidVal->Size = xid.length();
+ xidVal->Direction = adParamInput;
+ xidVal->Value = BlobEncoder(xid);
+ cmd->Parameters->Append(xidVal);
+ cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords);
+}
+
+void
+MessageMapRecordset::removeForQueue(uint64_t queueId)
+{
+ std::ostringstream command;
+ command << "DELETE FROM " << tableName
+ << " WHERE queueId = " << queueId << std::ends;
+ _CommandPtr cmd = NULL;
+
+ TESTHR(cmd.CreateInstance(__uuidof(Command)));
+ _ConnectionPtr p = *dbConn;
+ cmd->ActiveConnection = p;
+ cmd->CommandText = command.str().c_str();
+ cmd->CommandType = adCmdText;
+ cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords);
+}
+
+void
+MessageMapRecordset::commitPrepared(const std::string& xid)
+{
+ // Find all the records for the specified xid. Records marked as adding
+ // are now permanent so remove the xid and prepareStatus. Records marked
+ // as removing are removed entirely.
+ openRs();
MessageMap m;
IADORecordBinding *piAdoRecordBinding;
rs->QueryInterface(__uuidof(IADORecordBinding),
(LPVOID *)&piAdoRecordBinding);
piAdoRecordBinding->BindToRecordset(&m);
- bool moreEntries = false, deleted = false;
- rs->MoveFirst();
- // If the desired mapping gets deleted, and we already know there are
- // other mappings for the message, don't bother finishing the scan.
- while (!rs->EndOfFile && !(deleted && moreEntries)) {
- if (m.queueId == queueId) {
+ for (; !rs->EndOfFile; rs->MoveNext()) {
+ if (m.xidStatus != adFldOK)
+ continue;
+ const std::string x(m.xid, m.xidLength);
+ if (x != xid)
+ continue;
+ if (m.prepareStatus == PREPARE_REMOVE) {
rs->Delete(adAffectCurrent);
- rs->Update();
- deleted = true;
}
else {
- moreEntries = true;
+ _variant_t dbNull;
+ dbNull.ChangeType(VT_NULL);
+ rs->Fields->GetItem("prepareStatus")->Value = dbNull;
+ rs->Fields->GetItem("xid")->Value = dbNull;
}
- rs->MoveNext();
+ rs->Update();
}
piAdoRecordBinding->Release();
- rs->Filter = "";
- return moreEntries;
}
void
-MessageMapRecordset::removeForQueue(uint64_t queueId,
- std::vector<uint64_t>& orphaned)
+MessageMapRecordset::abortPrepared(const std::string& xid)
{
- // Read all the messages queued on queueId and add them to the orphaned
- // list. Then remove each one and learn if there are references to it
- // from other queues. The ones without references are left in the
- // orphaned list, others are removed.
- std::ostringstream filter;
- filter << "queueId = " << queueId << std::ends;
- rs->PutFilter (VariantHelper<std::string>(filter.str()));
+ // Find all the records for the specified xid. Records marked as adding
+ // need to be removed while records marked as removing are put back to
+ // no xid and no prepareStatus.
+ openRs();
MessageMap m;
IADORecordBinding *piAdoRecordBinding;
rs->QueryInterface(__uuidof(IADORecordBinding),
(LPVOID *)&piAdoRecordBinding);
piAdoRecordBinding->BindToRecordset(&m);
- while (!rs->EndOfFile) {
- orphaned.push_back(m.messageId);
- rs->MoveNext();
+ for (; !rs->EndOfFile; rs->MoveNext()) {
+ if (m.xidStatus != adFldOK)
+ continue;
+ const std::string x(m.xid, m.xidLength);
+ if (x != xid)
+ continue;
+ if (m.prepareStatus == PREPARE_ADD) {
+ rs->Delete(adAffectCurrent);
+ }
+ else {
+ _variant_t dbNull;
+ dbNull.ChangeType(VT_NULL);
+ rs->Fields->GetItem("prepareStatus")->Value = dbNull;
+ rs->Fields->GetItem("xid")->Value = dbNull;
+ }
+ rs->Update();
}
piAdoRecordBinding->Release();
- rs->Filter = ""; // Remove filter on queueId
- rs->Requery(adOptionUnspecified); // Get the entire map again
-
- // Now delete all the messages on this queue; any message that still has
- // references from other queue(s) is removed from orphaned.
- for (std::vector<uint64_t>::iterator i = orphaned.begin();
- i != orphaned.end();
- ) {
- if (remove(*i, queueId))
- i = orphaned.erase(i); // There are other refs to message *i
- else
- ++i;
- }
}
void
MessageMapRecordset::recover(MessageQueueMap& msgMap)
{
+ openRs();
if (rs->BOF && rs->EndOfFile)
return; // Nothing to do
rs->MoveFirst();
@@ -123,7 +224,18 @@ MessageMapRecordset::recover(MessageQueueMap& msgMap)
(LPVOID *)&piAdoRecordBinding);
piAdoRecordBinding->BindToRecordset(&b);
while (!rs->EndOfFile) {
- msgMap[b.messageId].push_back(b.queueId);
+ qpid::store::QueueEntry entry;
+ entry.queueId = b.queueId;
+ if (b.xidStatus == adFldOK && b.xidLength > 0) {
+ entry.xid.assign(b.xid, b.xidLength);
+ entry.tplStatus =
+ b.prepareStatus == PREPARE_ADD ? QueueEntry::ADDING
+ : QueueEntry::REMOVING;
+ }
+ else {
+ entry.tplStatus = QueueEntry::NONE;
+ }
+ msgMap[b.messageId].push_back(entry);
rs->MoveNext();
}
@@ -133,6 +245,7 @@ MessageMapRecordset::recover(MessageQueueMap& msgMap)
void
MessageMapRecordset::dump()
{
+ openRs();
Recordset::dump();
if (rs->EndOfFile && rs->BOF) // No records
return;
diff --git a/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h b/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h
index 475137b18b..1b0c2f073e 100644
--- a/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h
+++ b/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h
@@ -23,7 +23,6 @@
*/
#include <icrsint.h>
-#include <vector>
#include "Recordset.h"
#include <qpid/broker/RecoveryManager.h>
@@ -38,32 +37,56 @@ namespace ms_sql {
*/
class MessageMapRecordset : public Recordset {
+ // These values are defined in a constraint on the tblMessageMap table.
+ // the prepareStatus column can only be null, 1, or 2.
+ enum { PREPARE_ADD=1, PREPARE_REMOVE=2 };
+
class MessageMap : public CADORecordBinding {
BEGIN_ADO_BINDING(MessageMap)
ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, messageId, FALSE)
ADO_FIXED_LENGTH_ENTRY2(2, adBigInt, queueId, FALSE)
+ ADO_FIXED_LENGTH_ENTRY2(3, adTinyInt, prepareStatus, FALSE)
+ ADO_VARIABLE_LENGTH_ENTRY(4, adVarBinary, xid, sizeof(xid),
+ xidStatus, xidLength, FALSE)
END_ADO_BINDING()
public:
uint64_t messageId;
uint64_t queueId;
+ uint8_t prepareStatus;
+ char xid[512];
+ int xidStatus;
+ uint32_t xidLength;
};
+ void selectOnXid(const std::string& xid);
+
public:
+ virtual void open(DatabaseConnection* conn, const std::string& table);
+
// Add a new mapping
- void add(uint64_t messageId, uint64_t queueId);
-
- // Remove a specific mapping. Returns true if the message is still
- // enqueued on at least one other queue; false if the message no longer
- // exists on any other queues.
- bool remove(uint64_t messageId, uint64_t queueId);
-
- // Remove mappings for all messages on a specified queue. If there are
- // messages that were only on the specified queue and are, therefore,
- // orphaned now, return them in the orphaned vector. The orphaned
- // messages can be deleted permanently as they are not referenced on
- // any other queues.
- void removeForQueue(uint64_t queueId, std::vector<uint64_t>& orphaned);
+ void add(uint64_t messageId,
+ uint64_t queueId,
+ const std::string& xid = "");
+
+ // Remove a specific mapping.
+ void remove(uint64_t messageId, uint64_t queueId);
+
+ // Mark the indicated message->queue entry pending removal. The entry
+ // for the mapping is updated to indicate pending removal with the
+ // specified xid.
+ void pendingRemove(uint64_t messageId,
+ uint64_t queueId,
+ const std::string& xid);
+
+ // Remove mappings for all messages on a specified queue.
+ void removeForQueue(uint64_t queueId);
+
+ // Commit records recorded as prepared.
+ void commitPrepared(const std::string& xid);
+
+ // Abort prepared changes.
+ void abortPrepared(const std::string& xid);
// Recover the mappings of message ID -> vector<queue ID>.
void recover(MessageQueueMap& msgMap);
diff --git a/cpp/src/qpid/store/ms-sql/Recordset.cpp b/cpp/src/qpid/store/ms-sql/Recordset.cpp
index e1a5158c87..e706799951 100644
--- a/cpp/src/qpid/store/ms-sql/Recordset.cpp
+++ b/cpp/src/qpid/store/ms-sql/Recordset.cpp
@@ -35,63 +35,35 @@ namespace qpid {
namespace store {
namespace ms_sql {
-#if 0
-Recordset::Iterator::Iterator(Recordset& _rs) : rs(_rs)
-{
- rs->MoveFirst();
- setCurrent();
-}
-
-std::pair<uint64_t, BlobAdapter>&
-Recordset::Iterator::dereference() const
-{
- return const_cast<std::pair<uint64_t, BlobAdapter> >(current);
-}
-
-void
-Recordset::Iterator::increment()
-{
- rs->MoveNext();
- setCurrent();
-}
-
-bool
-Recordset::Iterator::equal(const Iterator& x) const
-{
- return current.first == x.current.first;
-}
void
-Recordset::Iterator::setCurrent()
+Recordset::init(DatabaseConnection* conn, const std::string& table)
{
- if (!rs->EndOfFile) {
- uint64_t id = rs->Fields->Item["persistenceId"]->Value;
- long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize;
- BlobAdapter blob(blobSize);
- blob = rs->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
- current = std::make_pair(id, blob);
- }
- else {
- current.first = 0;
- }
+ dbConn = conn;
+ TESTHR(rs.CreateInstance(__uuidof(::Recordset)));
+ tableName = table;
}
-#endif
void
-Recordset::open(DatabaseConnection* conn, const std::string& table)
+Recordset::openRs()
{
- _ConnectionPtr p = *conn;
- TESTHR(rs.CreateInstance(__uuidof(::Recordset)));
// Client-side cursors needed to get access to newly added
// identity column immediately. Recordsets need this to get the
// persistence ID for the broker objects.
rs->CursorLocation = adUseClient;
- rs->Open(table.c_str(),
+ _ConnectionPtr p = *dbConn;
+ rs->Open(tableName.c_str(),
_variant_t((IDispatch *)p, true),
adOpenStatic,
adLockOptimistic,
adCmdTable);
- tableName = table;
+}
+
+void
+Recordset::open(DatabaseConnection* conn, const std::string& table)
+{
+ init(conn, table);
+ openRs();
}
void
@@ -99,7 +71,6 @@ Recordset::close()
{
if (rs && rs->State == adStateOpen)
rs->Close();
- rs = 0;
}
void
diff --git a/cpp/src/qpid/store/ms-sql/Recordset.h b/cpp/src/qpid/store/ms-sql/Recordset.h
index 3631838aa8..032b2bd434 100644
--- a/cpp/src/qpid/store/ms-sql/Recordset.h
+++ b/cpp/src/qpid/store/ms-sql/Recordset.h
@@ -51,46 +51,20 @@ protected:
DatabaseConnection* dbConn;
std::string tableName;
+ void init(DatabaseConnection* conn, const std::string& table);
+ void openRs();
+
public:
+ Recordset() : rs(0), dbConn(0) {}
+ virtual ~Recordset() { close(); rs = 0; dbConn = 0; }
-#if 0
/**
- * Iterator support for walking through the recordset.
- * If I need to try this again, I'd look at Recordset cloning.
+ * Default open() reads all records into the recordset.
*/
- class Iterator : public boost::iterator_facade<
- Iterator, std::pair<uint64_t, BlobAdapter>, boost::random_access_traversal_tag>
- {
- public:
- Iterator() : rs(0) { }
- Iterator(Recordset& _rs);
-
- private:
- friend class boost::iterator_core_access;
-
- std::pair<uint64_t, BlobAdapter>& dereference() const;
- void increment();
- bool equal(const Iterator& x) const;
-
- _RecordsetPtr rs;
- std::pair<uint64_t, BlobAdapter> current;
-
- void setCurrent();
- };
-
- friend class Iterator;
-#endif
-
- Recordset() : rs(0) {}
- virtual ~Recordset() { close(); }
- void open(DatabaseConnection* conn, const std::string& table);
+ virtual void open(DatabaseConnection* conn, const std::string& table);
void close();
void requery();
operator _RecordsetPtr () { return rs; }
-#if 0
- Iterator begin() { Iterator iter(*this); return iter; }
- Iterator end() { Iterator iter; return iter; }
-#endif
// Dump table contents; useful for debugging.
void dump();
diff --git a/cpp/src/qpid/store/ms-sql/SqlTransaction.cpp b/cpp/src/qpid/store/ms-sql/SqlTransaction.cpp
new file mode 100644
index 0000000000..6ad7725570
--- /dev/null
+++ b/cpp/src/qpid/store/ms-sql/SqlTransaction.cpp
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SqlTransaction.h"
+#include "DatabaseConnection.h"
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+SqlTransaction::SqlTransaction(const boost::shared_ptr<DatabaseConnection>& _db)
+ : db(_db), transDepth(0)
+{
+}
+
+SqlTransaction::~SqlTransaction()
+{
+ if (transDepth > 0)
+ this->abort();
+}
+
+void
+SqlTransaction::begin()
+{
+ _bstr_t beginCmd("BEGIN TRANSACTION");
+ _ConnectionPtr c = *db;
+ c->Execute(beginCmd, NULL, adExecuteNoRecords);
+ ++transDepth;
+}
+
+void
+SqlTransaction::commit()
+{
+ if (transDepth > 0) {
+ _bstr_t commitCmd("COMMIT TRANSACTION");
+ _ConnectionPtr c = *db;
+ c->Execute(commitCmd, NULL, adExecuteNoRecords);
+ --transDepth;
+ }
+}
+
+void
+SqlTransaction::abort()
+{
+ if (transDepth > 0) {
+ _bstr_t rollbackCmd("ROLLBACK TRANSACTION");
+ _ConnectionPtr c = *db;
+ c->Execute(rollbackCmd, NULL, adExecuteNoRecords);
+ transDepth = 0;
+ }
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/cpp/src/qpid/store/ms-sql/SqlTransaction.h b/cpp/src/qpid/store/ms-sql/SqlTransaction.h
new file mode 100644
index 0000000000..8b5239b786
--- /dev/null
+++ b/cpp/src/qpid/store/ms-sql/SqlTransaction.h
@@ -0,0 +1,67 @@
+#ifndef QPID_STORE_MSSQL_SQLTRANSACTION_H
+#define QPID_STORE_MSSQL_SQLTRANSACTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+#include <string>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+class DatabaseConnection;
+
+/**
+ * @class SqlTransaction
+ *
+ * Class representing an SQL transaction.
+ * Since ADO w/ SQLOLEDB can't do nested transaction via its BeginTrans(),
+ * et al, nested transactions are carried out with direct SQL commands.
+ * To ensure the state of this is known, keep track of how deeply the
+ * transactions are nested. This is more of a safety/sanity check since
+ * AMQP doesn't provide nested transactions.
+ */
+class SqlTransaction {
+
+ boost::shared_ptr<DatabaseConnection> db;
+
+ // Since ADO w/ SQLOLEDB can't do nested transaction via its BeginTrans(),
+ // et al, nested transactions are carried out with direct SQL commands.
+ // To ensure the state of this is known, keep track of how deeply the
+ // transactions are nested.
+ unsigned int transDepth;
+
+public:
+ SqlTransaction(const boost::shared_ptr<DatabaseConnection>& _db);
+ ~SqlTransaction();
+
+ DatabaseConnection *dbConn() { return db.get(); }
+
+ void begin();
+ void commit();
+ void abort();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_SQLTRANSACTION_H */
diff --git a/cpp/src/qpid/store/ms-sql/TplRecordset.cpp b/cpp/src/qpid/store/ms-sql/TplRecordset.cpp
new file mode 100644
index 0000000000..1309d921a9
--- /dev/null
+++ b/cpp/src/qpid/store/ms-sql/TplRecordset.cpp
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <qpid/Exception.h>
+#include <qpid/log/Statement.h>
+
+#include "TplRecordset.h"
+#include "BlobEncoder.h"
+#include "DatabaseConnection.h"
+#include "VariantHelper.h"
+
+namespace {
+inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};
+}
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+void
+TplRecordset::open(DatabaseConnection* conn, const std::string& table)
+{
+ init(conn, table);
+ // Don't actually open until we know what to do. It's far easier and more
+ // efficient to simply do most of these TPL/xid ops in a single statement.
+}
+
+void
+TplRecordset::add(const std::string& xid)
+{
+ const std::string command =
+ "INSERT INTO " + tableName + " ( xid ) VALUES ( ? )";
+ _CommandPtr cmd = NULL;
+ _ParameterPtr xidVal = NULL;
+
+ TESTHR(cmd.CreateInstance(__uuidof(Command)));
+ TESTHR(xidVal.CreateInstance(__uuidof(Parameter)));
+ _ConnectionPtr p = *dbConn;
+ cmd->ActiveConnection = p;
+ cmd->CommandText = command.c_str();
+ cmd->CommandType = adCmdText;
+ xidVal->Name = "@xid";
+ xidVal->Type = adVarBinary;
+ xidVal->Size = xid.length();
+ xidVal->Direction = adParamInput;
+ xidVal->Value = BlobEncoder(xid);
+ cmd->Parameters->Append(xidVal);
+ cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords);
+}
+
+void
+TplRecordset::remove(const std::string& xid)
+{
+ // Look up the item by its xid
+ const std::string command =
+ "DELETE FROM " + tableName + " WHERE xid = ?";
+ _CommandPtr cmd = NULL;
+ _ParameterPtr xidVal = NULL;
+
+ TESTHR(cmd.CreateInstance(__uuidof(Command)));
+ TESTHR(xidVal.CreateInstance(__uuidof(Parameter)));
+ _ConnectionPtr p = *dbConn;
+ cmd->ActiveConnection = p;
+ cmd->CommandText = command.c_str();
+ cmd->CommandType = adCmdText;
+ xidVal->Name = "@xid";
+ xidVal->Type = adVarBinary;
+ xidVal->Size = xid.length();
+ xidVal->Direction = adParamInput;
+ xidVal->Value = BlobEncoder(xid);
+ cmd->Parameters->Append(xidVal);
+ _variant_t deletedRecords;
+ cmd->Execute(&deletedRecords, NULL, adCmdText | adExecuteNoRecords);
+}
+
+void
+TplRecordset::recover(std::set<std::string>& xids)
+{
+ openRs();
+ if (rs->BOF && rs->EndOfFile)
+ return; // Nothing to do
+ rs->MoveFirst();
+ while (!rs->EndOfFile) {
+ _variant_t wxid = rs->Fields->Item["xid"]->Value;
+ char *xidBytes;
+ SafeArrayAccessData(wxid.parray, (void **)&xidBytes);
+ std::string xid(xidBytes, rs->Fields->Item["xid"]->ActualSize);
+ xids.insert(xid);
+ SafeArrayUnaccessData(wxid.parray);
+ rs->MoveNext();
+ }
+}
+
+void
+TplRecordset::dump()
+{
+ Recordset::dump();
+ if (rs->EndOfFile && rs->BOF) // No records
+ return;
+
+ rs->MoveFirst();
+ while (!rs->EndOfFile) {
+ _bstr_t wxid = rs->Fields->Item["xid"]->Value;
+ QPID_LOG(notice, " -> " << (const char *)wxid);
+ rs->MoveNext();
+ }
+}
+
+}}} // namespace qpid::store::ms_sql
diff --git a/cpp/src/qpid/store/ms-sql/TplRecordset.h b/cpp/src/qpid/store/ms-sql/TplRecordset.h
new file mode 100644
index 0000000000..fbde51738c
--- /dev/null
+++ b/cpp/src/qpid/store/ms-sql/TplRecordset.h
@@ -0,0 +1,58 @@
+#ifndef QPID_STORE_MSSQL_TPLRECORDSET_H
+#define QPID_STORE_MSSQL_TPLRECORDSET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Recordset.h"
+#include <string>
+#include <set>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class TplRecordset
+ *
+ * Class for the TPL (Transaction Prepared List) records.
+ */
+class TplRecordset : public Recordset {
+protected:
+
+public:
+ virtual void open(DatabaseConnection* conn, const std::string& table);
+
+ void add(const std::string& xid);
+
+ // Remove a record given its xid.
+ void remove(const std::string& xid);
+
+ // Recover prepared transaction XIDs.
+ void recover(std::set<std::string>& xids);
+
+ // Dump table contents; useful for debugging.
+ void dump();
+};
+
+}}} // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_TPLRECORDSET_H */
diff --git a/cpp/src/qpid/store/ms-sql/VariantHelper.cpp b/cpp/src/qpid/store/ms-sql/VariantHelper.cpp
index 786724f031..acec95c1f9 100644
--- a/cpp/src/qpid/store/ms-sql/VariantHelper.cpp
+++ b/cpp/src/qpid/store/ms-sql/VariantHelper.cpp
@@ -41,13 +41,25 @@ VariantHelper<Wrapped>::operator const _variant_t& () const
// Specialization for using _variant_t to wrap a std::string
VariantHelper<std::string>::VariantHelper(const std::string &init)
{
- var.SetString(init.c_str());
+ if (init.empty() || init.length() == 0) {
+ var.vt = VT_BSTR;
+ var.bstrVal = NULL;
+ }
+ else {
+ var.SetString(init.c_str());
+ }
}
VariantHelper<std::string>&
VariantHelper<std::string>::operator=(const std::string &rhs)
{
- var.SetString(rhs.c_str());
+ if (rhs.empty() || rhs.length() == 0) {
+ var.vt = VT_BSTR;
+ var.bstrVal = NULL;
+ }
+ else {
+ var.SetString(rhs.c_str());
+ }
return *this;
}