summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp55
1 files changed, 32 insertions, 23 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 0c725ebce0..c6af388d9d 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -30,6 +30,7 @@
#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
+#include "qpid/Msg.h"
#include <boost/shared_ptr.hpp>
namespace {
@@ -55,8 +56,8 @@ QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<L
{
framing::Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
- logPrefix = "HA: Backup " + queue->getName() + ": ";
- QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings());
+ logPrefix = "HA: Backup of " + queue->getName() + ": ";
+ QPID_LOG(info, logPrefix << "Created");
}
// This must be separate from the constructor so we can call shared_from_this.
@@ -78,7 +79,7 @@ void QueueReplicator::activate() {
0, // sync?
// Include shared_ptr to self to ensure we are not deleted
// before initializeBridge is called.
- boost::bind(&QueueReplicator::initializeBridge, this, _1, _2, shared_from_this())
+ boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2)
);
bridge = result.first;
}
@@ -96,9 +97,7 @@ void QueueReplicator::deactivate() {
}
// Called in a broker connection thread when the bridge is created.
-// shared_ptr to self ensures we are not deleted before initializeBridge is called.
-void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler,
- boost::shared_ptr<QueueReplicator> /*self*/) {
+void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
sys::Mutex::ScopedLock l(lock);
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
@@ -147,23 +146,33 @@ void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&)
// Called in connection thread of the queues bridge to primary.
void QueueReplicator::route(Deliverable& msg)
{
- const std::string& key = msg.getMessage().getRoutingKey();
- sys::Mutex::ScopedLock l(lock);
- if (key == DEQUEUE_EVENT_KEY) {
- SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
- QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues);
- //TODO: should be able to optimise the following
- for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
- dequeue(*i, l);
- } else if (key == POSITION_EVENT_KEY) {
- SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
- QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition()
- << " to " << position);
- assert(queue->getPosition() <= position);
- queue->setPosition(position);
- } else {
- msg.deliverTo(queue);
- QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition());
+ try {
+ const std::string& key = msg.getMessage().getRoutingKey();
+ sys::Mutex::ScopedLock l(lock);
+ if (key == DEQUEUE_EVENT_KEY) {
+ SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
+ QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues);
+ //TODO: should be able to optimise the following
+ for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
+ dequeue(*i, l);
+ } else if (key == POSITION_EVENT_KEY) {
+ SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
+ QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition()
+ << " to " << position);
+ if (queue->getPosition() > position) {
+ throw Exception(
+ QPID_MSG(logPrefix << "Invalid position update from "
+ << queue->getPosition() << " to " << position));
+ }
+ queue->setPosition(position);
+ } else {
+ msg.deliverTo(queue);
+ QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition());
+ }
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(critical, logPrefix << "Replication failed: " << e.what());
+ throw;
}
}