summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp28
1 files changed, 13 insertions, 15 deletions
diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 858535637a..7deeba5e65 100644
--- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -21,11 +21,13 @@
#include "qpid/broker/RecoveryManagerImpl.h"
#include "qpid/broker/Message.h"
+#include "qpid/broker/PersistableMessage.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/RecoveredEnqueue.h"
#include "qpid/broker/RecoveredDequeue.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/reply_exceptions.h"
using boost::dynamic_pointer_cast;
@@ -43,9 +45,9 @@ RecoveryManagerImpl::~RecoveryManagerImpl() {}
class RecoverableMessageImpl : public RecoverableMessage
{
- intrusive_ptr<Message> msg;
+ Message msg;
public:
- RecoverableMessageImpl(const intrusive_ptr<Message>& _msg);
+ RecoverableMessageImpl(const Message& _msg);
~RecoverableMessageImpl() {};
void setPersistenceId(uint64_t id);
void setRedelivered();
@@ -128,9 +130,10 @@ RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer&
RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer)
{
- boost::intrusive_ptr<Message> message(new Message());
- message->decodeHeader(buffer);
- return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message));
+ //TODO: determine encoding/version actually used
+ boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
+ transfer->decodeHeader(buffer);
+ return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(Message(transfer, transfer)));
}
RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid,
@@ -163,12 +166,7 @@ void RecoveryManagerImpl::recoveryComplete()
exchanges.eachExchange(boost::bind(&Exchange::recoveryComplete, _1, boost::ref(exchanges)));
}
-RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg) : msg(_msg)
-{
- if (!msg->isPersistent()) {
- msg->forcePersistent(); // set so that message will get dequeued from store.
- }
-}
+RecoverableMessageImpl:: RecoverableMessageImpl(const Message& _msg) : msg(_msg) {}
bool RecoverableMessageImpl::loadContent(uint64_t /*available*/)
{
@@ -177,7 +175,7 @@ bool RecoverableMessageImpl::loadContent(uint64_t /*available*/)
void RecoverableMessageImpl::decodeContent(framing::Buffer& buffer)
{
- msg->decodeContent(buffer);
+ msg.getPersistentContext()->decodeContent(buffer);
}
void RecoverableMessageImpl::recover(Queue::shared_ptr queue)
@@ -187,12 +185,12 @@ void RecoverableMessageImpl::recover(Queue::shared_ptr queue)
void RecoverableMessageImpl::setPersistenceId(uint64_t id)
{
- msg->setPersistenceId(id);
+ msg.getPersistentContext()->setPersistenceId(id);
}
void RecoverableMessageImpl::setRedelivered()
{
- msg->redeliver();
+ msg.deliver();//increment delivery count (but at present that isn't recorded durably)
}
void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg)
@@ -204,7 +202,7 @@ void RecoverableQueueImpl::setPersistenceId(uint64_t id)
{
queue->setPersistenceId(id);
}
-
+
uint64_t RecoverableQueueImpl::getPersistenceId() const
{
return queue->getPersistenceId();