From 83af9a32496f3e954fe847318587ac03ae78c3d6 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 2 Mar 2011 21:06:51 +0000 Subject: QPID-3105: Alternate-Exchange configuration not communicated between nodes in a cluster. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1076375 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/cluster/Cluster.cpp | 4 ++++ qpid/cpp/src/tests/cluster_tests.py | 29 ++++++++++++++++++++++++++++- 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index fe5a1c806e..a8a99d83b5 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -943,6 +943,10 @@ void Cluster::checkUpdateIn(Lock& l) { mAgent->suppress(false); // Enable management output. mAgent->clusterUpdate(); } + // Restore alternate exchange settings on exchanges. + broker.getExchanges().eachExchange( + boost::bind(&broker::Exchange::recoveryComplete, _1, + boost::ref(broker.getExchanges()))); enableClusterSafe(); // Enable cluster-safe assertions deliverEventQueue.start(); } diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 7443e6b663..b8407fbde8 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -22,7 +22,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs from qpid import datatypes, messaging from brokertest import * from qpid.harness import Skipped -from qpid.messaging import Message, Empty +from qpid.messaging import Message, Empty, Disposition, REJECTED from threading import Thread, Lock, Condition from logging import getLogger from itertools import chain @@ -441,6 +441,33 @@ acl allow all all return cluster[1] self.queue_flowlimit_test(Brokers()) + def test_alternate_exchange_update(self): + """Verify that alternate-exchange on exchanges and queues is propagated to new members of a cluster. """ + cluster = self.cluster(1) + s0 = cluster[0].connect().session() + # create alt queue bound to amq.fanout exchange, will be destination for alternate exchanges + self.evaluate_address(s0, "alt;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:alt}]}}") + # create direct exchange ex with alternate-exchange amq.fanout and no queues bound + self.evaluate_address(s0, "ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'amq.fanout'}}}") + # create queue q with alternate-exchange amq.fanout + self.evaluate_address(s0, "q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'amq.fanout'}}}") + + def verify(broker): + s = broker.connect().session() + # Verify unmatched message goes to ex's alternate. + s.sender("ex").send("foo") + self.assertEqual("foo", s.receiver("alt").fetch(timeout=0).content) + # Verify rejected message goes to q's alternate. + s.sender("q").send("bar") + msg = s.receiver("q").fetch(timeout=0) + self.assertEqual("bar", msg.content) + s.acknowledge(msg, Disposition(REJECTED)) # Reject the message + self.assertEqual("bar", s.receiver("alt").fetch(timeout=0).content) + + verify(cluster[0]) + cluster.start() + verify(cluster[1]) + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION= is set""" def duration(self): -- cgit v1.2.1