summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/Makefile.am5
-rw-r--r--qpid/cpp/src/qpid/CommonOptions.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/DtxManager.cpp22
-rw-r--r--qpid/cpp/src/qpid/broker/DtxManager.h2
-rw-r--r--qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/DtxWorkRecord.h1
-rw-r--r--qpid/cpp/src/qpid/broker/RecoverableExchange.h1
-rw-r--r--qpid/cpp/src/qpid/broker/RecoverableTransaction.h49
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp38
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveredDequeue.h50
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp38
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveredEnqueue.h50
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveryManager.h4
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp58
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h6
16 files changed, 333 insertions, 8 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index cb17425369..4293719a6f 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -181,6 +181,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/QueuePolicy.cpp \
qpid/broker/QueueRegistry.cpp \
qpid/broker/RecoveryManagerImpl.cpp \
+ qpid/broker/RecoveredEnqueue.cpp \
+ qpid/broker/RecoveredDequeue.cpp \
qpid/broker/Reference.cpp \
qpid/broker/TopicExchange.cpp \
qpid/broker/TxAck.cpp \
@@ -234,7 +236,10 @@ nobase_include_HEADERS = \
qpid/broker/RecoverableExchange.h \
qpid/broker/RecoverableMessage.h \
qpid/broker/RecoverableQueue.h \
+ qpid/broker/RecoverableTransaction.h \
qpid/broker/RecoveryManager.h \
+ qpid/broker/RecoveredEnqueue.h \
+ qpid/broker/RecoveredDequeue.h \
qpid/broker/Reference.h \
qpid/broker/TxBuffer.h \
qpid/broker/TxOp.h \
diff --git a/qpid/cpp/src/qpid/CommonOptions.cpp b/qpid/cpp/src/qpid/CommonOptions.cpp
index 8ec1a42ee2..f272c71a27 100644
--- a/qpid/cpp/src/qpid/CommonOptions.cpp
+++ b/qpid/cpp/src/qpid/CommonOptions.cpp
@@ -19,6 +19,7 @@
#include "CommonOptions.h"
#include <fstream>
#include <algorithm>
+#include <iostream>
namespace qpid {
namespace program_options {
@@ -28,10 +29,9 @@ char env2optchar(char env) {
}
const std::string envPrefix("QPID_");
-const std::string ignore("QPID_DIR");//temporary hack - this env var is used in other ways; not an option
std::string env2option(const std::string& env) {
- if (env != ignore /*temp hack, see above*/ && env.find(envPrefix) == 0) {
+ if (env.find(envPrefix) == 0) {
std::string opt = env.substr(envPrefix.size());
std::transform(opt.begin(), opt.end(), opt.begin(), env2optchar);
return opt;
@@ -62,6 +62,9 @@ void parseOptions(
try {
po::store(po::parse_environment(desc, po::env2option), vm);
}
+ catch (const po::unknown_option& e) {
+ std::cerr << e.what() << std::endl;
+ }
catch (const po::error& e) {
throw po::error(std::string("parsing environment variables: ")
+ e.what());
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index de2f2c55f4..01e049bdda 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -92,8 +92,8 @@ Broker::Broker(const Broker::Options& conf) :
exchanges.declare(amq_match, HeadersExchange::typeName);
if(store.get()) {
- RecoveryManagerImpl recoverer(
- queues, exchanges, conf.stagingThreshold);
+ RecoveryManagerImpl recoverer(queues, exchanges, dtxManager,
+ conf.stagingThreshold);
store->recover(recoverer);
}
diff --git a/qpid/cpp/src/qpid/broker/DtxManager.cpp b/qpid/cpp/src/qpid/broker/DtxManager.cpp
index 6c074bbd51..2af87a8751 100644
--- a/qpid/cpp/src/qpid/broker/DtxManager.cpp
+++ b/qpid/cpp/src/qpid/broker/DtxManager.cpp
@@ -29,11 +29,20 @@ DtxManager::~DtxManager() {}
void DtxManager::start(std::string xid, DtxBuffer::shared_ptr ops)
{
+ /*
WorkMap::iterator i = work.find(xid);
if (i == work.end()) {
i = work.insert(xid, new DtxWorkRecord(xid, store)).first;
}
i->add(ops);
+ */
+
+ getOrCreateWork(xid)->add(ops);
+}
+
+void DtxManager::recover(std::string xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops)
+{
+ getOrCreateWork(xid)->recover(txn, ops);
}
void DtxManager::prepare(const std::string& xid)
@@ -54,6 +63,17 @@ void DtxManager::rollback(const std::string& xid)
DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid)
{
WorkMap::iterator i = work.find(xid);
- if (i == work.end()) throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid);
+ if (i == work.end()) {
+ throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid);
+ }
+ return i;
+}
+
+DtxManager::WorkMap::iterator DtxManager::getOrCreateWork(std::string& xid)
+{
+ WorkMap::iterator i = work.find(xid);
+ if (i == work.end()) {
+ i = work.insert(xid, new DtxWorkRecord(xid, store)).first;
+ }
return i;
}
diff --git a/qpid/cpp/src/qpid/broker/DtxManager.h b/qpid/cpp/src/qpid/broker/DtxManager.h
index 230a0631f3..e908faecac 100644
--- a/qpid/cpp/src/qpid/broker/DtxManager.h
+++ b/qpid/cpp/src/qpid/broker/DtxManager.h
@@ -37,11 +37,13 @@ class DtxManager{
TransactionalStore* const store;
WorkMap::iterator getWork(const std::string& xid);
+ WorkMap::iterator getOrCreateWork(std::string& xid);
public:
DtxManager(TransactionalStore* const store);
~DtxManager();
void start(std::string xid, DtxBuffer::shared_ptr work);
+ void recover(std::string xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr work);
void prepare(const std::string& xid);
void commit(const std::string& xid);
void rollback(const std::string& xid);
diff --git a/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp b/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
index 218131f6bc..ff6e952517 100644
--- a/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
@@ -105,3 +105,9 @@ void DtxWorkRecord::abort()
}
for_each(work.begin(), work.end(), mem_fn(&TxBuffer::rollback));
}
+
+void DtxWorkRecord::recover(std::auto_ptr<TPCTransactionContext> _txn, DtxBuffer::shared_ptr ops)
+{
+ add(ops);
+ txn = _txn;
+}
diff --git a/qpid/cpp/src/qpid/broker/DtxWorkRecord.h b/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
index 18b41c7808..0453ea1644 100644
--- a/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
+++ b/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
@@ -56,6 +56,7 @@ public:
void commit();
void rollback();
void add(DtxBuffer::shared_ptr ops);
+ void recover(std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops);
};
}
diff --git a/qpid/cpp/src/qpid/broker/RecoverableExchange.h b/qpid/cpp/src/qpid/broker/RecoverableExchange.h
index 0af4aea232..76d0d2ecdf 100644
--- a/qpid/cpp/src/qpid/broker/RecoverableExchange.h
+++ b/qpid/cpp/src/qpid/broker/RecoverableExchange.h
@@ -23,6 +23,7 @@
*/
#include <boost/shared_ptr.hpp>
+#include "qpid/framing/FieldTable.h"
namespace qpid {
namespace broker {
diff --git a/qpid/cpp/src/qpid/broker/RecoverableTransaction.h b/qpid/cpp/src/qpid/broker/RecoverableTransaction.h
new file mode 100644
index 0000000000..7fe34b6756
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/RecoverableTransaction.h
@@ -0,0 +1,49 @@
+#ifndef _broker_RecoverableTransaction_h
+#define _broker_RecoverableTransaction_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+
+#include "RecoverableMessage.h"
+#include "RecoverableQueue.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * The interface through which prepared 2pc transactions are
+ * recovered.
+ */
+class RecoverableTransaction
+{
+public:
+ typedef boost::shared_ptr<RecoverableTransaction> shared_ptr;
+ virtual void enqueue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message) = 0;
+ virtual void dequeue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message) = 0;
+ virtual ~RecoverableTransaction() {};
+};
+
+}}
+
+
+#endif
diff --git a/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp b/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp
new file mode 100644
index 0000000000..4551bf8761
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "RecoveredDequeue.h"
+
+using namespace qpid::broker;
+
+RecoveredDequeue::RecoveredDequeue(Queue::shared_ptr _queue, Message::shared_ptr _msg) : queue(_queue), msg(_msg) {}
+
+bool RecoveredDequeue::prepare(TransactionContext*) throw(){
+ //should never be called; transaction has already prepared if an enqueue is recovered
+ return false;
+}
+
+void RecoveredDequeue::commit() throw(){
+}
+
+void RecoveredDequeue::rollback() throw(){
+ queue->process(msg);
+}
+
diff --git a/qpid/cpp/src/qpid/broker/RecoveredDequeue.h b/qpid/cpp/src/qpid/broker/RecoveredDequeue.h
new file mode 100644
index 0000000000..9e0c334dc3
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/RecoveredDequeue.h
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _RecoveredDequeue_
+#define _RecoveredDequeue_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include "Deliverable.h"
+#include "BrokerMessage.h"
+#include "MessageStore.h"
+#include "BrokerQueue.h"
+#include "TxOp.h"
+
+namespace qpid {
+ namespace broker {
+ class RecoveredDequeue : public TxOp{
+ Queue::shared_ptr queue;
+ Message::shared_ptr msg;
+
+ public:
+ RecoveredDequeue(Queue::shared_ptr queue, Message::shared_ptr msg);
+ virtual bool prepare(TransactionContext* ctxt) throw();
+ virtual void commit() throw();
+ virtual void rollback() throw();
+ virtual ~RecoveredDequeue(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp b/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp
new file mode 100644
index 0000000000..533af864b6
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "RecoveredEnqueue.h"
+
+using namespace qpid::broker;
+
+RecoveredEnqueue::RecoveredEnqueue(Queue::shared_ptr _queue, Message::shared_ptr _msg) : queue(_queue), msg(_msg) {}
+
+bool RecoveredEnqueue::prepare(TransactionContext*) throw(){
+ //should never be called; transaction has already prepared if an enqueue is recovered
+ return false;
+}
+
+void RecoveredEnqueue::commit() throw(){
+ queue->process(msg);
+}
+
+void RecoveredEnqueue::rollback() throw(){
+}
+
diff --git a/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h b/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h
new file mode 100644
index 0000000000..25c5baf364
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _RecoveredEnqueue_
+#define _RecoveredEnqueue_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include "Deliverable.h"
+#include "BrokerMessage.h"
+#include "MessageStore.h"
+#include "BrokerQueue.h"
+#include "TxOp.h"
+
+namespace qpid {
+ namespace broker {
+ class RecoveredEnqueue : public TxOp{
+ Queue::shared_ptr queue;
+ Message::shared_ptr msg;
+
+ public:
+ RecoveredEnqueue(Queue::shared_ptr queue, Message::shared_ptr msg);
+ virtual bool prepare(TransactionContext* ctxt) throw();
+ virtual void commit() throw();
+ virtual void rollback() throw();
+ virtual ~RecoveredEnqueue(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/src/qpid/broker/RecoveryManager.h b/qpid/cpp/src/qpid/broker/RecoveryManager.h
index aae2bbe3ac..b6ccb74658 100644
--- a/qpid/cpp/src/qpid/broker/RecoveryManager.h
+++ b/qpid/cpp/src/qpid/broker/RecoveryManager.h
@@ -24,6 +24,8 @@
#include "RecoverableExchange.h"
#include "RecoverableQueue.h"
#include "RecoverableMessage.h"
+#include "RecoverableTransaction.h"
+#include "TransactionalStore.h"
#include "qpid/framing/Buffer.h"
namespace qpid {
@@ -35,6 +37,8 @@ namespace broker {
virtual RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer) = 0;
virtual RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer) = 0;
virtual RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer) = 0;
+ virtual RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid,
+ std::auto_ptr<TPCTransactionContext> txn) = 0;
virtual void recoveryComplete() = 0;
};
diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 355c8de926..2daf3b2d0a 100644
--- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -23,6 +23,8 @@
#include "BrokerMessage.h"
#include "BrokerMessageMessage.h"
#include "BrokerQueue.h"
+#include "RecoveredEnqueue.h"
+#include "RecoveredDequeue.h"
using namespace qpid;
using namespace qpid::broker;
@@ -32,8 +34,9 @@ using boost::dynamic_pointer_cast;
static const uint8_t BASIC = 1;
static const uint8_t MESSAGE = 2;
-RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, uint64_t _stagingThreshold)
- : queues(_queues), exchanges(_exchanges), stagingThreshold(_stagingThreshold) {}
+RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges,
+ DtxManager& _dtxMgr, uint64_t _stagingThreshold)
+ : queues(_queues), exchanges(_exchanges), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {}
RecoveryManagerImpl::~RecoveryManagerImpl() {}
@@ -49,6 +52,8 @@ public:
bool loadContent(uint64_t available);
void decodeContent(framing::Buffer& buffer);
void recover(Queue::shared_ptr queue);
+ void enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue);
+ void dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue);
};
class RecoverableQueueImpl : public RecoverableQueue
@@ -59,6 +64,8 @@ public:
~RecoverableQueueImpl() {};
void setPersistenceId(uint64_t id);
void recover(RecoverableMessage::shared_ptr msg);
+ void enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg);
+ void dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg);
};
class RecoverableExchangeImpl : public RecoverableExchange
@@ -71,6 +78,15 @@ public:
void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args);
};
+class RecoverableTransactionImpl : public RecoverableTransaction
+{
+ DtxBuffer::shared_ptr buffer;
+public:
+ RecoverableTransactionImpl(DtxBuffer::shared_ptr _buffer) : buffer(_buffer) {}
+ void enqueue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message);
+ void dequeue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message);
+};
+
RecoverableExchange::shared_ptr RecoveryManagerImpl::recoverExchange(framing::Buffer& buffer)
{
return RecoverableExchange::shared_ptr(new RecoverableExchangeImpl(Exchange::decode(exchanges, buffer), queues));
@@ -102,6 +118,14 @@ RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buff
return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold));
}
+RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid,
+ std::auto_ptr<TPCTransactionContext> txn)
+{
+ DtxBuffer::shared_ptr buffer(new DtxBuffer());
+ dtxMgr.recover(xid, txn, buffer);
+ return RecoverableTransaction::shared_ptr(new RecoverableTransactionImpl(buffer));
+}
+
void RecoveryManagerImpl::recoveryComplete()
{
//TODO (finalise binding setup etc)
@@ -162,3 +186,33 @@ void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::Fiel
Queue::shared_ptr queue = queues.find(queueName);
exchange->bind(queue, key, &args);
}
+
+void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue)
+{
+ buffer->enlist(TxOp::shared_ptr(new RecoveredDequeue(queue, msg)));
+}
+
+void RecoverableMessageImpl::enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue)
+{
+ buffer->enlist(TxOp::shared_ptr(new RecoveredEnqueue(queue, msg)));
+}
+
+void RecoverableQueueImpl::dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr message)
+{
+ dynamic_pointer_cast<RecoverableMessageImpl>(message)->dequeue(buffer, queue);
+}
+
+void RecoverableQueueImpl::enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr message)
+{
+ dynamic_pointer_cast<RecoverableMessageImpl>(message)->enqueue(buffer, queue);
+}
+
+void RecoverableTransactionImpl::dequeue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message)
+{
+ dynamic_pointer_cast<RecoverableQueueImpl>(queue)->dequeue(buffer, message);
+}
+
+void RecoverableTransactionImpl::enqueue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message)
+{
+ dynamic_pointer_cast<RecoverableQueueImpl>(queue)->enqueue(buffer, message);
+}
diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
index 7802eee711..bcd71defb1 100644
--- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
+++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
@@ -22,6 +22,7 @@
#define _RecoveryManagerImpl_
#include <list>
+#include "DtxManager.h"
#include "ExchangeRegistry.h"
#include "QueueRegistry.h"
#include "RecoveryManager.h"
@@ -32,14 +33,17 @@ namespace broker {
class RecoveryManagerImpl : public RecoveryManager{
QueueRegistry& queues;
ExchangeRegistry& exchanges;
+ DtxManager& dtxMgr;
const uint64_t stagingThreshold;
public:
- RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, uint64_t stagingThreshold);
+ RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, DtxManager& dtxMgr, uint64_t stagingThreshold);
~RecoveryManagerImpl();
RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer);
RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer);
RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer);
+ RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid,
+ std::auto_ptr<TPCTransactionContext> txn);
void recoveryComplete();
static uint8_t decodeMessageType(framing::Buffer& buffer);