summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-03-09 15:58:17 +0000
committerGordon Sim <gsim@apache.org>2009-03-09 15:58:17 +0000
commit1214783620f81f2b0b1e69c4c4df874d58cdf85b (patch)
tree354f0767a8b113278cb3d5f8208ee8769fe56bac /cpp
parent8f0e57d62c16d4723e6202127490ec12473e24d0 (diff)
downloadqpid-python-1214783620f81f2b0b1e69c4c4df874d58cdf85b.tar.gz
QPID-1721: Fixes for replication between clusters when new members are added
* suppress event generation during node catch up * ensure sequence counters used for duplicate detection are synchronised in both primary and dr clusters when new members join * connect queue with the event manager within queue registry rather than adapter as the latter path is not used for catchup git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@751719 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp1
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp4
-rw-r--r--cpp/src/qpid/broker/Exchange.h4
-rw-r--r--cpp/src/qpid/broker/Message.cpp5
-rw-r--r--cpp/src/qpid/broker/Message.h1
-rw-r--r--cpp/src/qpid/broker/Queue.cpp15
-rw-r--r--cpp/src/qpid/broker/Queue.h4
-rw-r--r--cpp/src/qpid/broker/QueueEvents.cpp19
-rw-r--r--cpp/src/qpid/broker/QueueEvents.h3
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp10
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h5
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp4
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp4
-rw-r--r--cpp/src/qpid/replication/ReplicatingEventListener.cpp3
-rw-r--r--cpp/src/qpid/replication/ReplicatingEventListener.h1
-rw-r--r--cpp/src/qpid/replication/ReplicationExchange.cpp11
-rw-r--r--cpp/src/qpid/replication/ReplicationExchange.h2
-rw-r--r--cpp/src/tests/cluster.mk5
-rwxr-xr-xcpp/src/tests/clustered_replication_test127
19 files changed, 204 insertions, 24 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 95f55bb596..b999a62965 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -199,6 +199,7 @@ Broker::Broker(const Broker::Options& conf) :
}
QueuePolicy::setDefaultMaxSize(conf.queueLimit);
+ queues.setQueueEvents(&queueEvents);
// Early-Initialize plugins
const Plugin::Plugins& plugins=Plugin::getPlugins();
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 174ed165c9..dd1fe98b2c 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -102,8 +102,8 @@ static const std::string QPID_MANAGEMENT("qpid.management");
Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
Manageable* parent)
- : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0),
- sequence(false), sequenceNo(0), ive(false), mgmtExchange(0)
+ : name(_name), durable(_durable), alternateUsers(0), persistenceId(0),
+ args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0)
{
if (parent != 0)
{
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index dce9007643..3f9cc4c800 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -58,12 +58,12 @@ public:
private:
const std::string name;
const bool durable;
- mutable qpid::framing::FieldTable args;
boost::shared_ptr<Exchange> alternate;
uint32_t alternateUsers;
mutable uint64_t persistenceId;
protected:
+ mutable qpid::framing::FieldTable args;
bool sequence;
mutable qpid::sys::Mutex sequenceLock;
int64_t sequenceNo;
@@ -146,7 +146,7 @@ public:
void setPersistenceId(uint64_t id) const;
uint64_t getPersistenceId() const { return persistenceId; }
uint32_t encodedSize() const;
- void encode(framing::Buffer& buffer) const;
+ virtual void encode(framing::Buffer& buffer) const;
static Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer);
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 133e2b5ad1..30294b4507 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -382,4 +382,9 @@ void Message::resetEnqueueCompleteCallback() { enqueueCallback = 0; }
void Message::setDequeueCompleteCallback(MessageCallback& cb) { dequeueCallback = &cb; }
void Message::resetDequeueCompleteCallback() { dequeueCallback = 0; }
+framing::FieldTable& Message::getOrInsertHeaders()
+{
+ return getProperties<MessageProperties>()->getApplicationHeaders();
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 96fcf61dfc..dbe56270ab 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -72,6 +72,7 @@ public:
std::string getExchangeName() const;
bool isImmediate() const;
const framing::FieldTable* getApplicationHeaders() const;
+ framing::FieldTable& getOrInsertHeaders();
bool isPersistent();
bool requiresAccept();
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index bc29815e84..a1a83926bf 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -93,7 +93,8 @@ Queue::Queue(const string& _name, bool _autodelete,
policyExceeded(false),
mgmtObject(0),
eventMode(0),
- eventMgr(0)
+ eventMgr(0),
+ insertSeqNo(0)
{
if (parent != 0)
{
@@ -551,6 +552,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
if (policy.get()) policy->tryEnqueue(qm);
+ if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
LVQ::iterator i;
const framing::FieldTable* ft = msg->getApplicationHeaders();
@@ -578,8 +580,9 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
messages.push_back(qm);
listeners.populate(copy);
}
- if (eventMode && eventMgr) {
- eventMgr->enqueued(qm);
+ if (eventMode) {
+ if (eventMgr) eventMgr->enqueued(qm);
+ else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
}
}
copy.notify();
@@ -989,3 +992,9 @@ void Queue::recoveryComplete()
for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
pendingDequeues.clear();
}
+
+void Queue::insertSequenceNumbers(const std::string& key)
+{
+ seqNoKey = key;
+ insertSeqNo = !seqNoKey.empty();
+}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index dfba0533e6..f149cb71ea 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -101,6 +101,8 @@ namespace qpid {
RateTracker dequeueTracker;
int eventMode;
QueueEvents* eventMgr;
+ bool insertSeqNo;
+ std::string seqNoKey;
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -291,7 +293,7 @@ namespace qpid {
void setPosition(framing::SequenceNumber pos);
int getEventMode();
void setQueueEventManager(QueueEvents&);
-
+ void insertSequenceNumbers(const std::string& key);
/**
* Notify queue that recovery has completed.
*/
diff --git a/cpp/src/qpid/broker/QueueEvents.cpp b/cpp/src/qpid/broker/QueueEvents.cpp
index a6517e1bfe..7525e4cb76 100644
--- a/cpp/src/qpid/broker/QueueEvents.cpp
+++ b/cpp/src/qpid/broker/QueueEvents.cpp
@@ -20,12 +20,13 @@
*/
#include "QueueEvents.h"
#include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller) :
- eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller)
+ eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true)
{
eventQueue.start();
}
@@ -37,12 +38,12 @@ QueueEvents::~QueueEvents()
void QueueEvents::enqueued(const QueuedMessage& m)
{
- eventQueue.push(Event(ENQUEUE, m));
+ if (enabled) eventQueue.push(Event(ENQUEUE, m));
}
void QueueEvents::dequeued(const QueuedMessage& m)
{
- eventQueue.push(Event(DEQUEUE, m));
+ if (enabled) eventQueue.push(Event(DEQUEUE, m));
}
void QueueEvents::registerListener(const std::string& id, const EventListener& listener)
@@ -81,6 +82,18 @@ void QueueEvents::shutdown()
if (!eventQueue.empty() && !listeners.empty()) eventQueue.shutdown();
}
+void QueueEvents::enable()
+{
+ enabled = true;
+ QPID_LOG(debug, "Queue events enabled");
+}
+
+void QueueEvents::disable()
+{
+ enabled = false;
+ QPID_LOG(debug, "Queue events disabled");
+}
+
QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {}
diff --git a/cpp/src/qpid/broker/QueueEvents.h b/cpp/src/qpid/broker/QueueEvents.h
index 2ba69e33e6..b0a07b03f2 100644
--- a/cpp/src/qpid/broker/QueueEvents.h
+++ b/cpp/src/qpid/broker/QueueEvents.h
@@ -59,6 +59,8 @@ class QueueEvents
void dequeued(const QueuedMessage&);
void registerListener(const std::string& id, const EventListener&);
void unregisterListener(const std::string& id);
+ void enable();
+ void disable();
//process all outstanding events
void shutdown();
private:
@@ -67,6 +69,7 @@ class QueueEvents
EventQueue eventQueue;
Listeners listeners;
+ volatile bool enabled;
qpid::sys::Mutex lock;//protect listeners from concurrent access
void handle(EventQueue::Queue& e);
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
index 2cb801bf83..d079e543c4 100644
--- a/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -19,6 +19,7 @@
*
*/
#include "QueueRegistry.h"
+#include "QueueEvents.h"
#include "qpid/log/Statement.h"
#include <sstream>
#include <assert.h>
@@ -27,7 +28,7 @@ using namespace qpid::broker;
using namespace qpid::sys;
QueueRegistry::QueueRegistry() :
- counter(1), store(0), parent(0), lastNode(false) {}
+ counter(1), store(0), events(0), parent(0), lastNode(false) {}
QueueRegistry::~QueueRegistry(){}
@@ -43,7 +44,8 @@ QueueRegistry::declare(const string& declareName, bool durable,
if (i == queues.end()) {
Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent));
queues[name] = queue;
- if (lastNode) queue->setLastNodeFailure();
+ if (lastNode) queue->setLastNodeFailure();
+ if (events) queue->setQueueEventManager(*events);
return std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
@@ -105,3 +107,7 @@ void QueueRegistry::updateQueueClusterState(bool _lastNode)
lastNode = _lastNode;
}
+void QueueRegistry::setQueueEvents(QueueEvents* e)
+{
+ events = e;
+}
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
index c53ba668cc..32634dd563 100644
--- a/cpp/src/qpid/broker/QueueRegistry.h
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -31,6 +31,8 @@
namespace qpid {
namespace broker {
+class QueueEvents;
+
/**
* A registry of queues indexed by queue name.
*
@@ -86,6 +88,8 @@ class QueueRegistry{
*/
string generateName();
+ void setQueueEvents(QueueEvents*);
+
/**
* Set the store to use. May only be called once.
*/
@@ -120,6 +124,7 @@ private:
mutable qpid::sys::RWlock lock;
int counter;
MessageStore* store;
+ QueueEvents* events;
management::Manageable* parent;
bool lastNode; //used to set mode on queue declare
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index ae160fabc7..96c47085f0 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -362,10 +362,6 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
getBroker().getExchanges().getDefault()->bind(queue, name, 0);
queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments);
- //if event generation is turned on, pass in a pointer to
- //the QueueEvents instance to use
- if (queue->getEventMode()) queue->setQueueEventManager(getBroker().getQueueEvents());
-
//handle automatic cleanup:
if (exclusive) {
exclusiveQueues.push_back(queue);
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 169d0fb1af..126247e458 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -358,6 +358,7 @@ void Cluster::setReady(Lock&) {
state = READY;
if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
mcast.release();
+ broker.getQueueEvents().enable();
}
void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) {
@@ -385,8 +386,9 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
elders = map.getAlive();
elders.erase(self);
broker.getLinks().setPassive(true);
+ broker.getQueueEvents().disable();
}
- }
+ }
else if (state >= CATCHUP && memberChange) {
memberUpdate(l);
elders = ClusterMap::intersection(elders, map.getAlive());
diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/cpp/src/qpid/replication/ReplicatingEventListener.cpp
index 8aa47999cc..e3990a13cc 100644
--- a/cpp/src/qpid/replication/ReplicatingEventListener.cpp
+++ b/cpp/src/qpid/replication/ReplicatingEventListener.cpp
@@ -57,7 +57,6 @@ void ReplicatingEventListener::deliverDequeueMessage(const QueuedMessage& dequeu
{
FieldTable headers;
headers.setString(REPLICATION_TARGET_QUEUE, dequeued.queue->getName());
- headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence);
headers.setInt(REPLICATION_EVENT_TYPE, DEQUEUE);
headers.setInt(DEQUEUED_MESSAGE_POSITION, dequeued.position);
boost::intrusive_ptr<Message> msg(createMessage(headers));
@@ -69,7 +68,6 @@ void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueu
boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload));
FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders();
headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
- headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence);
headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE);
queue->deliver(msg);
}
@@ -138,6 +136,7 @@ void ReplicatingEventListener::initialize(Plugin::Target& target)
queue = broker->getQueues().find(options.queue);
}
if (queue) {
+ queue->insertSequenceNumbers(REPLICATION_EVENT_SEQNO);
QueueEvents::EventListener callback = boost::bind(&ReplicatingEventListener::handle, this, _1);
broker->getQueueEvents().registerListener(options.name, callback);
QPID_LOG(info, "Registered replicating queue event listener");
diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.h b/cpp/src/qpid/replication/ReplicatingEventListener.h
index d302755704..3d8f23e7ac 100644
--- a/cpp/src/qpid/replication/ReplicatingEventListener.h
+++ b/cpp/src/qpid/replication/ReplicatingEventListener.h
@@ -58,7 +58,6 @@ class ReplicatingEventListener : public Plugin
PluginOptions options;
qpid::broker::Queue::shared_ptr queue;
- qpid::framing::SequenceNumber sequence;
void deliverDequeueMessage(const qpid::broker::QueuedMessage& enqueued);
void deliverEnqueueMessage(const qpid::broker::QueuedMessage& enqueued);
diff --git a/cpp/src/qpid/replication/ReplicationExchange.cpp b/cpp/src/qpid/replication/ReplicationExchange.cpp
index c35c6c2cd5..88c94ad7ba 100644
--- a/cpp/src/qpid/replication/ReplicationExchange.cpp
+++ b/cpp/src/qpid/replication/ReplicationExchange.cpp
@@ -34,11 +34,13 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::replication::constants;
+const std::string SEQUENCE_VALUE("qpid.replication-event.sequence");
ReplicationExchange::ReplicationExchange(const std::string& name, bool durable,
const FieldTable& args,
QueueRegistry& qr,
Manageable* parent)
- : Exchange(name, durable, args, parent), queues(qr), init(false) {}
+ : Exchange(name, durable, args, parent), queues(qr), sequence(args.getAsInt64(SEQUENCE_VALUE)), init(false)
+ {}
std::string ReplicationExchange::getType() const { return typeName; }
@@ -135,6 +137,13 @@ bool ReplicationExchange::isBound(Queue::shared_ptr /*queue*/, const string* con
const std::string ReplicationExchange::typeName("replication");
+void ReplicationExchange::encode(Buffer& buffer) const
+{
+ args.setInt64(std::string(SEQUENCE_VALUE), sequence);
+ Exchange::encode(buffer);
+}
+
+
struct ReplicationExchangePlugin : Plugin
{
Broker* broker;
diff --git a/cpp/src/qpid/replication/ReplicationExchange.h b/cpp/src/qpid/replication/ReplicationExchange.h
index 897e4a954e..4cc45ed5f5 100644
--- a/cpp/src/qpid/replication/ReplicationExchange.h
+++ b/cpp/src/qpid/replication/ReplicationExchange.h
@@ -22,6 +22,7 @@
*
*/
#include "qpid/broker/Exchange.h"
+#include "qpid/framing/Buffer.h"
#include "qpid/framing/SequenceNumber.h"
namespace qpid {
@@ -58,6 +59,7 @@ class ReplicationExchange : public qpid::broker::Exchange
bool isDuplicate(const qpid::framing::FieldTable* args);
void handleEnqueueEvent(const qpid::framing::FieldTable* args, qpid::broker::Deliverable& msg);
void handleDequeueEvent(const qpid::framing::FieldTable* args);
+ void encode(framing::Buffer& buffer) const;
};
}} // namespace qpid::replication
diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk
index c3100ac968..5d115de5a5 100644
--- a/cpp/src/tests/cluster.mk
+++ b/cpp/src/tests/cluster.mk
@@ -29,8 +29,9 @@ if HAVE_LIBCPG
# ais_check checks pre-requisites for cluster tests and runs them if ok.
-TESTS+=ais_check federated_cluster_test
-EXTRA_DIST+=ais_check start_cluster stop_cluster restart_cluster cluster_python_tests cluster_python_tests_failing.txt federated_cluster_test
+TESTS+=ais_check federated_cluster_test clustered_replication_test
+EXTRA_DIST+=ais_check start_cluster stop_cluster restart_cluster cluster_python_tests cluster_python_tests_failing.txt \
+ federated_cluster_test clustered_replication_test
check_PROGRAMS+=cluster_test
cluster_test_SOURCES=unit_test.cpp cluster_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp
diff --git a/cpp/src/tests/clustered_replication_test b/cpp/src/tests/clustered_replication_test
new file mode 100755
index 0000000000..2a3e742632
--- /dev/null
+++ b/cpp/src/tests/clustered_replication_test
@@ -0,0 +1,127 @@
+#!/bin/sh
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Test reliability of the replication feature in the face of link
+# failures:
+srcdir=`dirname $0`
+PYTHON_DIR=$srcdir/../../../python
+
+trap stop_brokers INT EXIT
+
+fail() {
+ echo $1
+ exit 1
+}
+with_ais_group() {
+ id -nG | grep '\<ais\>' >/dev/null || { echo "You are not a member of the ais group." 1>&2; exit 1; }
+ echo $* | newgrp ais
+}
+
+stop_brokers() {
+ if [[ $PRIMARY1 ]] ; then
+ ../qpidd -q --port $PRIMARY1
+ unset PRIMARY1
+ fi
+ if [[ $PRIMARY2 ]] ; then
+ ../qpidd -q --port $PRIMARY2
+ unset PRIMARY2
+ fi
+ if [[ $DR1 ]] ; then
+ ../qpidd -q --port $DR1
+ unset DR1
+ fi
+ if [[ $DR2 ]] ; then
+ ../qpidd -q --port $DR2
+ unset DR2
+ fi
+}
+
+if test -d ${PYTHON_DIR}; then
+ id -nG | grep '\<ais\>' >/dev/null || \
+ NOGROUP="You are not a member of the ais group."
+ ps -u root | grep 'aisexec\|corosync' >/dev/null || \
+ NOAISEXEC="The aisexec or corosync daemon is not running as root"
+
+ if test -n "$NOGROUP" -o -n "$NOAISEXEC"; then
+ cat <<EOF
+Not running federation to cluster test because:
+ $NOGROUP
+ $NOAISEXEC
+EOF
+ exit 0;
+ fi
+
+ #todo: these cluster names need to be unique to prevent clashes
+ PRIMARY_CLUSTER=PRIMARY_$(hostname)_$(pwd)
+ DR_CLUSTER=DR_$(hostname)_$(pwd)
+
+ GENERAL_OPTS="--auth no --no-module-dir --no-data-dir --daemon --port 0 --log-enable notice+ --log-to-stderr false"
+ PRIMARY_OPTS="--load-module ../.libs/replicating_listener.so --create-replication-queue true --replication-queue REPLICATION_QUEUE --load-module ../.libs/cluster.so --cluster-name $PRIMARY_CLUSTER"
+ DR_OPTS="--load-module ../.libs/replication_exchange.so --load-module ../.libs/cluster.so --cluster-name $DR_CLUSTER"
+
+ rm -f repl*.tmp #cleanup any files left from previous run
+
+ #start first node of primary cluster and set up test queue
+ echo Starting primary cluster
+ PRIMARY1=$(with_ais_group ../qpidd $GENERAL_OPTS $PRIMARY_OPTS --log-to-file repl.primary.1.tmp) || fail "Could not start node"
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$PRIMARY1" add queue test-queue --generate-queue-events 2
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$PRIMARY1" add queue control-queue --generate-queue-events 1
+
+ #send 10 messages, consume 5 of them
+ for i in `seq 1 10`; do echo Message$i; done | ./sender --port $PRIMARY1
+ ./receiver --port $PRIMARY1 --messages 5 > /dev/null
+
+ #add new node to primary cluster, testing correct transfer of state:
+ echo Adding node to primary cluster
+ PRIMARY2=$(with_ais_group ../qpidd $GENERAL_OPTS $PRIMARY_OPTS --log-to-file repl.primary.2.tmp)
+
+ #start DR cluster, set up test queue there and establish replication bridge
+ echo Starting DR cluster
+ DR1=$(with_ais_group ../qpidd $GENERAL_OPTS $DR_OPTS --log-to-file repl.dr.1.tmp)
+ DR2=$(with_ais_group ../qpidd $GENERAL_OPTS $DR_OPTS --log-to-file repl.dr.2.tmp)
+
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$DR1" add queue test-queue
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$DR1" add queue control-queue
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$DR1" add exchange replication REPLICATION_EXCHANGE
+ $PYTHON_DIR/commands/qpid-route queue add localhost:$DR2 localhost:$PRIMARY2 REPLICATION_EXCHANGE REPLICATION_QUEUE
+
+ #send more messages to primary
+ for i in `seq 11 20`; do echo Message$i; done | ./sender --port $PRIMARY1 --send-eos 1
+
+ #wait for replication events to all be processed:
+ echo Waiting for replication to complete
+ echo Done | ./sender --port $PRIMARY1 --routing-key control-queue --send-eos 1
+ ./receiver --queue control-queue --port $DR1 > /dev/null
+
+ #verify contents of test queue on dr cluster:
+ echo Verifying...
+ ./receiver --port $DR2 > repl.out.tmp
+ for i in `seq 6 20`; do echo Message$i; done | diff repl.out.tmp - || FAIL=1
+
+ if [[ $FAIL ]]; then
+ echo Clustered replication test failed: expectations not met!
+ exit 1
+ else
+ echo Clustered replication test passed
+ rm -f repl*.tmp
+ fi
+
+fi