diff options
Diffstat (limited to 'cpp/src/qpid/store/MessageStorePlugin.cpp')
-rw-r--r-- | cpp/src/qpid/store/MessageStorePlugin.cpp | 26 |
1 files changed, 20 insertions, 6 deletions
diff --git a/cpp/src/qpid/store/MessageStorePlugin.cpp b/cpp/src/qpid/store/MessageStorePlugin.cpp index 05b6ef4465..fdb947eef2 100644 --- a/cpp/src/qpid/store/MessageStorePlugin.cpp +++ b/cpp/src/qpid/store/MessageStorePlugin.cpp @@ -405,11 +405,14 @@ MessageStorePlugin::recover(broker::RecoveryManager& recoverer) QueueMap queues; MessageMap messages; MessageQueueMap messageQueueMap; + std::vector<std::string> xids; + PreparedTransactionMap dtxMap; provider->second->recoverConfigs(recoverer); provider->second->recoverExchanges(recoverer, exchanges); provider->second->recoverQueues(recoverer, queues); provider->second->recoverBindings(recoverer, exchanges, queues); + provider->second->recoverTransactions(recoverer, dtxMap); provider->second->recoverMessages(recoverer, messages, messageQueueMap); // Enqueue msgs where needed. for (MessageQueueMap::const_iterator i = messageQueueMap.begin(); @@ -426,22 +429,33 @@ MessageStorePlugin::recover(broker::RecoveryManager& recoverer) broker::RecoverableMessage::shared_ptr msg = iMsg->second; // Now for each queue referenced in the queue map, locate it // and re-enqueue the message. - for (std::vector<uint64_t>::const_iterator j = i->second.begin(); + for (std::vector<QueueEntry>::const_iterator j = i->second.begin(); j != i->second.end(); ++j) { // Locate the queue corresponding to the current queue Id - QueueMap::const_iterator iQ = queues.find(*j); + QueueMap::const_iterator iQ = queues.find(j->queueId); if (iQ == queues.end()) { std::ostringstream oss; oss << "No matching queue trying to re-enqueue message " - << " on queue Id " << *j; + << " on queue Id " << j->queueId; THROW_STORE_EXCEPTION(oss.str()); } - iQ->second->recover(msg); + // Messages involved in prepared transactions have their status + // updated accordingly. First, though, restore a message that + // is expected to be on a queue, including non-transacted + // messages and those pending dequeue in a dtx. + if (j->tplStatus != QueueEntry::ADDING) + iQ->second->recover(msg); + switch(j->tplStatus) { + case QueueEntry::ADDING: + dtxMap[j->xid]->enqueue(iQ->second, msg); + break; + case QueueEntry::REMOVING: + dtxMap[j->xid]->dequeue(iQ->second, msg); + break; + } } } - - // recoverTransactions() and apply correctly while re-enqueuing } }} // namespace qpid::store |