summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/QueueReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp42
1 files changed, 32 insertions, 10 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index c6af388d9d..8eb7e441a2 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -19,6 +19,7 @@
*
*/
+#include "Counter.h"
#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
#include "qpid/broker/Bridge.h"
@@ -44,19 +45,31 @@ namespace ha {
using namespace broker;
using namespace framing;
-const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event");
-const std::string QueueReplicator::POSITION_EVENT_KEY("qpid.position-event");
+const std::string QPID_HA_EVENT_PREFIX("qpid.ha-event:");
+const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue");
+const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA_EVENT_PREFIX+"position");
+const std::string QueueReplicator::READY_EVENT_KEY(QPID_HA_EVENT_PREFIX+"ready");
std::string QueueReplicator::replicatorName(const std::string& queueName) {
return QPID_REPLICATOR_ + queueName;
}
-QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
- : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l)
+bool QueueReplicator::isEventKey(const std::string key) {
+ const std::string& prefix = QPID_HA_EVENT_PREFIX;
+ bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0;
+ return ret;
+}
+
+QueueReplicator::QueueReplicator(const LogPrefix& lp,
+ boost::shared_ptr<Queue> q,
+ boost::shared_ptr<Link> l,
+ Counter* counter)
+ : Exchange(replicatorName(q->getName()), 0, q->getBroker()),
+ logPrefix(lp), queue(q), link(l),
+ unreadyCount(counter)
{
framing::Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
- logPrefix = "HA: Backup of " + queue->getName() + ": ";
QPID_LOG(info, logPrefix << "Created");
}
@@ -103,6 +116,8 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
framing::FieldTable settings;
+ if (unreadyCount) ++(*unreadyCount); // We are unready.
+
// FIXME aconway 2011-12-09: Failover optimization removed.
// There was code here to re-use messages already on the backup
// during fail-over. This optimization was removed to simplify
@@ -149,13 +164,18 @@ void QueueReplicator::route(Deliverable& msg)
try {
const std::string& key = msg.getMessage().getRoutingKey();
sys::Mutex::ScopedLock l(lock);
- if (key == DEQUEUE_EVENT_KEY) {
+ if (!isEventKey(key)) {
+ msg.deliverTo(queue);
+ QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition());
+ }
+ else if (key == DEQUEUE_EVENT_KEY) {
SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues);
//TODO: should be able to optimise the following
for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
dequeue(*i, l);
- } else if (key == POSITION_EVENT_KEY) {
+ }
+ else if (key == POSITION_EVENT_KEY) {
SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition()
<< " to " << position);
@@ -165,10 +185,12 @@ void QueueReplicator::route(Deliverable& msg)
<< queue->getPosition() << " to " << position));
}
queue->setPosition(position);
- } else {
- msg.deliverTo(queue);
- QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition());
}
+ else if (key == READY_EVENT_KEY) {
+ QPID_LOG(info, logPrefix << "caught up at " << queue->getPosition());
+ if (unreadyCount) --(*unreadyCount); // We are now ready.
+ }
+ // Ignore unknown event keys, may be introduced in later versions.
}
catch (const std::exception& e) {
QPID_LOG(critical, logPrefix << "Replication failed: " << e.what());