diff options
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 62 |
1 files changed, 33 insertions, 29 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index 0017cc82cd..633619be13 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -30,8 +30,8 @@ #include "qpid/framing/SequenceSet.h" #include "qpid/framing/FieldTable.h" #include "qpid/log/Statement.h" +#include "qpid/Msg.h" #include <boost/shared_ptr.hpp> -#include <sstream> namespace { const std::string QPID_REPLICATOR_("qpid.replicator-"); @@ -54,10 +54,8 @@ std::string QueueReplicator::replicatorName(const std::string& queueName) { QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l) : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l) { - std::stringstream ss; - ss << "HA: Backup " << queue->getName() << ": "; - logPrefix = ss.str(); - QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings()); + logPrefix = "HA: Backup of " + queue->getName() + ": "; + QPID_LOG(info, logPrefix << "Created"); } // This must be separate from the constructor so we can call shared_from_this. @@ -77,7 +75,7 @@ void QueueReplicator::activate() { 0, // sync? // Include shared_ptr to self to ensure we are not deleted // before initializeBridge is called. - boost::bind(&QueueReplicator::initializeBridge, this, _1, _2, shared_from_this()) + boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2) ); } @@ -91,9 +89,7 @@ void QueueReplicator::deactivate() { } // Called in a broker connection thread when the bridge is created. -// shared_ptr to self ensures we are not deleted before initializeBridge is called. -void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler, - boost::shared_ptr<QueueReplicator> /*self*/) { +void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { sys::Mutex::ScopedLock l(lock); bridgeName = bridge.getName(); framing::AMQP_ServerProxy peer(sessionHandler.out); @@ -141,27 +137,35 @@ void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&) } // Called in connection thread of the queues bridge to primary. -void QueueReplicator::route(Deliverable& msg, const std::string& key, const FieldTable*) +void QueueReplicator::route(Deliverable& msg) { - sys::Mutex::ScopedLock l(lock); - if (key == DEQUEUE_EVENT_KEY) { - SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage()); - QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues); - //TODO: should be able to optimise the following - for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++) - dequeue(*i, l); - } else if (key == POSITION_EVENT_KEY) { - SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage()); - QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition() - << " to " << position); - assert(queue->getPosition() <= position); - //TODO aconway 2011-12-14: Optimize this? - for (SequenceNumber i = queue->getPosition(); i < position; ++i) - dequeue(i,l); - queue->setPosition(position); - } else { - msg.deliverTo(queue); - QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition()); + try { + const std::string& key = msg.getMessage().getRoutingKey(); + sys::Mutex::ScopedLock l(lock); + if (key == DEQUEUE_EVENT_KEY) { + SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage()); + QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues); + //TODO: should be able to optimise the following + for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++) + dequeue(*i, l); + } else if (key == POSITION_EVENT_KEY) { + SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage()); + QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition() + << " to " << position); + if (queue->getPosition() > position) { + throw Exception( + QPID_MSG(logPrefix << "Invalid position update from " + << queue->getPosition() << " to " << position)); + } + queue->setPosition(position); + } else { + msg.deliverTo(queue); + QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition()); + } + } + catch (const std::exception& e) { + QPID_LOG(critical, logPrefix << "Replication failed: " << e.what()); + throw; } } |