diff options
| author | Gordon Sim <gsim@apache.org> | 2012-10-19 17:15:46 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2012-10-19 17:15:46 +0000 |
| commit | 6dfe93d5e0f21127d8454d17c6b79a2ec0dc519d (patch) | |
| tree | e23d1354c11338358eaf69114bfa8bf869586f09 /cpp/src/qpid/broker/RecoveryManagerImpl.cpp | |
| parent | 331e98a65095a8360057f2c41629a94c2ac93707 (diff) | |
| download | qpid-python-6dfe93d5e0f21127d8454d17c6b79a2ec0dc519d.tar.gz | |
QPID-4368: Allow pluggable protocol implementations
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1400177 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/RecoveryManagerImpl.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 34 |
1 files changed, 13 insertions, 21 deletions
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index b04d7c34e0..6d831563e2 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -25,6 +25,8 @@ #include "qpid/broker/Queue.h" #include "qpid/broker/Link.h" #include "qpid/broker/Bridge.h" +#include "qpid/broker/Protocol.h" +#include "qpid/broker/RecoverableMessageImpl.h" #include "qpid/broker/RecoveredEnqueue.h" #include "qpid/broker/RecoveredDequeue.h" #include "qpid/broker/amqp_0_10/MessageTransfer.h" @@ -38,26 +40,11 @@ namespace qpid { namespace broker { RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links, - DtxManager& _dtxMgr) - : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr) {} + DtxManager& _dtxMgr, ProtocolRegistry& p) + : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), protocols(p) {} RecoveryManagerImpl::~RecoveryManagerImpl() {} -class RecoverableMessageImpl : public RecoverableMessage -{ - Message msg; -public: - RecoverableMessageImpl(const Message& _msg); - ~RecoverableMessageImpl() {}; - void setPersistenceId(uint64_t id); - void setRedelivered(); - bool loadContent(uint64_t available); - void decodeContent(framing::Buffer& buffer); - void recover(Queue::shared_ptr queue); - void enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue); - void dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue); -}; - class RecoverableQueueImpl : public RecoverableQueue { Queue::shared_ptr queue; @@ -131,10 +118,15 @@ RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer) { - //TODO: determine encoding/version actually used - boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer()); - transfer->decodeHeader(buffer); - return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(Message(transfer, transfer))); + framing::Buffer sniffer(buffer.getPointer(), buffer.available()); + RecoverableMessage::shared_ptr m = protocols.recover(sniffer); + if (m) { + return m; + } else { + boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer()); + transfer->decodeHeader(buffer); + return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(Message(transfer, transfer))); + } } RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, |
