diff options
author | Stephen D. Huston <shuston@apache.org> | 2009-11-03 00:53:24 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2009-11-03 00:53:24 +0000 |
commit | 8e3202df7c4701d1effd19d0e0a9e332a8c3bc1c (patch) | |
tree | 0b6ce145425a54c09ce9b7b6c2be2691106aa3ab /cpp | |
parent | ae3b13c693bab1a66d4a5d72873869fd128e32a3 (diff) | |
download | qpid-python-8e3202df7c4701d1effd19d0e0a9e332a8c3bc1c.tar.gz |
When storing a message, set its new persistence Id. Don't throw an exception when attempting to enqueue/dequeue a message without a AMQP transaction associated. Mark queued/dequeued messages complete right after storing in database; there's no delayed notify needed for this storage provider.
Fixes QPID-2170.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@832234 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/store/CMakeLists.txt | 2 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp | 87 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/MessageRecordset.cpp | 2 |
3 files changed, 65 insertions, 26 deletions
diff --git a/cpp/src/qpid/store/CMakeLists.txt b/cpp/src/qpid/store/CMakeLists.txt index d5cf8abc27..0d25923175 100644 --- a/cpp/src/qpid/store/CMakeLists.txt +++ b/cpp/src/qpid/store/CMakeLists.txt @@ -73,7 +73,7 @@ if (BUILD_MSSQL) ms-sql/Recordset.cpp ms-sql/State.cpp ms-sql/VariantHelper.cpp) - target_link_libraries (mssql_store qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY}) + target_link_libraries (mssql_store qpidbroker qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY}) install (TARGETS mssql_store # RUNTIME DESTINATION ${QPIDD_MODULE_DIR} COMPONENT ${QPID_COMPONENT_BROKER}) diff --git a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp index 4c2b9892ab..e06f8a750f 100644 --- a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp +++ b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp @@ -689,32 +689,49 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt, const boost::intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue) { + // If this enqueue is in the context of a transaction, use the specified + // transaction to nest a new transaction for this operation. However, if + // this is not in the context of a transaction, then just use the thread's + // DatabaseConnection with a ADO transaction. + DatabaseConnection *db = 0; AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt); - if (atxn == 0) - throw qpid::broker::InvalidTransactionContextException(); - (void)initState(); // Ensure this thread is initialized - try { - atxn->begin(); + if (atxn == 0) { + db = initConnection(); + db->beginTransaction(); + } + else { + (void)initState(); // Ensure this thread is initialized + db = atxn->dbConn(); + try { + atxn->begin(); + } + catch(_com_error &e) { + throw ADOException("Error queuing message", e); + } } - catch(_com_error &e) { - throw ADOException("Error queuing message", e); - } try { if (msg->getPersistenceId() == 0) { // Message itself not yet saved MessageRecordset rsMessages; - rsMessages.open(atxn->dbConn(), TblMessage); + rsMessages.open(db, TblMessage); rsMessages.add(msg); } MessageMapRecordset rsMap; - rsMap.open(atxn->dbConn(), TblMessageMap); + rsMap.open(db, TblMessageMap); rsMap.add(msg->getPersistenceId(), queue.getPersistenceId()); - atxn->commit(); + if (atxn) + atxn->commit(); + else + db->commitTransaction(); } catch(_com_error &e) { - atxn->abort(); + if (atxn) + atxn->abort(); + else + db->rollbackTransaction(); throw ADOException("Error queuing message", e); - } + } + msg->enqueueComplete(); } /** @@ -731,32 +748,50 @@ MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt, const boost::intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue) { + // If this dequeue is in the context of a transaction, use the specified + // transaction to nest a new transaction for this operation. However, if + // this is not in the context of a transaction, then just use the thread's + // DatabaseConnection with a ADO transaction. + DatabaseConnection *db = 0; AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt); - if (atxn == 0) - throw qpid::broker::InvalidTransactionContextException(); - (void)initState(); // Ensure this thread is initialized - try { - atxn->begin(); + if (atxn == 0) { + db = initConnection(); + db->beginTransaction(); } - catch(_com_error &e) { - throw ADOException("Error queuing message", e); - } + else { + (void)initState(); // Ensure this thread is initialized + db = atxn->dbConn(); + try { + atxn->begin(); + } + catch(_com_error &e) { + throw ADOException("Error queuing message", e); + } + } + try { MessageMapRecordset rsMap; - rsMap.open(atxn->dbConn(), TblMessageMap); + rsMap.open(db, TblMessageMap); bool more = rsMap.remove(msg->getPersistenceId(), queue.getPersistenceId()); if (!more) { MessageRecordset rsMessages; - rsMessages.open(atxn->dbConn(), TblMessage); + rsMessages.open(db, TblMessage); rsMessages.remove(msg); } - atxn->commit(); + if (atxn) + atxn->commit(); + else + db->commitTransaction(); } catch(_com_error &e) { - atxn->abort(); + if (atxn) + atxn->abort(); + else + db->rollbackTransaction(); throw ADOException("Error dequeuing message", e); } + msg->dequeueComplete(); } std::auto_ptr<qpid::broker::TransactionContext> @@ -795,6 +830,8 @@ MSSqlProvider::begin(const std::string& xid) void MSSqlProvider::prepare(qpid::broker::TPCTransactionContext& txn) { + // The inner transactions used for the components of the TPC are done; + // nothing else to do but wait for the commit. } void diff --git a/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp b/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp index a29b97fa8a..edf5d8e518 100644 --- a/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp +++ b/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp @@ -42,6 +42,8 @@ MessageRecordset::add(const boost::intrusive_ptr<qpid::broker::PersistableMessag rs->AddNew(); rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob); rs->Update(); + uint64_t id = rs->Fields->Item["persistenceId"]->Value; + msg->setPersistenceId(id); } void |