diff options
| author | Gordon Sim <gsim@apache.org> | 2012-08-10 12:04:27 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2012-08-10 12:04:27 +0000 |
| commit | 20e2bf07e12352f7ec08b39a3972b9a0d797c2fb (patch) | |
| tree | 246e3c1007af941cb22842c7d4c12140d0a8f237 /cpp/src/qpid/broker/RecoveryManagerImpl.cpp | |
| parent | b114166ee2c302464fd03c3f49339e36b107e8b6 (diff) | |
| download | qpid-python-20e2bf07e12352f7ec08b39a3972b9a0d797c2fb.tar.gz | |
QPID-4178: broker refactoring
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1371676 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/RecoveryManagerImpl.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 28 |
1 files changed, 13 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 858535637a..7deeba5e65 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -21,11 +21,13 @@ #include "qpid/broker/RecoveryManagerImpl.h" #include "qpid/broker/Message.h" +#include "qpid/broker/PersistableMessage.h" #include "qpid/broker/Queue.h" #include "qpid/broker/Link.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/RecoveredEnqueue.h" #include "qpid/broker/RecoveredDequeue.h" +#include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/framing/reply_exceptions.h" using boost::dynamic_pointer_cast; @@ -43,9 +45,9 @@ RecoveryManagerImpl::~RecoveryManagerImpl() {} class RecoverableMessageImpl : public RecoverableMessage { - intrusive_ptr<Message> msg; + Message msg; public: - RecoverableMessageImpl(const intrusive_ptr<Message>& _msg); + RecoverableMessageImpl(const Message& _msg); ~RecoverableMessageImpl() {}; void setPersistenceId(uint64_t id); void setRedelivered(); @@ -128,9 +130,10 @@ RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer) { - boost::intrusive_ptr<Message> message(new Message()); - message->decodeHeader(buffer); - return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message)); + //TODO: determine encoding/version actually used + boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer()); + transfer->decodeHeader(buffer); + return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(Message(transfer, transfer))); } RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, @@ -163,12 +166,7 @@ void RecoveryManagerImpl::recoveryComplete() exchanges.eachExchange(boost::bind(&Exchange::recoveryComplete, _1, boost::ref(exchanges))); } -RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg) : msg(_msg) -{ - if (!msg->isPersistent()) { - msg->forcePersistent(); // set so that message will get dequeued from store. - } -} +RecoverableMessageImpl:: RecoverableMessageImpl(const Message& _msg) : msg(_msg) {} bool RecoverableMessageImpl::loadContent(uint64_t /*available*/) { @@ -177,7 +175,7 @@ bool RecoverableMessageImpl::loadContent(uint64_t /*available*/) void RecoverableMessageImpl::decodeContent(framing::Buffer& buffer) { - msg->decodeContent(buffer); + msg.getPersistentContext()->decodeContent(buffer); } void RecoverableMessageImpl::recover(Queue::shared_ptr queue) @@ -187,12 +185,12 @@ void RecoverableMessageImpl::recover(Queue::shared_ptr queue) void RecoverableMessageImpl::setPersistenceId(uint64_t id) { - msg->setPersistenceId(id); + msg.getPersistentContext()->setPersistenceId(id); } void RecoverableMessageImpl::setRedelivered() { - msg->redeliver(); + msg.deliver();//increment delivery count (but at present that isn't recorded durably) } void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg) @@ -204,7 +202,7 @@ void RecoverableQueueImpl::setPersistenceId(uint64_t id) { queue->setPersistenceId(id); } - + uint64_t RecoverableQueueImpl::getPersistenceId() const { return queue->getPersistenceId(); |
