diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 55 |
1 files changed, 32 insertions, 23 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 0c725ebce0..c6af388d9d 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -30,6 +30,7 @@ #include "qpid/framing/SequenceSet.h" #include "qpid/framing/FieldTable.h" #include "qpid/log/Statement.h" +#include "qpid/Msg.h" #include <boost/shared_ptr.hpp> namespace { @@ -55,8 +56,8 @@ QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<L { framing::Uuid uuid(true); bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str(); - logPrefix = "HA: Backup " + queue->getName() + ": "; - QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings()); + logPrefix = "HA: Backup of " + queue->getName() + ": "; + QPID_LOG(info, logPrefix << "Created"); } // This must be separate from the constructor so we can call shared_from_this. @@ -78,7 +79,7 @@ void QueueReplicator::activate() { 0, // sync? // Include shared_ptr to self to ensure we are not deleted // before initializeBridge is called. - boost::bind(&QueueReplicator::initializeBridge, this, _1, _2, shared_from_this()) + boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2) ); bridge = result.first; } @@ -96,9 +97,7 @@ void QueueReplicator::deactivate() { } // Called in a broker connection thread when the bridge is created. -// shared_ptr to self ensures we are not deleted before initializeBridge is called. -void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler, - boost::shared_ptr<QueueReplicator> /*self*/) { +void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { sys::Mutex::ScopedLock l(lock); framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); @@ -147,23 +146,33 @@ void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&) // Called in connection thread of the queues bridge to primary. void QueueReplicator::route(Deliverable& msg) { - const std::string& key = msg.getMessage().getRoutingKey(); - sys::Mutex::ScopedLock l(lock); - if (key == DEQUEUE_EVENT_KEY) { - SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage()); - QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues); - //TODO: should be able to optimise the following - for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++) - dequeue(*i, l); - } else if (key == POSITION_EVENT_KEY) { - SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage()); - QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition() - << " to " << position); - assert(queue->getPosition() <= position); - queue->setPosition(position); - } else { - msg.deliverTo(queue); - QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition()); + try { + const std::string& key = msg.getMessage().getRoutingKey(); + sys::Mutex::ScopedLock l(lock); + if (key == DEQUEUE_EVENT_KEY) { + SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage()); + QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues); + //TODO: should be able to optimise the following + for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++) + dequeue(*i, l); + } else if (key == POSITION_EVENT_KEY) { + SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage()); + QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition() + << " to " << position); + if (queue->getPosition() > position) { + throw Exception( + QPID_MSG(logPrefix << "Invalid position update from " + << queue->getPosition() << " to " << position)); + } + queue->setPosition(position); + } else { + msg.deliverTo(queue); + QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition()); + } + } + catch (const std::exception& e) { + QPID_LOG(critical, logPrefix << "Replication failed: " << e.what()); + throw; } } |