diff options
author | Gordon Sim <gsim@apache.org> | 2007-05-25 11:24:54 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-05-25 11:24:54 +0000 |
commit | 45f0ee18e3dacf9e8c746009eaef4e17b0a44bf8 (patch) | |
tree | 67a2ae89ca92c9b4fdc94e2f6a817439e648d069 | |
parent | f646350b5e59ccf49f1253bd55f98d062769f2ee (diff) | |
download | qpid-python-45f0ee18e3dacf9e8c746009eaef4e17b0a44bf8.tar.gz |
Added support for recovering prepared transactions.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@541619 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/Makefile.am | 5 | ||||
-rw-r--r-- | cpp/src/qpid/CommonOptions.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxManager.cpp | 22 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxManager.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxWorkRecord.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxWorkRecord.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoverableExchange.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoverableTransaction.h | 49 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveredDequeue.cpp | 38 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveredDequeue.h | 50 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveredEnqueue.cpp | 38 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveredEnqueue.h | 50 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveryManager.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 58 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.h | 6 |
16 files changed, 333 insertions, 8 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index cb17425369..4293719a6f 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -181,6 +181,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/QueuePolicy.cpp \ qpid/broker/QueueRegistry.cpp \ qpid/broker/RecoveryManagerImpl.cpp \ + qpid/broker/RecoveredEnqueue.cpp \ + qpid/broker/RecoveredDequeue.cpp \ qpid/broker/Reference.cpp \ qpid/broker/TopicExchange.cpp \ qpid/broker/TxAck.cpp \ @@ -234,7 +236,10 @@ nobase_include_HEADERS = \ qpid/broker/RecoverableExchange.h \ qpid/broker/RecoverableMessage.h \ qpid/broker/RecoverableQueue.h \ + qpid/broker/RecoverableTransaction.h \ qpid/broker/RecoveryManager.h \ + qpid/broker/RecoveredEnqueue.h \ + qpid/broker/RecoveredDequeue.h \ qpid/broker/Reference.h \ qpid/broker/TxBuffer.h \ qpid/broker/TxOp.h \ diff --git a/cpp/src/qpid/CommonOptions.cpp b/cpp/src/qpid/CommonOptions.cpp index 8ec1a42ee2..f272c71a27 100644 --- a/cpp/src/qpid/CommonOptions.cpp +++ b/cpp/src/qpid/CommonOptions.cpp @@ -19,6 +19,7 @@ #include "CommonOptions.h" #include <fstream> #include <algorithm> +#include <iostream> namespace qpid { namespace program_options { @@ -28,10 +29,9 @@ char env2optchar(char env) { } const std::string envPrefix("QPID_"); -const std::string ignore("QPID_DIR");//temporary hack - this env var is used in other ways; not an option std::string env2option(const std::string& env) { - if (env != ignore /*temp hack, see above*/ && env.find(envPrefix) == 0) { + if (env.find(envPrefix) == 0) { std::string opt = env.substr(envPrefix.size()); std::transform(opt.begin(), opt.end(), opt.begin(), env2optchar); return opt; @@ -62,6 +62,9 @@ void parseOptions( try { po::store(po::parse_environment(desc, po::env2option), vm); } + catch (const po::unknown_option& e) { + std::cerr << e.what() << std::endl; + } catch (const po::error& e) { throw po::error(std::string("parsing environment variables: ") + e.what()); diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index de2f2c55f4..01e049bdda 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -92,8 +92,8 @@ Broker::Broker(const Broker::Options& conf) : exchanges.declare(amq_match, HeadersExchange::typeName); if(store.get()) { - RecoveryManagerImpl recoverer( - queues, exchanges, conf.stagingThreshold); + RecoveryManagerImpl recoverer(queues, exchanges, dtxManager, + conf.stagingThreshold); store->recover(recoverer); } diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp index 6c074bbd51..2af87a8751 100644 --- a/cpp/src/qpid/broker/DtxManager.cpp +++ b/cpp/src/qpid/broker/DtxManager.cpp @@ -29,11 +29,20 @@ DtxManager::~DtxManager() {} void DtxManager::start(std::string xid, DtxBuffer::shared_ptr ops) { + /* WorkMap::iterator i = work.find(xid); if (i == work.end()) { i = work.insert(xid, new DtxWorkRecord(xid, store)).first; } i->add(ops); + */ + + getOrCreateWork(xid)->add(ops); +} + +void DtxManager::recover(std::string xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops) +{ + getOrCreateWork(xid)->recover(txn, ops); } void DtxManager::prepare(const std::string& xid) @@ -54,6 +63,17 @@ void DtxManager::rollback(const std::string& xid) DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid) { WorkMap::iterator i = work.find(xid); - if (i == work.end()) throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid); + if (i == work.end()) { + throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid); + } + return i; +} + +DtxManager::WorkMap::iterator DtxManager::getOrCreateWork(std::string& xid) +{ + WorkMap::iterator i = work.find(xid); + if (i == work.end()) { + i = work.insert(xid, new DtxWorkRecord(xid, store)).first; + } return i; } diff --git a/cpp/src/qpid/broker/DtxManager.h b/cpp/src/qpid/broker/DtxManager.h index 230a0631f3..e908faecac 100644 --- a/cpp/src/qpid/broker/DtxManager.h +++ b/cpp/src/qpid/broker/DtxManager.h @@ -37,11 +37,13 @@ class DtxManager{ TransactionalStore* const store; WorkMap::iterator getWork(const std::string& xid); + WorkMap::iterator getOrCreateWork(std::string& xid); public: DtxManager(TransactionalStore* const store); ~DtxManager(); void start(std::string xid, DtxBuffer::shared_ptr work); + void recover(std::string xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr work); void prepare(const std::string& xid); void commit(const std::string& xid); void rollback(const std::string& xid); diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp index 218131f6bc..ff6e952517 100644 --- a/cpp/src/qpid/broker/DtxWorkRecord.cpp +++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp @@ -105,3 +105,9 @@ void DtxWorkRecord::abort() } for_each(work.begin(), work.end(), mem_fn(&TxBuffer::rollback)); } + +void DtxWorkRecord::recover(std::auto_ptr<TPCTransactionContext> _txn, DtxBuffer::shared_ptr ops) +{ + add(ops); + txn = _txn; +} diff --git a/cpp/src/qpid/broker/DtxWorkRecord.h b/cpp/src/qpid/broker/DtxWorkRecord.h index 18b41c7808..0453ea1644 100644 --- a/cpp/src/qpid/broker/DtxWorkRecord.h +++ b/cpp/src/qpid/broker/DtxWorkRecord.h @@ -56,6 +56,7 @@ public: void commit(); void rollback(); void add(DtxBuffer::shared_ptr ops); + void recover(std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops); }; } diff --git a/cpp/src/qpid/broker/RecoverableExchange.h b/cpp/src/qpid/broker/RecoverableExchange.h index 0af4aea232..76d0d2ecdf 100644 --- a/cpp/src/qpid/broker/RecoverableExchange.h +++ b/cpp/src/qpid/broker/RecoverableExchange.h @@ -23,6 +23,7 @@ */ #include <boost/shared_ptr.hpp> +#include "qpid/framing/FieldTable.h" namespace qpid { namespace broker { diff --git a/cpp/src/qpid/broker/RecoverableTransaction.h b/cpp/src/qpid/broker/RecoverableTransaction.h new file mode 100644 index 0000000000..7fe34b6756 --- /dev/null +++ b/cpp/src/qpid/broker/RecoverableTransaction.h @@ -0,0 +1,49 @@ +#ifndef _broker_RecoverableTransaction_h +#define _broker_RecoverableTransaction_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/shared_ptr.hpp> + +#include "RecoverableMessage.h" +#include "RecoverableQueue.h" + +namespace qpid { +namespace broker { + +/** + * The interface through which prepared 2pc transactions are + * recovered. + */ +class RecoverableTransaction +{ +public: + typedef boost::shared_ptr<RecoverableTransaction> shared_ptr; + virtual void enqueue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message) = 0; + virtual void dequeue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message) = 0; + virtual ~RecoverableTransaction() {}; +}; + +}} + + +#endif diff --git a/cpp/src/qpid/broker/RecoveredDequeue.cpp b/cpp/src/qpid/broker/RecoveredDequeue.cpp new file mode 100644 index 0000000000..4551bf8761 --- /dev/null +++ b/cpp/src/qpid/broker/RecoveredDequeue.cpp @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "RecoveredDequeue.h" + +using namespace qpid::broker; + +RecoveredDequeue::RecoveredDequeue(Queue::shared_ptr _queue, Message::shared_ptr _msg) : queue(_queue), msg(_msg) {} + +bool RecoveredDequeue::prepare(TransactionContext*) throw(){ + //should never be called; transaction has already prepared if an enqueue is recovered + return false; +} + +void RecoveredDequeue::commit() throw(){ +} + +void RecoveredDequeue::rollback() throw(){ + queue->process(msg); +} + diff --git a/cpp/src/qpid/broker/RecoveredDequeue.h b/cpp/src/qpid/broker/RecoveredDequeue.h new file mode 100644 index 0000000000..9e0c334dc3 --- /dev/null +++ b/cpp/src/qpid/broker/RecoveredDequeue.h @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _RecoveredDequeue_ +#define _RecoveredDequeue_ + +#include <algorithm> +#include <functional> +#include <list> +#include "Deliverable.h" +#include "BrokerMessage.h" +#include "MessageStore.h" +#include "BrokerQueue.h" +#include "TxOp.h" + +namespace qpid { + namespace broker { + class RecoveredDequeue : public TxOp{ + Queue::shared_ptr queue; + Message::shared_ptr msg; + + public: + RecoveredDequeue(Queue::shared_ptr queue, Message::shared_ptr msg); + virtual bool prepare(TransactionContext* ctxt) throw(); + virtual void commit() throw(); + virtual void rollback() throw(); + virtual ~RecoveredDequeue(){} + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/RecoveredEnqueue.cpp b/cpp/src/qpid/broker/RecoveredEnqueue.cpp new file mode 100644 index 0000000000..533af864b6 --- /dev/null +++ b/cpp/src/qpid/broker/RecoveredEnqueue.cpp @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "RecoveredEnqueue.h" + +using namespace qpid::broker; + +RecoveredEnqueue::RecoveredEnqueue(Queue::shared_ptr _queue, Message::shared_ptr _msg) : queue(_queue), msg(_msg) {} + +bool RecoveredEnqueue::prepare(TransactionContext*) throw(){ + //should never be called; transaction has already prepared if an enqueue is recovered + return false; +} + +void RecoveredEnqueue::commit() throw(){ + queue->process(msg); +} + +void RecoveredEnqueue::rollback() throw(){ +} + diff --git a/cpp/src/qpid/broker/RecoveredEnqueue.h b/cpp/src/qpid/broker/RecoveredEnqueue.h new file mode 100644 index 0000000000..25c5baf364 --- /dev/null +++ b/cpp/src/qpid/broker/RecoveredEnqueue.h @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _RecoveredEnqueue_ +#define _RecoveredEnqueue_ + +#include <algorithm> +#include <functional> +#include <list> +#include "Deliverable.h" +#include "BrokerMessage.h" +#include "MessageStore.h" +#include "BrokerQueue.h" +#include "TxOp.h" + +namespace qpid { + namespace broker { + class RecoveredEnqueue : public TxOp{ + Queue::shared_ptr queue; + Message::shared_ptr msg; + + public: + RecoveredEnqueue(Queue::shared_ptr queue, Message::shared_ptr msg); + virtual bool prepare(TransactionContext* ctxt) throw(); + virtual void commit() throw(); + virtual void rollback() throw(); + virtual ~RecoveredEnqueue(){} + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/RecoveryManager.h b/cpp/src/qpid/broker/RecoveryManager.h index aae2bbe3ac..b6ccb74658 100644 --- a/cpp/src/qpid/broker/RecoveryManager.h +++ b/cpp/src/qpid/broker/RecoveryManager.h @@ -24,6 +24,8 @@ #include "RecoverableExchange.h" #include "RecoverableQueue.h" #include "RecoverableMessage.h" +#include "RecoverableTransaction.h" +#include "TransactionalStore.h" #include "qpid/framing/Buffer.h" namespace qpid { @@ -35,6 +37,8 @@ namespace broker { virtual RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer) = 0; virtual RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer) = 0; virtual RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer) = 0; + virtual RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid, + std::auto_ptr<TPCTransactionContext> txn) = 0; virtual void recoveryComplete() = 0; }; diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 355c8de926..2daf3b2d0a 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -23,6 +23,8 @@ #include "BrokerMessage.h" #include "BrokerMessageMessage.h" #include "BrokerQueue.h" +#include "RecoveredEnqueue.h" +#include "RecoveredDequeue.h" using namespace qpid; using namespace qpid::broker; @@ -32,8 +34,9 @@ using boost::dynamic_pointer_cast; static const uint8_t BASIC = 1; static const uint8_t MESSAGE = 2; -RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, uint64_t _stagingThreshold) - : queues(_queues), exchanges(_exchanges), stagingThreshold(_stagingThreshold) {} +RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, + DtxManager& _dtxMgr, uint64_t _stagingThreshold) + : queues(_queues), exchanges(_exchanges), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {} RecoveryManagerImpl::~RecoveryManagerImpl() {} @@ -49,6 +52,8 @@ public: bool loadContent(uint64_t available); void decodeContent(framing::Buffer& buffer); void recover(Queue::shared_ptr queue); + void enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue); + void dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue); }; class RecoverableQueueImpl : public RecoverableQueue @@ -59,6 +64,8 @@ public: ~RecoverableQueueImpl() {}; void setPersistenceId(uint64_t id); void recover(RecoverableMessage::shared_ptr msg); + void enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg); + void dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg); }; class RecoverableExchangeImpl : public RecoverableExchange @@ -71,6 +78,15 @@ public: void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args); }; +class RecoverableTransactionImpl : public RecoverableTransaction +{ + DtxBuffer::shared_ptr buffer; +public: + RecoverableTransactionImpl(DtxBuffer::shared_ptr _buffer) : buffer(_buffer) {} + void enqueue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message); + void dequeue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message); +}; + RecoverableExchange::shared_ptr RecoveryManagerImpl::recoverExchange(framing::Buffer& buffer) { return RecoverableExchange::shared_ptr(new RecoverableExchangeImpl(Exchange::decode(exchanges, buffer), queues)); @@ -102,6 +118,14 @@ RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buff return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold)); } +RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, + std::auto_ptr<TPCTransactionContext> txn) +{ + DtxBuffer::shared_ptr buffer(new DtxBuffer()); + dtxMgr.recover(xid, txn, buffer); + return RecoverableTransaction::shared_ptr(new RecoverableTransactionImpl(buffer)); +} + void RecoveryManagerImpl::recoveryComplete() { //TODO (finalise binding setup etc) @@ -162,3 +186,33 @@ void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::Fiel Queue::shared_ptr queue = queues.find(queueName); exchange->bind(queue, key, &args); } + +void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue) +{ + buffer->enlist(TxOp::shared_ptr(new RecoveredDequeue(queue, msg))); +} + +void RecoverableMessageImpl::enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue) +{ + buffer->enlist(TxOp::shared_ptr(new RecoveredEnqueue(queue, msg))); +} + +void RecoverableQueueImpl::dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr message) +{ + dynamic_pointer_cast<RecoverableMessageImpl>(message)->dequeue(buffer, queue); +} + +void RecoverableQueueImpl::enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr message) +{ + dynamic_pointer_cast<RecoverableMessageImpl>(message)->enqueue(buffer, queue); +} + +void RecoverableTransactionImpl::dequeue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message) +{ + dynamic_pointer_cast<RecoverableQueueImpl>(queue)->dequeue(buffer, message); +} + +void RecoverableTransactionImpl::enqueue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message) +{ + dynamic_pointer_cast<RecoverableQueueImpl>(queue)->enqueue(buffer, message); +} diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.h b/cpp/src/qpid/broker/RecoveryManagerImpl.h index 7802eee711..bcd71defb1 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.h +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.h @@ -22,6 +22,7 @@ #define _RecoveryManagerImpl_ #include <list> +#include "DtxManager.h" #include "ExchangeRegistry.h" #include "QueueRegistry.h" #include "RecoveryManager.h" @@ -32,14 +33,17 @@ namespace broker { class RecoveryManagerImpl : public RecoveryManager{ QueueRegistry& queues; ExchangeRegistry& exchanges; + DtxManager& dtxMgr; const uint64_t stagingThreshold; public: - RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, uint64_t stagingThreshold); + RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, DtxManager& dtxMgr, uint64_t stagingThreshold); ~RecoveryManagerImpl(); RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer); RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer); RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer); + RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid, + std::auto_ptr<TPCTransactionContext> txn); void recoveryComplete(); static uint8_t decodeMessageType(framing::Buffer& buffer); |