summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/store/MessageStorePlugin.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/store/MessageStorePlugin.cpp')
-rw-r--r--cpp/src/qpid/store/MessageStorePlugin.cpp26
1 files changed, 20 insertions, 6 deletions
diff --git a/cpp/src/qpid/store/MessageStorePlugin.cpp b/cpp/src/qpid/store/MessageStorePlugin.cpp
index 05b6ef4465..fdb947eef2 100644
--- a/cpp/src/qpid/store/MessageStorePlugin.cpp
+++ b/cpp/src/qpid/store/MessageStorePlugin.cpp
@@ -405,11 +405,14 @@ MessageStorePlugin::recover(broker::RecoveryManager& recoverer)
QueueMap queues;
MessageMap messages;
MessageQueueMap messageQueueMap;
+ std::vector<std::string> xids;
+ PreparedTransactionMap dtxMap;
provider->second->recoverConfigs(recoverer);
provider->second->recoverExchanges(recoverer, exchanges);
provider->second->recoverQueues(recoverer, queues);
provider->second->recoverBindings(recoverer, exchanges, queues);
+ provider->second->recoverTransactions(recoverer, dtxMap);
provider->second->recoverMessages(recoverer, messages, messageQueueMap);
// Enqueue msgs where needed.
for (MessageQueueMap::const_iterator i = messageQueueMap.begin();
@@ -426,22 +429,33 @@ MessageStorePlugin::recover(broker::RecoveryManager& recoverer)
broker::RecoverableMessage::shared_ptr msg = iMsg->second;
// Now for each queue referenced in the queue map, locate it
// and re-enqueue the message.
- for (std::vector<uint64_t>::const_iterator j = i->second.begin();
+ for (std::vector<QueueEntry>::const_iterator j = i->second.begin();
j != i->second.end();
++j) {
// Locate the queue corresponding to the current queue Id
- QueueMap::const_iterator iQ = queues.find(*j);
+ QueueMap::const_iterator iQ = queues.find(j->queueId);
if (iQ == queues.end()) {
std::ostringstream oss;
oss << "No matching queue trying to re-enqueue message "
- << " on queue Id " << *j;
+ << " on queue Id " << j->queueId;
THROW_STORE_EXCEPTION(oss.str());
}
- iQ->second->recover(msg);
+ // Messages involved in prepared transactions have their status
+ // updated accordingly. First, though, restore a message that
+ // is expected to be on a queue, including non-transacted
+ // messages and those pending dequeue in a dtx.
+ if (j->tplStatus != QueueEntry::ADDING)
+ iQ->second->recover(msg);
+ switch(j->tplStatus) {
+ case QueueEntry::ADDING:
+ dtxMap[j->xid]->enqueue(iQ->second, msg);
+ break;
+ case QueueEntry::REMOVING:
+ dtxMap[j->xid]->dequeue(iQ->second, msg);
+ break;
+ }
}
}
-
- // recoverTransactions() and apply correctly while re-enqueuing
}
}} // namespace qpid::store