summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-12-13 19:30:12 +0000
committerAlan Conway <aconway@apache.org>2013-12-13 19:30:12 +0000
commit5c64daa10dd416d221ef56b211341d4e45da2500 (patch)
tree7f7b6fd825ca228c5b324d356ccc311e893501cb
parent4daedb1549d71f2c472ad77cc6f531423711819a (diff)
downloadqpid-python-5c64daa10dd416d221ef56b211341d4e45da2500.tar.gz
QPID-5421: HA replication error in stand-alone replication
There were replication errors because with stand-alone replication an IdSetter was not set on the original queue until queue replication was set up. Any messages on the queue *before* replication was setup had 0 replication IDs. When one of those messages was dequeued on the source queue, an incorrect message was dequeued on the replica queue. The fix is to add an IdSetter to every queue when replication is enabled. The unit test ha_tests.ReplicationTests.test_standalone_queue_replica has been updated to test for this issue. This commit also has some general tidy-up work around IdSetter and QueueSnapshot. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1550819 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Observers.h12
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp23
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h7
-rw-r--r--qpid/cpp/src/qpid/ha/IdSetter.h6
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp12
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h1
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp40
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h2
-rw-r--r--qpid/cpp/src/qpid/ha/QueueSnapshot.h2
-rw-r--r--qpid/cpp/src/qpid/ha/QueueSnapshots.h70
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp17
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py53
13 files changed, 89 insertions, 158 deletions
diff --git a/qpid/cpp/src/qpid/broker/Observers.h b/qpid/cpp/src/qpid/broker/Observers.h
index b7b26a0d38..5357938c77 100644
--- a/qpid/cpp/src/qpid/broker/Observers.h
+++ b/qpid/cpp/src/qpid/broker/Observers.h
@@ -59,6 +59,14 @@ class Observers
std::for_each(copy.begin(), copy.end(), f);
}
+ template <class T> boost::shared_ptr<T> findType() const {
+ sys::Mutex::ScopedLock l(lock);
+ typename Set::const_iterator i =
+ std::find_if(observers.begin(), observers.end(), &isA<T>);
+ return i == observers.end() ?
+ boost::shared_ptr<T>() : boost::dynamic_pointer_cast<T>(*i);
+ }
+
protected:
typedef std::set<ObserverPtr> Set;
Observers() : lock(myLock) {}
@@ -71,6 +79,10 @@ class Observers
std::for_each(observers.begin(), observers.end(), f);
}
+ template <class T> static bool isA(const ObserverPtr&o) {
+ return boost::dynamic_pointer_cast<T>(o);
+ }
+
mutable sys::Mutex myLock;
mutable sys::Mutex& lock;
Set observers;
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 1587b5b33f..0737701431 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -892,7 +892,7 @@ void BrokerReplicator::disconnected() {
// Make copy of exchanges so we can work outside the registry lock.
ExchangeVector exs;
- exchanges.eachExchange(boost::bind(&exchangeAccumulatorCallback, boost::ref(exs), _1));
+ exchanges.eachExchange(boost::bind(&ExchangeVector::push_back, boost::ref(exs), _1));
for_each(exs.begin(), exs.end(),
boost::bind(&BrokerReplicator::disconnectedQueueReplicator, this, _1));
}
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 61561b3af6..50e99ef527 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -22,17 +22,18 @@
#include "BackupConnectionExcluder.h"
#include "ConnectionObserver.h"
#include "HaBroker.h"
+#include "IdSetter.h"
#include "Primary.h"
#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
#include "StandAlone.h"
#include "QueueSnapshot.h"
-#include "QueueSnapshots.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/assert.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/BrokerObserver.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SignalHandler.h"
@@ -60,6 +61,20 @@ using sys::Mutex;
using boost::shared_ptr;
using boost::dynamic_pointer_cast;
+// In a HaBroker we always need to add QueueSnapshot and IdSetter to each queue
+// because we don't know in advance which queues might be used for stand-alone
+// replication.
+//
+// TODO aconway 2013-12-13: Can we restrict this to queues identified as replicated?
+//
+class HaBroker::BrokerObserver : public broker::BrokerObserver {
+ public:
+ void queueCreate(const boost::shared_ptr<broker::Queue>& q) {
+ q->getObservers().add(boost::shared_ptr<QueueSnapshot>(new QueueSnapshot));
+ q->getMessageInterceptors().add(boost::shared_ptr<IdSetter>(new IdSetter));
+ }
+};
+
// Called in Plugin::earlyInitialize
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
: systemId(b.getSystem()->getSystemId().data()),
@@ -69,8 +84,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
observer(new ConnectionObserver(*this, systemId)),
role(new StandAlone),
membership(BrokerInfo(systemId, STANDALONE), *this),
- failoverExchange(new FailoverExchange(*b.GetVhostObject(), b)),
- queueSnapshots(shared_ptr<QueueSnapshots>(new QueueSnapshots))
+ failoverExchange(new FailoverExchange(*b.GetVhostObject(), b))
{
// If we are joining a cluster we must start excluding clients now,
// otherwise there's a window for a client to connect before we get to
@@ -82,8 +96,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
broker.getConnectionObservers().add(observer);
broker.getExchanges().registerExchange(failoverExchange);
}
- // QueueSnapshots are needed for standalone replication as well as cluster.
- broker.getBrokerObservers().add(queueSnapshots);
+ broker.getBrokerObservers().add(boost::shared_ptr<BrokerObserver>(new BrokerObserver()));
}
namespace {
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index d10014846c..9fadd4f35c 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -54,8 +54,6 @@ class Backup;
class ConnectionObserver;
class Primary;
class Role;
-class QueueSnapshot;
-class QueueSnapshots;
class QueueReplicator;
/**
@@ -98,14 +96,14 @@ class HaBroker : public management::Manageable
void setAddress(const Address&); // set self address from a self-connection
- boost::shared_ptr<QueueSnapshots> getQueueSnapshots() { return queueSnapshots; }
-
boost::shared_ptr<QueueReplicator> findQueueReplicator(const std::string& queueName);
/** Authenticated user ID for queue create/delete */
std::string getUserId() const { return userId; }
private:
+ class BrokerObserver;
+
void setPublicUrl(const Url&);
void setBrokerUrl(const Url&);
void updateClientUrl(sys::Mutex::ScopedLock&);
@@ -129,7 +127,6 @@ class HaBroker : public management::Manageable
boost::shared_ptr<Role> role;
Membership membership;
boost::shared_ptr<FailoverExchange> failoverExchange;
- boost::shared_ptr<QueueSnapshots> queueSnapshots;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/IdSetter.h b/qpid/cpp/src/qpid/ha/IdSetter.h
index dc9fa90af9..67da62ef48 100644
--- a/qpid/cpp/src/qpid/ha/IdSetter.h
+++ b/qpid/cpp/src/qpid/ha/IdSetter.h
@@ -43,15 +43,11 @@ namespace ha {
class IdSetter : public broker::MessageInterceptor
{
public:
- IdSetter(const std::string& q, ReplicationId firstId) : nextId(firstId), name(q) {
- QPID_LOG(trace, "Initial replication ID for " << name << " =" << nextId.get());
- }
-
+ IdSetter(ReplicationId firstId=1) : nextId(firstId) {}
void record(broker::Message& m) { m.setReplicationId(nextId++); }
private:
sys::AtomicValue<uint32_t> nextId;
- std::string name;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 0c0fe983bb..b437190004 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -104,8 +104,6 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
QueueReplicator::copy(hb.getBroker().getExchanges(), qrs);
std::for_each(qrs.begin(), qrs.end(), boost::bind(&QueueReplicator::promoted, _1));
- broker::QueueRegistry& queues = hb.getBroker().getQueues();
- queues.eachQueue(boost::bind(&Primary::initializeQueue, this, _1));
if (expect.empty()) {
QPID_LOG(notice, logPrefix << "Promoted to primary. No expected backups.");
}
@@ -140,15 +138,6 @@ Primary::~Primary() {
haBroker.getObserver()->reset();
}
-void Primary::initializeQueue(boost::shared_ptr<broker::Queue> q) {
- if (replicationTest.useLevel(*q) == ALL) {
- boost::shared_ptr<QueueReplicator> qr = haBroker.findQueueReplicator(q->getName());
- ReplicationId firstId = qr ? qr->getMaxId()+1 : ReplicationId(1);
- q->getMessageInterceptors().add(
- boost::shared_ptr<IdSetter>(new IdSetter(q->getName(), firstId)));
- }
-}
-
void Primary::checkReady() {
bool activate = false;
{
@@ -261,7 +250,6 @@ void Primary::queueCreate(const QueuePtr& q) {
if (level) {
QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
<< " replication: " << printable(level));
- initializeQueue(q);
// Give each queue a unique id. Used by backups to avoid confusion of
// same-named queues.
q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index 7f98f06fec..e0a7065e2c 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -125,7 +125,6 @@ class Primary : public Role
RemoteBackupPtr backupConnect(const BrokerInfo&, broker::Connection&, sys::Mutex::ScopedLock&);
void backupDisconnect(RemoteBackupPtr, sys::Mutex::ScopedLock&);
- void initializeQueue(boost::shared_ptr<broker::Queue>);
void checkReady();
void checkReady(RemoteBackupPtr);
void setCatchupQueues(const RemoteBackupPtr&, bool createGuards);
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index b43658365c..eda3f96180 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -21,8 +21,9 @@
#include "Event.h"
#include "HaBroker.h"
+#include "IdSetter.h"
#include "QueueReplicator.h"
-#include "QueueSnapshots.h"
+#include "QueueSnapshot.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
#include "types.h"
@@ -122,6 +123,11 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
settings(hb.getSettings()),
nextId(0), maxId(0)
{
+ // The QueueReplicator will take over setting replication IDs.
+ boost::shared_ptr<IdSetter> setter =
+ q->getMessageInterceptors().findType<IdSetter>();
+ if (setter) q->getMessageInterceptors().remove(setter);
+
args.setString(QPID_REPLICATE, printable(NONE).str());
Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
@@ -212,8 +218,9 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
arguments.setString(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, getType());
arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize?
arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable());
- arguments.setString(ReplicatingSubscription::QPID_ID_SET,
- encodeStr(haBroker.getQueueSnapshots()->get(queue)->snapshot()));
+ boost::shared_ptr<QueueSnapshot> qs = queue->getObservers().findType<QueueSnapshot>();
+ if (qs) arguments.setString(ReplicatingSubscription::QPID_ID_SET, encodeStr(qs->getSnapshot()));
+
try {
peer.getMessage().subscribe(
args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
@@ -254,6 +261,7 @@ void QueueReplicator::dequeueEvent(const string& data, Mutex::ScopedLock&) {
}
// Called in connection thread of the queues bridge to primary.
+
void QueueReplicator::route(Deliverable& deliverable)
{
try {
@@ -293,11 +301,6 @@ void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) {
nextId = decodeStr<IdEvent>(data).id;
}
-ReplicationId QueueReplicator::getMaxId() {
- Mutex::ScopedLock l(lock);
- return maxId;
-}
-
void QueueReplicator::incomingExecutionException(ErrorCode e, const std::string& msg) {
if (e == ERROR_CODE_NOT_FOUND || e == ERROR_CODE_RESOURCE_DELETED) {
// If the queue is destroyed at the same time we are subscribing, we may
@@ -320,14 +323,19 @@ bool QueueReplicator::hasBindings() { return false; }
std::string QueueReplicator::getType() const { return ReplicatingSubscription::QPID_QUEUE_REPLICATOR; }
void QueueReplicator::promoted() {
- // Promoted to primary, deal with auto-delete now.
- if (queue && queue->isAutoDelete() && subscribed) {
- // Make a temporary shared_ptr to prevent premature deletion of queue.
- // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue
- // which could delete the queue while it's still running it's destroyed logic.
- boost::shared_ptr<Queue> q(queue);
- q->releaseFromUse();
- q->scheduleAutoDelete();
+ if (queue) {
+ // On primary QueueReplicator no longer sets IDs, start an IdSetter.
+ queue->getMessageInterceptors().add(
+ boost::shared_ptr<IdSetter>(new IdSetter(maxId+1)));
+ // Process auto-deletes
+ if (queue->isAutoDelete() && subscribed) {
+ // Make a temporary shared_ptr to prevent premature deletion of queue.
+ // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue
+ // which could delete the queue while it's still running it's destroyed logic.
+ boost::shared_ptr<Queue> q(queue);
+ q->releaseFromUse();
+ q->scheduleAutoDelete();
+ }
}
}
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 8938285fe3..a86355f194 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -85,8 +85,6 @@ class QueueReplicator : public broker::Exchange,
boost::shared_ptr<broker::Queue> getQueue() const { return queue; }
- ReplicationId getMaxId();
-
// No-op unused Exchange virtual functions.
bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
diff --git a/qpid/cpp/src/qpid/ha/QueueSnapshot.h b/qpid/cpp/src/qpid/ha/QueueSnapshot.h
index 5b1054d934..577bd96ef7 100644
--- a/qpid/cpp/src/qpid/ha/QueueSnapshot.h
+++ b/qpid/cpp/src/qpid/ha/QueueSnapshot.h
@@ -53,7 +53,7 @@ class QueueSnapshot : public broker::QueueObserver
void requeued(const broker::Message&) {}
- ReplicationIdSet snapshot() {
+ ReplicationIdSet getSnapshot() {
sys::Mutex::ScopedLock l(lock);
return set;
}
diff --git a/qpid/cpp/src/qpid/ha/QueueSnapshots.h b/qpid/cpp/src/qpid/ha/QueueSnapshots.h
deleted file mode 100644
index 6612c71f6a..0000000000
--- a/qpid/cpp/src/qpid/ha/QueueSnapshots.h
+++ /dev/null
@@ -1,70 +0,0 @@
-#ifndef QPID_HA_QUEUESNAPSHOTS_H
-#define QPID_HA_QUEUESNAPSHOTS_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-#include "QueueSnapshot.h"
-#include "hash.h"
-
-#include "qpid/assert.h"
-#include "qpid/broker/BrokerObserver.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/sys/Mutex.h"
-
-#include <boost/shared_ptr.hpp>
-
-namespace qpid {
-namespace ha {
-
-/**
- * BrokerObserver that maintains a map of the QueueSnapshot for each queue.
- * THREAD SAFE.
- */
-class QueueSnapshots : public broker::BrokerObserver
-{
- public:
- boost::shared_ptr<QueueSnapshot> get(const boost::shared_ptr<broker::Queue>& q) const {
- boost::shared_ptr<QueueSnapshot> qs;
- q->getObservers().each(
- boost::bind(QueueSnapshots::saveQueueSnapshot, _1, boost::ref(qs)));
- return qs;
- }
-
- // BrokerObserver overrides.
- void queueCreate(const boost::shared_ptr<broker::Queue>& q) {
- q->getObservers().add(boost::shared_ptr<QueueSnapshot>(new QueueSnapshot));
- }
-
- private:
- static void saveQueueSnapshot(
- const boost::shared_ptr<broker::QueueObserver>& observer,
- boost::shared_ptr<QueueSnapshot>& out)
- {
- if (!out) out = boost::dynamic_pointer_cast<QueueSnapshot>(observer);
- }
-};
-
-
-}} // namespace qpid::ha
-
-#endif /*!QPID_HA_QUEUESNAPSHOTS_H*/
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 2db7845067..635d5047bd 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -22,7 +22,7 @@
#include "Event.h"
#include "IdSetter.h"
#include "QueueGuard.h"
-#include "QueueSnapshots.h"
+#include "QueueSnapshot.h"
#include "ReplicatingSubscription.h"
#include "TxReplicatingSubscription.h"
#include "Primary.h"
@@ -129,17 +129,6 @@ void ReplicatingSubscription::initialize() {
info.printId(os) << ": ";
logPrefix = os.str();
- // If this is a non-cluster standalone replication then we need to
- // set up an IdSetter if there is not already one.
- boost::shared_ptr<IdSetter> idSetter;
- queue->getMessageInterceptors().each(
- boost::bind(&copyIf, _1, boost::ref(idSetter)));
- if (!idSetter) {
- QPID_LOG(debug, logPrefix << "Standalone replication");
- queue->getMessageInterceptors().add(
- boost::shared_ptr<IdSetter>(new IdSetter(queue->getName(), 1)));
- }
-
// If there's already a guard (we are in failover) use it, else create one.
if (primary) guard = primary->getGuard(queue, info);
if (!guard) guard.reset(new QueueGuard(*queue, info));
@@ -152,14 +141,14 @@ void ReplicatingSubscription::initialize() {
// between the snapshot and attaching the observer.
queue->getObservers().add(
boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
- boost::shared_ptr<QueueSnapshot> snapshot = haBroker.getQueueSnapshots()->get(queue);
+ boost::shared_ptr<QueueSnapshot> snapshot = queue->getObservers().findType<QueueSnapshot>();
// There may be no snapshot if the queue is being deleted concurrently.
if (!snapshot) {
queue->getObservers().remove(
boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
throw ResourceDeletedException(logPrefix+"Can't subscribe, queue deleted");
}
- ReplicationIdSet primaryIds = snapshot->snapshot();
+ ReplicationIdSet primaryIds = snapshot->getSnapshot();
std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET);
ReplicationIdSet backupIds;
if (!backupStr.empty()) backupIds = decodeStr<ReplicationIdSet>(backupStr);
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 1a5d6ddff8..7db24810bf 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -272,33 +272,34 @@ class ReplicationTests(HaBrokerTest):
def test_standalone_queue_replica(self):
"""Test replication of individual queues outside of cluster mode"""
- l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
- try:
- primary = HaBroker(self, name="primary", ha_cluster=False,
- args=["--ha-queue-replication=yes"]);
- pc = primary.connect()
- ps = pc.session().sender("q;{create:always}")
- pr = pc.session().receiver("q;{create:always}")
- backup = HaBroker(self, name="backup", ha_cluster=False,
- args=["--ha-queue-replication=yes"])
- br = backup.connect().session().receiver("q;{create:always}")
+ primary = HaBroker(self, name="primary", ha_cluster=False,
+ args=["--ha-queue-replication=yes"]);
+ pc = primary.connect()
+ ps = pc.session().sender("q;{create:always}")
+ pr = pc.session().receiver("q;{create:always}")
+ backup = HaBroker(self, name="backup", ha_cluster=False,
+ args=["--ha-queue-replication=yes"])
+ bs = backup.connect().session()
+ br = bs.receiver("q;{create:always}")
+
+ def srange(*args): return [str(i) for i in xrange(*args)]
+
+ for m in srange(3): ps.send(m)
+ # Set up replication with qpid-ha
+ backup.replicate(primary.host_port(), "q")
+ backup.assert_browse_backup("q", srange(3))
+ for m in srange(3,6): ps.send(str(m))
+ backup.assert_browse_backup("q", srange(6))
+ self.assertEqual("0", pr.fetch().content)
+ pr.session.acknowledge()
+ backup.assert_browse_backup("q", srange(1,6))
+
+ # Set up replication with qpid-config
+ ps2 = pc.session().sender("q2;{create:always}")
+ backup.config_replicate(primary.host_port(), "q2");
+ ps2.send("x", timeout=1)
+ backup.assert_browse_backup("q2", ["x"])
- # Set up replication with qpid-ha
- backup.replicate(primary.host_port(), "q")
- ps.send("a", timeout=1)
- backup.assert_browse_backup("q", ["a"])
- ps.send("b", timeout=1)
- backup.assert_browse_backup("q", ["a", "b"])
- self.assertEqual("a", pr.fetch().content)
- pr.session.acknowledge()
- backup.assert_browse_backup("q", ["b"])
-
- # Set up replication with qpid-config
- ps2 = pc.session().sender("q2;{create:always}")
- backup.config_replicate(primary.host_port(), "q2");
- ps2.send("x", timeout=1)
- backup.assert_browse_backup("q2", ["x"])
- finally: l.restore()
def test_standalone_queue_replica_failover(self):
"""Test individual queue replication from a cluster to a standalone