summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-06-30 11:54:40 +0000
committerAlan Conway <aconway@apache.org>2011-06-30 11:54:40 +0000
commitafe7ad5d1af40b41b98dbc6d9c3d0e22b91f2a26 (patch)
tree83bae97c591a3694becbea1b3cceafd9213034ca
parent2e32a8dd1d548cc591cacaad3f7105e6a9521a18 (diff)
downloadqpid-python-afe7ad5d1af40b41b98dbc6d9c3d0e22b91f2a26.tar.gz
QPID-3329: Configure C++ client connections to replace url-addresses rather than merging new addresses with old
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1141493 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp5
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h1
-rw-r--r--qpid/cpp/src/tests/brokertest.py12
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py10
4 files changed, 18 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index 473f5ecd1c..ced8cf66fd 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -64,7 +64,7 @@ std::string asString(const std::vector<std::string>& v) {
}
ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
- reconnect(false), timeout(-1), limit(-1),
+ replaceUrls(false), reconnect(false), timeout(-1), limit(-1),
minReconnectInterval(3), maxReconnectInterval(60),
retries(0), reconnectOnLimitExceeded(true)
{
@@ -95,7 +95,10 @@ void ConnectionImpl::setOption(const std::string& name, const Variant& value)
minReconnectInterval = value;
} else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") {
maxReconnectInterval = value;
+ } else if (name == "reconnect-urls-replace" || name == "reconnect_urls_replace") {
+ replaceUrls = value.asBool();
} else if (name == "reconnect-urls" || name == "reconnect_urls") {
+ if (replaceUrls) urls.clear();
if (value.getType() == VAR_LIST) {
merge(value.asList(), urls);
} else {
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
index 9e31238bc1..1b58cbbe3e 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
@@ -60,6 +60,7 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
qpid::sys::Semaphore semaphore;//used to coordinate reconnection
Sessions sessions;
qpid::client::Connection connection;
+ bool replaceUrls; // Replace rather than merging with reconnect-urls
std::vector<std::string> urls;
qpid::client::ConnectionSettings settings;
bool reconnect;
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 725bcaabaf..4a98c638a2 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -395,6 +395,8 @@ class Broker(Popen):
class Cluster:
"""A cluster of brokers in a test."""
+ # Client connection options for use in failover tests.
+ CONNECTION_OPTIONS = "reconnect:true,reconnect-timeout:10,reconnect-urls-replace:true"
_cluster_count = 0
@@ -538,7 +540,8 @@ class NumberedSender(Thread):
Thread to run a sender client and send numbered messages until stopped.
"""
- def __init__(self, broker, max_depth=None, queue="test-queue"):
+ def __init__(self, broker, max_depth=None, queue="test-queue",
+ connection_options=Cluster.CONNECTION_OPTIONS):
"""
max_depth: enable flow control, ensure sent - received <= max_depth.
Requires self.notify_received(n) to be called each time messages are received.
@@ -549,7 +552,7 @@ class NumberedSender(Thread):
"--broker", "localhost:%s"%broker.port(),
"--address", "%s;{create:always}"%queue,
"--failover-updates",
- "--connection-options", "{reconnect:true,reconnect-timeout:5}",
+ "--connection-options", "{%s}"%(connection_options),
"--content-stdin"
],
expect=EXPECT_RUNNING,
@@ -600,7 +603,8 @@ class NumberedReceiver(Thread):
Thread to run a receiver client and verify it receives
sequentially numbered messages.
"""
- def __init__(self, broker, sender = None, queue="test-queue"):
+ def __init__(self, broker, sender = None, queue="test-queue",
+ connection_options=Cluster.CONNECTION_OPTIONS):
"""
sender: enable flow control. Call sender.received(n) for each message received.
"""
@@ -611,7 +615,7 @@ class NumberedReceiver(Thread):
"--broker", "localhost:%s"%broker.port(),
"--address", "%s;{create:always}"%queue,
"--failover-updates",
- "--connection-options", "{reconnect:true,reconnect-timeout:5}",
+ "--connection-options", "{%s}"%(connection_options),
"--forever"
],
expect=EXPECT_RUNNING,
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 97c53e3d97..bfc44abfef 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -314,7 +314,7 @@ acl allow all all
"--sequence=true",
"--send-eos=1",
"--messages=100000",
- "--connection-options={reconnect:true,reconnect-timeout:5}"
+ "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS)
])
self.receiver = self.popen(
["qpid-receive",
@@ -322,7 +322,7 @@ acl allow all all
"--address", queue,
"--ignore-duplicates",
"--check-redelivered",
- "--connection-options={reconnect:true,reconnect-timeout:5}",
+ "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
"--forever"
])
time.sleep(1)#give sender enough time to have some messages to replay
@@ -470,7 +470,7 @@ acl allow all all
"--content-size=%s" % self.size,
"--messages=%s" % self.count,
"--failover-updates",
- "--connection-options={reconnect:true,reconnect-timeout:5}",
+ "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
"--address=%s" % self.queue,
"--broker=%s" % self.broker.host_port()])
self.sender.wait()
@@ -502,7 +502,7 @@ acl allow all all
"--timeout=1",
"--print-content=no",
"--failover-updates",
- "--connection-options={reconnect:true,reconnect-timeout:5}",
+ "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
"--ack-frequency=1",
"--address=flq",
"--broker=%s" % cluster[1].host_port()])
@@ -527,7 +527,7 @@ acl allow all all
"--timeout=1",
"--print-content=no",
"--failover-updates",
- "--connection-options={reconnect:true,reconnect-timeout:5}",
+ "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
"--ack-frequency=1",
"--address=flq",
"--broker=%s" % cluster[2].host_port()])