summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/store/MessageStorePlugin.cpp
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2010-04-16 20:12:55 +0000
committerStephen D. Huston <shuston@apache.org>2010-04-16 20:12:55 +0000
commita2313ce0fc34fbe4864445595e1db1955c4918a1 (patch)
tree4757029ffe7a3970cf38d32f8775a14bef7ea259 /cpp/src/qpid/store/MessageStorePlugin.cpp
parenteb56a638b2aea7e56c55e49b267cfe2f673afe51 (diff)
downloadqpid-python-a2313ce0fc34fbe4864445595e1db1955c4918a1.tar.gz
Fix for QPID-2420 to correctly handle restoring and commit/abort prepared transactions.
The basic approach is documented in QPID-2420. This also makes improvements in the way changes are done to the tblMessageMap table which should perform much better, avoiding pulling the whole table into the broker just to add or edit or delete a single record. Also, some of the consistency checks and enforcements are moved into the database itself from the C++ code. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@935068 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/store/MessageStorePlugin.cpp')
-rw-r--r--cpp/src/qpid/store/MessageStorePlugin.cpp26
1 files changed, 20 insertions, 6 deletions
diff --git a/cpp/src/qpid/store/MessageStorePlugin.cpp b/cpp/src/qpid/store/MessageStorePlugin.cpp
index 05b6ef4465..fdb947eef2 100644
--- a/cpp/src/qpid/store/MessageStorePlugin.cpp
+++ b/cpp/src/qpid/store/MessageStorePlugin.cpp
@@ -405,11 +405,14 @@ MessageStorePlugin::recover(broker::RecoveryManager& recoverer)
QueueMap queues;
MessageMap messages;
MessageQueueMap messageQueueMap;
+ std::vector<std::string> xids;
+ PreparedTransactionMap dtxMap;
provider->second->recoverConfigs(recoverer);
provider->second->recoverExchanges(recoverer, exchanges);
provider->second->recoverQueues(recoverer, queues);
provider->second->recoverBindings(recoverer, exchanges, queues);
+ provider->second->recoverTransactions(recoverer, dtxMap);
provider->second->recoverMessages(recoverer, messages, messageQueueMap);
// Enqueue msgs where needed.
for (MessageQueueMap::const_iterator i = messageQueueMap.begin();
@@ -426,22 +429,33 @@ MessageStorePlugin::recover(broker::RecoveryManager& recoverer)
broker::RecoverableMessage::shared_ptr msg = iMsg->second;
// Now for each queue referenced in the queue map, locate it
// and re-enqueue the message.
- for (std::vector<uint64_t>::const_iterator j = i->second.begin();
+ for (std::vector<QueueEntry>::const_iterator j = i->second.begin();
j != i->second.end();
++j) {
// Locate the queue corresponding to the current queue Id
- QueueMap::const_iterator iQ = queues.find(*j);
+ QueueMap::const_iterator iQ = queues.find(j->queueId);
if (iQ == queues.end()) {
std::ostringstream oss;
oss << "No matching queue trying to re-enqueue message "
- << " on queue Id " << *j;
+ << " on queue Id " << j->queueId;
THROW_STORE_EXCEPTION(oss.str());
}
- iQ->second->recover(msg);
+ // Messages involved in prepared transactions have their status
+ // updated accordingly. First, though, restore a message that
+ // is expected to be on a queue, including non-transacted
+ // messages and those pending dequeue in a dtx.
+ if (j->tplStatus != QueueEntry::ADDING)
+ iQ->second->recover(msg);
+ switch(j->tplStatus) {
+ case QueueEntry::ADDING:
+ dtxMap[j->xid]->enqueue(iQ->second, msg);
+ break;
+ case QueueEntry::REMOVING:
+ dtxMap[j->xid]->dequeue(iQ->second, msg);
+ break;
+ }
}
}
-
- // recoverTransactions() and apply correctly while re-enqueuing
}
}} // namespace qpid::store