diff options
author | Gordon Sim <gsim@apache.org> | 2007-05-25 11:24:54 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-05-25 11:24:54 +0000 |
commit | 45f0ee18e3dacf9e8c746009eaef4e17b0a44bf8 (patch) | |
tree | 67a2ae89ca92c9b4fdc94e2f6a817439e648d069 /cpp/src/qpid/broker/RecoveryManagerImpl.cpp | |
parent | f646350b5e59ccf49f1253bd55f98d062769f2ee (diff) | |
download | qpid-python-45f0ee18e3dacf9e8c746009eaef4e17b0a44bf8.tar.gz |
Added support for recovering prepared transactions.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@541619 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/RecoveryManagerImpl.cpp')
-rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 58 |
1 files changed, 56 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 355c8de926..2daf3b2d0a 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -23,6 +23,8 @@ #include "BrokerMessage.h" #include "BrokerMessageMessage.h" #include "BrokerQueue.h" +#include "RecoveredEnqueue.h" +#include "RecoveredDequeue.h" using namespace qpid; using namespace qpid::broker; @@ -32,8 +34,9 @@ using boost::dynamic_pointer_cast; static const uint8_t BASIC = 1; static const uint8_t MESSAGE = 2; -RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, uint64_t _stagingThreshold) - : queues(_queues), exchanges(_exchanges), stagingThreshold(_stagingThreshold) {} +RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, + DtxManager& _dtxMgr, uint64_t _stagingThreshold) + : queues(_queues), exchanges(_exchanges), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {} RecoveryManagerImpl::~RecoveryManagerImpl() {} @@ -49,6 +52,8 @@ public: bool loadContent(uint64_t available); void decodeContent(framing::Buffer& buffer); void recover(Queue::shared_ptr queue); + void enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue); + void dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue); }; class RecoverableQueueImpl : public RecoverableQueue @@ -59,6 +64,8 @@ public: ~RecoverableQueueImpl() {}; void setPersistenceId(uint64_t id); void recover(RecoverableMessage::shared_ptr msg); + void enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg); + void dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg); }; class RecoverableExchangeImpl : public RecoverableExchange @@ -71,6 +78,15 @@ public: void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args); }; +class RecoverableTransactionImpl : public RecoverableTransaction +{ + DtxBuffer::shared_ptr buffer; +public: + RecoverableTransactionImpl(DtxBuffer::shared_ptr _buffer) : buffer(_buffer) {} + void enqueue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message); + void dequeue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message); +}; + RecoverableExchange::shared_ptr RecoveryManagerImpl::recoverExchange(framing::Buffer& buffer) { return RecoverableExchange::shared_ptr(new RecoverableExchangeImpl(Exchange::decode(exchanges, buffer), queues)); @@ -102,6 +118,14 @@ RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buff return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold)); } +RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, + std::auto_ptr<TPCTransactionContext> txn) +{ + DtxBuffer::shared_ptr buffer(new DtxBuffer()); + dtxMgr.recover(xid, txn, buffer); + return RecoverableTransaction::shared_ptr(new RecoverableTransactionImpl(buffer)); +} + void RecoveryManagerImpl::recoveryComplete() { //TODO (finalise binding setup etc) @@ -162,3 +186,33 @@ void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::Fiel Queue::shared_ptr queue = queues.find(queueName); exchange->bind(queue, key, &args); } + +void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue) +{ + buffer->enlist(TxOp::shared_ptr(new RecoveredDequeue(queue, msg))); +} + +void RecoverableMessageImpl::enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue) +{ + buffer->enlist(TxOp::shared_ptr(new RecoveredEnqueue(queue, msg))); +} + +void RecoverableQueueImpl::dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr message) +{ + dynamic_pointer_cast<RecoverableMessageImpl>(message)->dequeue(buffer, queue); +} + +void RecoverableQueueImpl::enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr message) +{ + dynamic_pointer_cast<RecoverableMessageImpl>(message)->enqueue(buffer, queue); +} + +void RecoverableTransactionImpl::dequeue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message) +{ + dynamic_pointer_cast<RecoverableQueueImpl>(queue)->dequeue(buffer, message); +} + +void RecoverableTransactionImpl::enqueue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message) +{ + dynamic_pointer_cast<RecoverableQueueImpl>(queue)->enqueue(buffer, message); +} |