summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2009-11-03 00:53:24 +0000
committerStephen D. Huston <shuston@apache.org>2009-11-03 00:53:24 +0000
commit8e3202df7c4701d1effd19d0e0a9e332a8c3bc1c (patch)
tree0b6ce145425a54c09ce9b7b6c2be2691106aa3ab /cpp
parentae3b13c693bab1a66d4a5d72873869fd128e32a3 (diff)
downloadqpid-python-8e3202df7c4701d1effd19d0e0a9e332a8c3bc1c.tar.gz
When storing a message, set its new persistence Id. Don't throw an exception when attempting to enqueue/dequeue a message without a AMQP transaction associated. Mark queued/dequeued messages complete right after storing in database; there's no delayed notify needed for this storage provider.
Fixes QPID-2170. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@832234 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/store/CMakeLists.txt2
-rw-r--r--cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp87
-rw-r--r--cpp/src/qpid/store/ms-sql/MessageRecordset.cpp2
3 files changed, 65 insertions, 26 deletions
diff --git a/cpp/src/qpid/store/CMakeLists.txt b/cpp/src/qpid/store/CMakeLists.txt
index d5cf8abc27..0d25923175 100644
--- a/cpp/src/qpid/store/CMakeLists.txt
+++ b/cpp/src/qpid/store/CMakeLists.txt
@@ -73,7 +73,7 @@ if (BUILD_MSSQL)
ms-sql/Recordset.cpp
ms-sql/State.cpp
ms-sql/VariantHelper.cpp)
- target_link_libraries (mssql_store qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY})
+ target_link_libraries (mssql_store qpidbroker qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY})
install (TARGETS mssql_store # RUNTIME
DESTINATION ${QPIDD_MODULE_DIR}
COMPONENT ${QPID_COMPONENT_BROKER})
diff --git a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
index 4c2b9892ab..e06f8a750f 100644
--- a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
+++ b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
@@ -689,32 +689,49 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt,
const boost::intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
{
+ // If this enqueue is in the context of a transaction, use the specified
+ // transaction to nest a new transaction for this operation. However, if
+ // this is not in the context of a transaction, then just use the thread's
+ // DatabaseConnection with a ADO transaction.
+ DatabaseConnection *db = 0;
AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt);
- if (atxn == 0)
- throw qpid::broker::InvalidTransactionContextException();
- (void)initState(); // Ensure this thread is initialized
- try {
- atxn->begin();
+ if (atxn == 0) {
+ db = initConnection();
+ db->beginTransaction();
+ }
+ else {
+ (void)initState(); // Ensure this thread is initialized
+ db = atxn->dbConn();
+ try {
+ atxn->begin();
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error queuing message", e);
+ }
}
- catch(_com_error &e) {
- throw ADOException("Error queuing message", e);
- }
try {
if (msg->getPersistenceId() == 0) { // Message itself not yet saved
MessageRecordset rsMessages;
- rsMessages.open(atxn->dbConn(), TblMessage);
+ rsMessages.open(db, TblMessage);
rsMessages.add(msg);
}
MessageMapRecordset rsMap;
- rsMap.open(atxn->dbConn(), TblMessageMap);
+ rsMap.open(db, TblMessageMap);
rsMap.add(msg->getPersistenceId(), queue.getPersistenceId());
- atxn->commit();
+ if (atxn)
+ atxn->commit();
+ else
+ db->commitTransaction();
}
catch(_com_error &e) {
- atxn->abort();
+ if (atxn)
+ atxn->abort();
+ else
+ db->rollbackTransaction();
throw ADOException("Error queuing message", e);
- }
+ }
+ msg->enqueueComplete();
}
/**
@@ -731,32 +748,50 @@ MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt,
const boost::intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
{
+ // If this dequeue is in the context of a transaction, use the specified
+ // transaction to nest a new transaction for this operation. However, if
+ // this is not in the context of a transaction, then just use the thread's
+ // DatabaseConnection with a ADO transaction.
+ DatabaseConnection *db = 0;
AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt);
- if (atxn == 0)
- throw qpid::broker::InvalidTransactionContextException();
- (void)initState(); // Ensure this thread is initialized
- try {
- atxn->begin();
+ if (atxn == 0) {
+ db = initConnection();
+ db->beginTransaction();
}
- catch(_com_error &e) {
- throw ADOException("Error queuing message", e);
- }
+ else {
+ (void)initState(); // Ensure this thread is initialized
+ db = atxn->dbConn();
+ try {
+ atxn->begin();
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error queuing message", e);
+ }
+ }
+
try {
MessageMapRecordset rsMap;
- rsMap.open(atxn->dbConn(), TblMessageMap);
+ rsMap.open(db, TblMessageMap);
bool more = rsMap.remove(msg->getPersistenceId(),
queue.getPersistenceId());
if (!more) {
MessageRecordset rsMessages;
- rsMessages.open(atxn->dbConn(), TblMessage);
+ rsMessages.open(db, TblMessage);
rsMessages.remove(msg);
}
- atxn->commit();
+ if (atxn)
+ atxn->commit();
+ else
+ db->commitTransaction();
}
catch(_com_error &e) {
- atxn->abort();
+ if (atxn)
+ atxn->abort();
+ else
+ db->rollbackTransaction();
throw ADOException("Error dequeuing message", e);
}
+ msg->dequeueComplete();
}
std::auto_ptr<qpid::broker::TransactionContext>
@@ -795,6 +830,8 @@ MSSqlProvider::begin(const std::string& xid)
void
MSSqlProvider::prepare(qpid::broker::TPCTransactionContext& txn)
{
+ // The inner transactions used for the components of the TPC are done;
+ // nothing else to do but wait for the commit.
}
void
diff --git a/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp b/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp
index a29b97fa8a..edf5d8e518 100644
--- a/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp
+++ b/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp
@@ -42,6 +42,8 @@ MessageRecordset::add(const boost::intrusive_ptr<qpid::broker::PersistableMessag
rs->AddNew();
rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
rs->Update();
+ uint64_t id = rs->Fields->Item["persistenceId"]->Value;
+ msg->setPersistenceId(id);
}
void