diff options
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueEvents.cpp | 19 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueEvents.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueRegistry.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueRegistry.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/replication/ReplicatingEventListener.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/replication/ReplicatingEventListener.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/replication/ReplicationExchange.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/replication/ReplicationExchange.h | 2 | ||||
-rw-r--r-- | cpp/src/tests/cluster.mk | 5 | ||||
-rwxr-xr-x | cpp/src/tests/clustered_replication_test | 127 |
19 files changed, 204 insertions, 24 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 95f55bb596..b999a62965 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -199,6 +199,7 @@ Broker::Broker(const Broker::Options& conf) : } QueuePolicy::setDefaultMaxSize(conf.queueLimit); + queues.setQueueEvents(&queueEvents); // Early-Initialize plugins const Plugin::Plugins& plugins=Plugin::getPlugins(); diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 174ed165c9..dd1fe98b2c 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -102,8 +102,8 @@ static const std::string QPID_MANAGEMENT("qpid.management"); Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* parent) - : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), - sequence(false), sequenceNo(0), ive(false), mgmtExchange(0) + : name(_name), durable(_durable), alternateUsers(0), persistenceId(0), + args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0) { if (parent != 0) { diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index dce9007643..3f9cc4c800 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -58,12 +58,12 @@ public: private: const std::string name; const bool durable; - mutable qpid::framing::FieldTable args; boost::shared_ptr<Exchange> alternate; uint32_t alternateUsers; mutable uint64_t persistenceId; protected: + mutable qpid::framing::FieldTable args; bool sequence; mutable qpid::sys::Mutex sequenceLock; int64_t sequenceNo; @@ -146,7 +146,7 @@ public: void setPersistenceId(uint64_t id) const; uint64_t getPersistenceId() const { return persistenceId; } uint32_t encodedSize() const; - void encode(framing::Buffer& buffer) const; + virtual void encode(framing::Buffer& buffer) const; static Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer); diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 133e2b5ad1..30294b4507 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -382,4 +382,9 @@ void Message::resetEnqueueCompleteCallback() { enqueueCallback = 0; } void Message::setDequeueCompleteCallback(MessageCallback& cb) { dequeueCallback = &cb; } void Message::resetDequeueCompleteCallback() { dequeueCallback = 0; } +framing::FieldTable& Message::getOrInsertHeaders() +{ + return getProperties<MessageProperties>()->getApplicationHeaders(); +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 96fcf61dfc..dbe56270ab 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -72,6 +72,7 @@ public: std::string getExchangeName() const; bool isImmediate() const; const framing::FieldTable* getApplicationHeaders() const; + framing::FieldTable& getOrInsertHeaders(); bool isPersistent(); bool requiresAccept(); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index bc29815e84..a1a83926bf 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -93,7 +93,8 @@ Queue::Queue(const string& _name, bool _autodelete, policyExceeded(false), mgmtObject(0), eventMode(0), - eventMgr(0) + eventMgr(0), + insertSeqNo(0) { if (parent != 0) { @@ -551,6 +552,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); if (policy.get()) policy->tryEnqueue(qm); + if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); LVQ::iterator i; const framing::FieldTable* ft = msg->getApplicationHeaders(); @@ -578,8 +580,9 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ messages.push_back(qm); listeners.populate(copy); } - if (eventMode && eventMgr) { - eventMgr->enqueued(qm); + if (eventMode) { + if (eventMgr) eventMgr->enqueued(qm); + else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName()); } } copy.notify(); @@ -989,3 +992,9 @@ void Queue::recoveryComplete() for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); pendingDequeues.clear(); } + +void Queue::insertSequenceNumbers(const std::string& key) +{ + seqNoKey = key; + insertSeqNo = !seqNoKey.empty(); +} diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index dfba0533e6..f149cb71ea 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -101,6 +101,8 @@ namespace qpid { RateTracker dequeueTracker; int eventMode; QueueEvents* eventMgr; + bool insertSeqNo; + std::string seqNoKey; void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); @@ -291,7 +293,7 @@ namespace qpid { void setPosition(framing::SequenceNumber pos); int getEventMode(); void setQueueEventManager(QueueEvents&); - + void insertSequenceNumbers(const std::string& key); /** * Notify queue that recovery has completed. */ diff --git a/cpp/src/qpid/broker/QueueEvents.cpp b/cpp/src/qpid/broker/QueueEvents.cpp index a6517e1bfe..7525e4cb76 100644 --- a/cpp/src/qpid/broker/QueueEvents.cpp +++ b/cpp/src/qpid/broker/QueueEvents.cpp @@ -20,12 +20,13 @@ */ #include "QueueEvents.h" #include "qpid/Exception.h" +#include "qpid/log/Statement.h" namespace qpid { namespace broker { QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller) : - eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller) + eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true) { eventQueue.start(); } @@ -37,12 +38,12 @@ QueueEvents::~QueueEvents() void QueueEvents::enqueued(const QueuedMessage& m) { - eventQueue.push(Event(ENQUEUE, m)); + if (enabled) eventQueue.push(Event(ENQUEUE, m)); } void QueueEvents::dequeued(const QueuedMessage& m) { - eventQueue.push(Event(DEQUEUE, m)); + if (enabled) eventQueue.push(Event(DEQUEUE, m)); } void QueueEvents::registerListener(const std::string& id, const EventListener& listener) @@ -81,6 +82,18 @@ void QueueEvents::shutdown() if (!eventQueue.empty() && !listeners.empty()) eventQueue.shutdown(); } +void QueueEvents::enable() +{ + enabled = true; + QPID_LOG(debug, "Queue events enabled"); +} + +void QueueEvents::disable() +{ + enabled = false; + QPID_LOG(debug, "Queue events disabled"); +} + QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {} diff --git a/cpp/src/qpid/broker/QueueEvents.h b/cpp/src/qpid/broker/QueueEvents.h index 2ba69e33e6..b0a07b03f2 100644 --- a/cpp/src/qpid/broker/QueueEvents.h +++ b/cpp/src/qpid/broker/QueueEvents.h @@ -59,6 +59,8 @@ class QueueEvents void dequeued(const QueuedMessage&); void registerListener(const std::string& id, const EventListener&); void unregisterListener(const std::string& id); + void enable(); + void disable(); //process all outstanding events void shutdown(); private: @@ -67,6 +69,7 @@ class QueueEvents EventQueue eventQueue; Listeners listeners; + volatile bool enabled; qpid::sys::Mutex lock;//protect listeners from concurrent access void handle(EventQueue::Queue& e); diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index 2cb801bf83..d079e543c4 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -19,6 +19,7 @@ * */ #include "QueueRegistry.h" +#include "QueueEvents.h" #include "qpid/log/Statement.h" #include <sstream> #include <assert.h> @@ -27,7 +28,7 @@ using namespace qpid::broker; using namespace qpid::sys; QueueRegistry::QueueRegistry() : - counter(1), store(0), parent(0), lastNode(false) {} + counter(1), store(0), events(0), parent(0), lastNode(false) {} QueueRegistry::~QueueRegistry(){} @@ -43,7 +44,8 @@ QueueRegistry::declare(const string& declareName, bool durable, if (i == queues.end()) { Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent)); queues[name] = queue; - if (lastNode) queue->setLastNodeFailure(); + if (lastNode) queue->setLastNodeFailure(); + if (events) queue->setQueueEventManager(*events); return std::pair<Queue::shared_ptr, bool>(queue, true); } else { @@ -105,3 +107,7 @@ void QueueRegistry::updateQueueClusterState(bool _lastNode) lastNode = _lastNode; } +void QueueRegistry::setQueueEvents(QueueEvents* e) +{ + events = e; +} diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index c53ba668cc..32634dd563 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -31,6 +31,8 @@ namespace qpid { namespace broker { +class QueueEvents; + /** * A registry of queues indexed by queue name. * @@ -86,6 +88,8 @@ class QueueRegistry{ */ string generateName(); + void setQueueEvents(QueueEvents*); + /** * Set the store to use. May only be called once. */ @@ -120,6 +124,7 @@ private: mutable qpid::sys::RWlock lock; int counter; MessageStore* store; + QueueEvents* events; management::Manageable* parent; bool lastNode; //used to set mode on queue declare diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index ae160fabc7..96c47085f0 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -362,10 +362,6 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& getBroker().getExchanges().getDefault()->bind(queue, name, 0); queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments); - //if event generation is turned on, pass in a pointer to - //the QueueEvents instance to use - if (queue->getEventMode()) queue->setQueueEventManager(getBroker().getQueueEvents()); - //handle automatic cleanup: if (exclusive) { exclusiveQueues.push_back(queue); diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 169d0fb1af..126247e458 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -358,6 +358,7 @@ void Cluster::setReady(Lock&) { state = READY; if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); mcast.release(); + broker.getQueueEvents().enable(); } void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) { @@ -385,8 +386,9 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& elders = map.getAlive(); elders.erase(self); broker.getLinks().setPassive(true); + broker.getQueueEvents().disable(); } - } + } else if (state >= CATCHUP && memberChange) { memberUpdate(l); elders = ClusterMap::intersection(elders, map.getAlive()); diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/cpp/src/qpid/replication/ReplicatingEventListener.cpp index 8aa47999cc..e3990a13cc 100644 --- a/cpp/src/qpid/replication/ReplicatingEventListener.cpp +++ b/cpp/src/qpid/replication/ReplicatingEventListener.cpp @@ -57,7 +57,6 @@ void ReplicatingEventListener::deliverDequeueMessage(const QueuedMessage& dequeu { FieldTable headers; headers.setString(REPLICATION_TARGET_QUEUE, dequeued.queue->getName()); - headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence); headers.setInt(REPLICATION_EVENT_TYPE, DEQUEUE); headers.setInt(DEQUEUED_MESSAGE_POSITION, dequeued.position); boost::intrusive_ptr<Message> msg(createMessage(headers)); @@ -69,7 +68,6 @@ void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueu boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload)); FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders(); headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName()); - headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence); headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE); queue->deliver(msg); } @@ -138,6 +136,7 @@ void ReplicatingEventListener::initialize(Plugin::Target& target) queue = broker->getQueues().find(options.queue); } if (queue) { + queue->insertSequenceNumbers(REPLICATION_EVENT_SEQNO); QueueEvents::EventListener callback = boost::bind(&ReplicatingEventListener::handle, this, _1); broker->getQueueEvents().registerListener(options.name, callback); QPID_LOG(info, "Registered replicating queue event listener"); diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.h b/cpp/src/qpid/replication/ReplicatingEventListener.h index d302755704..3d8f23e7ac 100644 --- a/cpp/src/qpid/replication/ReplicatingEventListener.h +++ b/cpp/src/qpid/replication/ReplicatingEventListener.h @@ -58,7 +58,6 @@ class ReplicatingEventListener : public Plugin PluginOptions options; qpid::broker::Queue::shared_ptr queue; - qpid::framing::SequenceNumber sequence; void deliverDequeueMessage(const qpid::broker::QueuedMessage& enqueued); void deliverEnqueueMessage(const qpid::broker::QueuedMessage& enqueued); diff --git a/cpp/src/qpid/replication/ReplicationExchange.cpp b/cpp/src/qpid/replication/ReplicationExchange.cpp index c35c6c2cd5..88c94ad7ba 100644 --- a/cpp/src/qpid/replication/ReplicationExchange.cpp +++ b/cpp/src/qpid/replication/ReplicationExchange.cpp @@ -34,11 +34,13 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::replication::constants; +const std::string SEQUENCE_VALUE("qpid.replication-event.sequence"); ReplicationExchange::ReplicationExchange(const std::string& name, bool durable, const FieldTable& args, QueueRegistry& qr, Manageable* parent) - : Exchange(name, durable, args, parent), queues(qr), init(false) {} + : Exchange(name, durable, args, parent), queues(qr), sequence(args.getAsInt64(SEQUENCE_VALUE)), init(false) + {} std::string ReplicationExchange::getType() const { return typeName; } @@ -135,6 +137,13 @@ bool ReplicationExchange::isBound(Queue::shared_ptr /*queue*/, const string* con const std::string ReplicationExchange::typeName("replication"); +void ReplicationExchange::encode(Buffer& buffer) const +{ + args.setInt64(std::string(SEQUENCE_VALUE), sequence); + Exchange::encode(buffer); +} + + struct ReplicationExchangePlugin : Plugin { Broker* broker; diff --git a/cpp/src/qpid/replication/ReplicationExchange.h b/cpp/src/qpid/replication/ReplicationExchange.h index 897e4a954e..4cc45ed5f5 100644 --- a/cpp/src/qpid/replication/ReplicationExchange.h +++ b/cpp/src/qpid/replication/ReplicationExchange.h @@ -22,6 +22,7 @@ * */ #include "qpid/broker/Exchange.h" +#include "qpid/framing/Buffer.h" #include "qpid/framing/SequenceNumber.h" namespace qpid { @@ -58,6 +59,7 @@ class ReplicationExchange : public qpid::broker::Exchange bool isDuplicate(const qpid::framing::FieldTable* args); void handleEnqueueEvent(const qpid::framing::FieldTable* args, qpid::broker::Deliverable& msg); void handleDequeueEvent(const qpid::framing::FieldTable* args); + void encode(framing::Buffer& buffer) const; }; }} // namespace qpid::replication diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk index c3100ac968..5d115de5a5 100644 --- a/cpp/src/tests/cluster.mk +++ b/cpp/src/tests/cluster.mk @@ -29,8 +29,9 @@ if HAVE_LIBCPG # ais_check checks pre-requisites for cluster tests and runs them if ok. -TESTS+=ais_check federated_cluster_test -EXTRA_DIST+=ais_check start_cluster stop_cluster restart_cluster cluster_python_tests cluster_python_tests_failing.txt federated_cluster_test +TESTS+=ais_check federated_cluster_test clustered_replication_test +EXTRA_DIST+=ais_check start_cluster stop_cluster restart_cluster cluster_python_tests cluster_python_tests_failing.txt \ + federated_cluster_test clustered_replication_test check_PROGRAMS+=cluster_test cluster_test_SOURCES=unit_test.cpp cluster_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp diff --git a/cpp/src/tests/clustered_replication_test b/cpp/src/tests/clustered_replication_test new file mode 100755 index 0000000000..2a3e742632 --- /dev/null +++ b/cpp/src/tests/clustered_replication_test @@ -0,0 +1,127 @@ +#!/bin/sh + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Test reliability of the replication feature in the face of link +# failures: +srcdir=`dirname $0` +PYTHON_DIR=$srcdir/../../../python + +trap stop_brokers INT EXIT + +fail() { + echo $1 + exit 1 +} +with_ais_group() { + id -nG | grep '\<ais\>' >/dev/null || { echo "You are not a member of the ais group." 1>&2; exit 1; } + echo $* | newgrp ais +} + +stop_brokers() { + if [[ $PRIMARY1 ]] ; then + ../qpidd -q --port $PRIMARY1 + unset PRIMARY1 + fi + if [[ $PRIMARY2 ]] ; then + ../qpidd -q --port $PRIMARY2 + unset PRIMARY2 + fi + if [[ $DR1 ]] ; then + ../qpidd -q --port $DR1 + unset DR1 + fi + if [[ $DR2 ]] ; then + ../qpidd -q --port $DR2 + unset DR2 + fi +} + +if test -d ${PYTHON_DIR}; then + id -nG | grep '\<ais\>' >/dev/null || \ + NOGROUP="You are not a member of the ais group." + ps -u root | grep 'aisexec\|corosync' >/dev/null || \ + NOAISEXEC="The aisexec or corosync daemon is not running as root" + + if test -n "$NOGROUP" -o -n "$NOAISEXEC"; then + cat <<EOF +Not running federation to cluster test because: + $NOGROUP + $NOAISEXEC +EOF + exit 0; + fi + + #todo: these cluster names need to be unique to prevent clashes + PRIMARY_CLUSTER=PRIMARY_$(hostname)_$(pwd) + DR_CLUSTER=DR_$(hostname)_$(pwd) + + GENERAL_OPTS="--auth no --no-module-dir --no-data-dir --daemon --port 0 --log-enable notice+ --log-to-stderr false" + PRIMARY_OPTS="--load-module ../.libs/replicating_listener.so --create-replication-queue true --replication-queue REPLICATION_QUEUE --load-module ../.libs/cluster.so --cluster-name $PRIMARY_CLUSTER" + DR_OPTS="--load-module ../.libs/replication_exchange.so --load-module ../.libs/cluster.so --cluster-name $DR_CLUSTER" + + rm -f repl*.tmp #cleanup any files left from previous run + + #start first node of primary cluster and set up test queue + echo Starting primary cluster + PRIMARY1=$(with_ais_group ../qpidd $GENERAL_OPTS $PRIMARY_OPTS --log-to-file repl.primary.1.tmp) || fail "Could not start node" + $PYTHON_DIR/commands/qpid-config -a "localhost:$PRIMARY1" add queue test-queue --generate-queue-events 2 + $PYTHON_DIR/commands/qpid-config -a "localhost:$PRIMARY1" add queue control-queue --generate-queue-events 1 + + #send 10 messages, consume 5 of them + for i in `seq 1 10`; do echo Message$i; done | ./sender --port $PRIMARY1 + ./receiver --port $PRIMARY1 --messages 5 > /dev/null + + #add new node to primary cluster, testing correct transfer of state: + echo Adding node to primary cluster + PRIMARY2=$(with_ais_group ../qpidd $GENERAL_OPTS $PRIMARY_OPTS --log-to-file repl.primary.2.tmp) + + #start DR cluster, set up test queue there and establish replication bridge + echo Starting DR cluster + DR1=$(with_ais_group ../qpidd $GENERAL_OPTS $DR_OPTS --log-to-file repl.dr.1.tmp) + DR2=$(with_ais_group ../qpidd $GENERAL_OPTS $DR_OPTS --log-to-file repl.dr.2.tmp) + + $PYTHON_DIR/commands/qpid-config -a "localhost:$DR1" add queue test-queue + $PYTHON_DIR/commands/qpid-config -a "localhost:$DR1" add queue control-queue + $PYTHON_DIR/commands/qpid-config -a "localhost:$DR1" add exchange replication REPLICATION_EXCHANGE + $PYTHON_DIR/commands/qpid-route queue add localhost:$DR2 localhost:$PRIMARY2 REPLICATION_EXCHANGE REPLICATION_QUEUE + + #send more messages to primary + for i in `seq 11 20`; do echo Message$i; done | ./sender --port $PRIMARY1 --send-eos 1 + + #wait for replication events to all be processed: + echo Waiting for replication to complete + echo Done | ./sender --port $PRIMARY1 --routing-key control-queue --send-eos 1 + ./receiver --queue control-queue --port $DR1 > /dev/null + + #verify contents of test queue on dr cluster: + echo Verifying... + ./receiver --port $DR2 > repl.out.tmp + for i in `seq 6 20`; do echo Message$i; done | diff repl.out.tmp - || FAIL=1 + + if [[ $FAIL ]]; then + echo Clustered replication test failed: expectations not met! + exit 1 + else + echo Clustered replication test passed + rm -f repl*.tmp + fi + +fi |