diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 28 |
1 files changed, 13 insertions, 15 deletions
diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 858535637a..7deeba5e65 100644 --- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -21,11 +21,13 @@ #include "qpid/broker/RecoveryManagerImpl.h" #include "qpid/broker/Message.h" +#include "qpid/broker/PersistableMessage.h" #include "qpid/broker/Queue.h" #include "qpid/broker/Link.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/RecoveredEnqueue.h" #include "qpid/broker/RecoveredDequeue.h" +#include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/framing/reply_exceptions.h" using boost::dynamic_pointer_cast; @@ -43,9 +45,9 @@ RecoveryManagerImpl::~RecoveryManagerImpl() {} class RecoverableMessageImpl : public RecoverableMessage { - intrusive_ptr<Message> msg; + Message msg; public: - RecoverableMessageImpl(const intrusive_ptr<Message>& _msg); + RecoverableMessageImpl(const Message& _msg); ~RecoverableMessageImpl() {}; void setPersistenceId(uint64_t id); void setRedelivered(); @@ -128,9 +130,10 @@ RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer) { - boost::intrusive_ptr<Message> message(new Message()); - message->decodeHeader(buffer); - return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message)); + //TODO: determine encoding/version actually used + boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer()); + transfer->decodeHeader(buffer); + return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(Message(transfer, transfer))); } RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, @@ -163,12 +166,7 @@ void RecoveryManagerImpl::recoveryComplete() exchanges.eachExchange(boost::bind(&Exchange::recoveryComplete, _1, boost::ref(exchanges))); } -RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg) : msg(_msg) -{ - if (!msg->isPersistent()) { - msg->forcePersistent(); // set so that message will get dequeued from store. - } -} +RecoverableMessageImpl:: RecoverableMessageImpl(const Message& _msg) : msg(_msg) {} bool RecoverableMessageImpl::loadContent(uint64_t /*available*/) { @@ -177,7 +175,7 @@ bool RecoverableMessageImpl::loadContent(uint64_t /*available*/) void RecoverableMessageImpl::decodeContent(framing::Buffer& buffer) { - msg->decodeContent(buffer); + msg.getPersistentContext()->decodeContent(buffer); } void RecoverableMessageImpl::recover(Queue::shared_ptr queue) @@ -187,12 +185,12 @@ void RecoverableMessageImpl::recover(Queue::shared_ptr queue) void RecoverableMessageImpl::setPersistenceId(uint64_t id) { - msg->setPersistenceId(id); + msg.getPersistentContext()->setPersistenceId(id); } void RecoverableMessageImpl::setRedelivered() { - msg->redeliver(); + msg.deliver();//increment delivery count (but at present that isn't recorded durably) } void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg) @@ -204,7 +202,7 @@ void RecoverableQueueImpl::setPersistenceId(uint64_t id) { queue->setPersistenceId(id); } - + uint64_t RecoverableQueueImpl::getPersistenceId() const { return queue->getPersistenceId(); |