summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-05-25 11:24:54 +0000
committerGordon Sim <gsim@apache.org>2007-05-25 11:24:54 +0000
commit45f0ee18e3dacf9e8c746009eaef4e17b0a44bf8 (patch)
tree67a2ae89ca92c9b4fdc94e2f6a817439e648d069 /cpp/src/qpid/broker/RecoveryManagerImpl.cpp
parentf646350b5e59ccf49f1253bd55f98d062769f2ee (diff)
downloadqpid-python-45f0ee18e3dacf9e8c746009eaef4e17b0a44bf8.tar.gz
Added support for recovering prepared transactions.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@541619 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/RecoveryManagerImpl.cpp')
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp58
1 files changed, 56 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 355c8de926..2daf3b2d0a 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -23,6 +23,8 @@
#include "BrokerMessage.h"
#include "BrokerMessageMessage.h"
#include "BrokerQueue.h"
+#include "RecoveredEnqueue.h"
+#include "RecoveredDequeue.h"
using namespace qpid;
using namespace qpid::broker;
@@ -32,8 +34,9 @@ using boost::dynamic_pointer_cast;
static const uint8_t BASIC = 1;
static const uint8_t MESSAGE = 2;
-RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, uint64_t _stagingThreshold)
- : queues(_queues), exchanges(_exchanges), stagingThreshold(_stagingThreshold) {}
+RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges,
+ DtxManager& _dtxMgr, uint64_t _stagingThreshold)
+ : queues(_queues), exchanges(_exchanges), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {}
RecoveryManagerImpl::~RecoveryManagerImpl() {}
@@ -49,6 +52,8 @@ public:
bool loadContent(uint64_t available);
void decodeContent(framing::Buffer& buffer);
void recover(Queue::shared_ptr queue);
+ void enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue);
+ void dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue);
};
class RecoverableQueueImpl : public RecoverableQueue
@@ -59,6 +64,8 @@ public:
~RecoverableQueueImpl() {};
void setPersistenceId(uint64_t id);
void recover(RecoverableMessage::shared_ptr msg);
+ void enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg);
+ void dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg);
};
class RecoverableExchangeImpl : public RecoverableExchange
@@ -71,6 +78,15 @@ public:
void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args);
};
+class RecoverableTransactionImpl : public RecoverableTransaction
+{
+ DtxBuffer::shared_ptr buffer;
+public:
+ RecoverableTransactionImpl(DtxBuffer::shared_ptr _buffer) : buffer(_buffer) {}
+ void enqueue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message);
+ void dequeue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message);
+};
+
RecoverableExchange::shared_ptr RecoveryManagerImpl::recoverExchange(framing::Buffer& buffer)
{
return RecoverableExchange::shared_ptr(new RecoverableExchangeImpl(Exchange::decode(exchanges, buffer), queues));
@@ -102,6 +118,14 @@ RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buff
return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold));
}
+RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid,
+ std::auto_ptr<TPCTransactionContext> txn)
+{
+ DtxBuffer::shared_ptr buffer(new DtxBuffer());
+ dtxMgr.recover(xid, txn, buffer);
+ return RecoverableTransaction::shared_ptr(new RecoverableTransactionImpl(buffer));
+}
+
void RecoveryManagerImpl::recoveryComplete()
{
//TODO (finalise binding setup etc)
@@ -162,3 +186,33 @@ void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::Fiel
Queue::shared_ptr queue = queues.find(queueName);
exchange->bind(queue, key, &args);
}
+
+void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue)
+{
+ buffer->enlist(TxOp::shared_ptr(new RecoveredDequeue(queue, msg)));
+}
+
+void RecoverableMessageImpl::enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue)
+{
+ buffer->enlist(TxOp::shared_ptr(new RecoveredEnqueue(queue, msg)));
+}
+
+void RecoverableQueueImpl::dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr message)
+{
+ dynamic_pointer_cast<RecoverableMessageImpl>(message)->dequeue(buffer, queue);
+}
+
+void RecoverableQueueImpl::enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr message)
+{
+ dynamic_pointer_cast<RecoverableMessageImpl>(message)->enqueue(buffer, queue);
+}
+
+void RecoverableTransactionImpl::dequeue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message)
+{
+ dynamic_pointer_cast<RecoverableQueueImpl>(queue)->dequeue(buffer, message);
+}
+
+void RecoverableTransactionImpl::enqueue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message)
+{
+ dynamic_pointer_cast<RecoverableQueueImpl>(queue)->enqueue(buffer, message);
+}