diff options
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 42 |
1 files changed, 32 insertions, 10 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index c6af388d9d..8eb7e441a2 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -19,6 +19,7 @@ * */ +#include "Counter.h" #include "QueueReplicator.h" #include "ReplicatingSubscription.h" #include "qpid/broker/Bridge.h" @@ -44,19 +45,31 @@ namespace ha { using namespace broker; using namespace framing; -const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event"); -const std::string QueueReplicator::POSITION_EVENT_KEY("qpid.position-event"); +const std::string QPID_HA_EVENT_PREFIX("qpid.ha-event:"); +const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue"); +const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA_EVENT_PREFIX+"position"); +const std::string QueueReplicator::READY_EVENT_KEY(QPID_HA_EVENT_PREFIX+"ready"); std::string QueueReplicator::replicatorName(const std::string& queueName) { return QPID_REPLICATOR_ + queueName; } -QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l) - : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l) +bool QueueReplicator::isEventKey(const std::string key) { + const std::string& prefix = QPID_HA_EVENT_PREFIX; + bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0; + return ret; +} + +QueueReplicator::QueueReplicator(const LogPrefix& lp, + boost::shared_ptr<Queue> q, + boost::shared_ptr<Link> l, + Counter* counter) + : Exchange(replicatorName(q->getName()), 0, q->getBroker()), + logPrefix(lp), queue(q), link(l), + unreadyCount(counter) { framing::Uuid uuid(true); bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str(); - logPrefix = "HA: Backup of " + queue->getName() + ": "; QPID_LOG(info, logPrefix << "Created"); } @@ -103,6 +116,8 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); framing::FieldTable settings; + if (unreadyCount) ++(*unreadyCount); // We are unready. + // FIXME aconway 2011-12-09: Failover optimization removed. // There was code here to re-use messages already on the backup // during fail-over. This optimization was removed to simplify @@ -149,13 +164,18 @@ void QueueReplicator::route(Deliverable& msg) try { const std::string& key = msg.getMessage().getRoutingKey(); sys::Mutex::ScopedLock l(lock); - if (key == DEQUEUE_EVENT_KEY) { + if (!isEventKey(key)) { + msg.deliverTo(queue); + QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition()); + } + else if (key == DEQUEUE_EVENT_KEY) { SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage()); QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues); //TODO: should be able to optimise the following for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++) dequeue(*i, l); - } else if (key == POSITION_EVENT_KEY) { + } + else if (key == POSITION_EVENT_KEY) { SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage()); QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition() << " to " << position); @@ -165,10 +185,12 @@ void QueueReplicator::route(Deliverable& msg) << queue->getPosition() << " to " << position)); } queue->setPosition(position); - } else { - msg.deliverTo(queue); - QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition()); } + else if (key == READY_EVENT_KEY) { + QPID_LOG(info, logPrefix << "caught up at " << queue->getPosition()); + if (unreadyCount) --(*unreadyCount); // We are now ready. + } + // Ignore unknown event keys, may be introduced in later versions. } catch (const std::exception& e) { QPID_LOG(critical, logPrefix << "Replication failed: " << e.what()); |
