summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/store/MessageStorePlugin.cpp
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2010-04-16 20:12:55 +0000
committerStephen D. Huston <shuston@apache.org>2010-04-16 20:12:55 +0000
commite70b372b5f0b6a5e5f6e39e4897de1557c2a2089 (patch)
tree4757029ffe7a3970cf38d32f8775a14bef7ea259 /cpp/src/qpid/store/MessageStorePlugin.cpp
parent72432e4034623c8a5e0e1e0a7555bdd2cf76a3f3 (diff)
downloadqpid-python-e70b372b5f0b6a5e5f6e39e4897de1557c2a2089.tar.gz
Fix for QPID-2420 to correctly handle restoring and commit/abort prepared transactions.
The basic approach is documented in QPID-2420. This also makes improvements in the way changes are done to the tblMessageMap table which should perform much better, avoiding pulling the whole table into the broker just to add or edit or delete a single record. Also, some of the consistency checks and enforcements are moved into the database itself from the C++ code. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@935068 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/store/MessageStorePlugin.cpp')
-rw-r--r--cpp/src/qpid/store/MessageStorePlugin.cpp26
1 files changed, 20 insertions, 6 deletions
diff --git a/cpp/src/qpid/store/MessageStorePlugin.cpp b/cpp/src/qpid/store/MessageStorePlugin.cpp
index 05b6ef4465..fdb947eef2 100644
--- a/cpp/src/qpid/store/MessageStorePlugin.cpp
+++ b/cpp/src/qpid/store/MessageStorePlugin.cpp
@@ -405,11 +405,14 @@ MessageStorePlugin::recover(broker::RecoveryManager& recoverer)
QueueMap queues;
MessageMap messages;
MessageQueueMap messageQueueMap;
+ std::vector<std::string> xids;
+ PreparedTransactionMap dtxMap;
provider->second->recoverConfigs(recoverer);
provider->second->recoverExchanges(recoverer, exchanges);
provider->second->recoverQueues(recoverer, queues);
provider->second->recoverBindings(recoverer, exchanges, queues);
+ provider->second->recoverTransactions(recoverer, dtxMap);
provider->second->recoverMessages(recoverer, messages, messageQueueMap);
// Enqueue msgs where needed.
for (MessageQueueMap::const_iterator i = messageQueueMap.begin();
@@ -426,22 +429,33 @@ MessageStorePlugin::recover(broker::RecoveryManager& recoverer)
broker::RecoverableMessage::shared_ptr msg = iMsg->second;
// Now for each queue referenced in the queue map, locate it
// and re-enqueue the message.
- for (std::vector<uint64_t>::const_iterator j = i->second.begin();
+ for (std::vector<QueueEntry>::const_iterator j = i->second.begin();
j != i->second.end();
++j) {
// Locate the queue corresponding to the current queue Id
- QueueMap::const_iterator iQ = queues.find(*j);
+ QueueMap::const_iterator iQ = queues.find(j->queueId);
if (iQ == queues.end()) {
std::ostringstream oss;
oss << "No matching queue trying to re-enqueue message "
- << " on queue Id " << *j;
+ << " on queue Id " << j->queueId;
THROW_STORE_EXCEPTION(oss.str());
}
- iQ->second->recover(msg);
+ // Messages involved in prepared transactions have their status
+ // updated accordingly. First, though, restore a message that
+ // is expected to be on a queue, including non-transacted
+ // messages and those pending dequeue in a dtx.
+ if (j->tplStatus != QueueEntry::ADDING)
+ iQ->second->recover(msg);
+ switch(j->tplStatus) {
+ case QueueEntry::ADDING:
+ dtxMap[j->xid]->enqueue(iQ->second, msg);
+ break;
+ case QueueEntry::REMOVING:
+ dtxMap[j->xid]->dequeue(iQ->second, msg);
+ break;
+ }
}
}
-
- // recoverTransactions() and apply correctly while re-enqueuing
}
}} // namespace qpid::store