diff options
author | Alan Conway <aconway@apache.org> | 2010-05-13 18:54:50 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-05-13 18:54:50 +0000 |
commit | 87fb41d2994771d993debdb786c9697d16da4a0e (patch) | |
tree | 55b16a654fc1be7ed4a37d9c3b47e22f2d4da298 /cpp | |
parent | 7e13897c9238d0e5a6a64df64eeddceb14c36002 (diff) | |
download | qpid-python-87fb41d2994771d993debdb786c9697d16da4a0e.tar.gz |
New API clients failover in a cluster with SSL connections.
- Fix setting of reconnect URLs on messaging::Connection.
- Added SSL failover test.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@943974 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 56 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.h | 5 | ||||
-rw-r--r-- | cpp/src/tests/qpid_ping.cpp | 8 | ||||
-rwxr-xr-x | cpp/src/tests/ssl_test | 52 |
4 files changed, 83 insertions, 38 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index 3ebc5f17ad..777a952bae 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -1,4 +1,4 @@ - /* +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -29,6 +29,7 @@ #include "qpid/Url.h" #include <boost/intrusive_ptr.hpp> #include <vector> +#include <sstream> namespace qpid { namespace client { @@ -53,15 +54,26 @@ template <class T> bool setIfFound(const Variant::Map& map, const std::string& k QPID_LOG(debug, "option " << key << " specified as " << i->second); return true; } else { - QPID_LOG(debug, "option " << key << " not specified"); return false; } } -template <> -bool setIfFound< std::vector<std::string> >(const Variant::Map& map, - const std::string& key, - std::vector<std::string>& value) +namespace { +std::string asString(const std::vector<std::string>& v) { + std::stringstream os; + os << "["; + for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); ++i ) { + if (i != v.begin()) os << ", "; + os << *i; + } + os << "]"; + return os.str(); +} +} + +template <> bool setIfFound< std::vector<std::string> >(const Variant::Map& map, + const std::string& key, + std::vector<std::string>& value) { Variant::Map::const_iterator i = map.find(key); if (i != map.end()) { @@ -71,6 +83,7 @@ bool setIfFound< std::vector<std::string> >(const Variant::Map& map, } else { value.push_back(i->second.asString()); } + QPID_LOG(debug, "option " << key << " specified as " << asString(value)); return true; } else { return false; @@ -102,9 +115,9 @@ ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& optio minReconnectInterval(3), maxReconnectInterval(60), retries(0), reconnectOnLimitExceeded(true) { - QPID_LOG(debug, "Created connection with " << options); setOptions(options); urls.insert(urls.begin(), url); + QPID_LOG(debug, "Created connection " << url << " with " << options); } void ConnectionImpl::setOptions(const Variant::Map& options) @@ -127,17 +140,12 @@ void ConnectionImpl::setOptions(const Variant::Map& options) void ConnectionImpl::setOption(const std::string& name, const Variant& value) { - if (name == "url") { - if (urls.size()) urls[0] = value.asString(); - else urls.insert(urls.begin(), value.asString()); - } else { - Variant::Map options; - options[name] = value; - setOptions(options); - QPID_LOG(debug, "Set " << name << " to " << value); - } + Variant::Map options; + options[name] = value; + setOptions(options); } + void ConnectionImpl::close() { while(true) { @@ -246,6 +254,17 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started) retries = 0; } +void ConnectionImpl::mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&) { + if (more.size()) { + for (size_t i = 0; i < more.size(); ++i) { + if (std::find(urls.begin(), urls.end(), more[i].str()) == urls.end()) { + urls.push_back(more[i].str()); + } + } + QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls)); + } +} + bool ConnectionImpl::tryConnect() { sys::Mutex::ScopedLock l(lock); @@ -260,13 +279,14 @@ bool ConnectionImpl::tryConnect() SimpleUrlParser::parse(*i, settings); connection.open(settings); } - QPID_LOG(info, "Connected to " << *i); + QPID_LOG(info, "Connected to " << *i); + mergeUrls(connection.getInitialBrokers(), l); return resetSessions(l); } catch (const qpid::ConnectionException& e) { //TODO: need to fix timeout on //qpid::client::Connection::open() so that it throws //TransportFailure rather than a ConnectionException - QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what()); + QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what()); } } return false; diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h index 93929a6034..f32a07569b 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -28,8 +28,11 @@ #include "qpid/sys/Mutex.h" #include "qpid/sys/Semaphore.h" #include <map> +#include <vector> namespace qpid { +class Url; + namespace client { namespace amqp0_10 { @@ -69,7 +72,7 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl void connect(const qpid::sys::AbsTime& started); bool tryConnect(); bool resetSessions(const sys::Mutex::ScopedLock&); // dummy parameter indicates call with lock held. - + void mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&); }; }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/tests/qpid_ping.cpp b/cpp/src/tests/qpid_ping.cpp index b046fdf54b..95c6914b5c 100644 --- a/cpp/src/tests/qpid_ping.cpp +++ b/cpp/src/tests/qpid_ping.cpp @@ -81,8 +81,7 @@ class Ping : public Runnable { status = SUCCESS; lock.notifyAll(); } catch (const exception& e) { - if (!opts.quiet) - cerr << "Unexpected exception: " << e.what() << endl; + cerr << "Unexpected exception: " << e.what() << endl; Mutex::ScopedLock l(lock); status = ERROR; lock.notifyAll(); @@ -112,12 +111,11 @@ int main(int argc, char** argv) { opts.parse(argc, argv); Ping ping; ping.start(); - if (!ping.wait()) exit(1); + if (!ping.wait()) return 1; if (!opts.quiet) cout << "Success!" << endl; return 0; } catch (const exception& e) { - if (!opts.quiet) - cerr << "Unexpected exception: " << e.what() << endl; + cerr << "Unexpected exception: " << e.what() << endl; return 1; } } diff --git a/cpp/src/tests/ssl_test b/cpp/src/tests/ssl_test index 1564abb5f5..b813233533 100755 --- a/cpp/src/tests/ssl_test +++ b/cpp/src/tests/ssl_test @@ -45,18 +45,12 @@ delete_certs() { fi } -start_broker() { - PORT=`../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no --config $CONFIG --load-module $SSL_LIB --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE --ssl-cert-name $TEST_HOSTNAME --require-encryption` -} - -stop_broker() { - if [[ $PORT ]] ; then - $QPIDD_EXEC --no-module-dir -q --port $PORT - fi -} +COMMON_OPTS="--daemon --no-data-dir --no-module-dir --auth no --config $CONFIG --load-module $SSL_LIB --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE --ssl-cert-name $TEST_HOSTNAME --require-encryption" +start_broker() { ../qpidd --transport ssl --port 0 --ssl-port 0 $COMMON_OPTS; } cleanup() { - stop_broker + test -n "$PORT" && ../qpidd --no-module-dir -qp $PORT + test -n "$PORT2" && ../qpidd --no-module-dir -qp $PORT2 delete_certs } @@ -71,18 +65,48 @@ if [[ !(-e ${CERT_PW_FILE}) ]] ; then fi delete_certs create_certs || error "Could not create test certificate" - -start_broker || error "Could not start broker" +PORT=`start_broker` || error "Could not start broker" echo "Running SSL test on port $PORT" export QPID_NO_MODULE_DIR=1 export QPID_LOAD_MODULE=$SSLCONNECTOR_LIB export QPID_SSL_CERT_DB=${CERT_DIR} export QPID_SSL_CERT_PASSWORD_FILE=${CERT_PW_FILE} -# Test connection via connection settings + +## Test connection via connection settings ./perftest --count ${COUNT} --port ${PORT} -P ssl -b $TEST_HOSTNAME --summary -# Test connection with a URL +## Test connection with a URL URL=amqp:ssl:$TEST_HOSTNAME:$PORT ./qpid_send -b $URL --content-string=hello -a "foo;{create:always}" MSG=`./qpid_receive -b $URL -a "foo;{create:always}" --messages 1` test "$MSG" = "hello" || { echo "receive failed '$MSG' != 'hello'"; exit 1; } + +test -z $CLUSTER_LIB && exit 0 # Exit if cluster not supported. + +## Test failover in a cluster using SSL only +pick_port() { + # We need a fixed port to set --cluster-url. Use qpidd to pick a free port. + PICK=`../qpidd --no-module-dir -dp0` + ../qpidd --no-module-dir -qp $PICK + echo $PICK +} +ssl_cluster_broker() { # $1 = port + ../qpidd $COMMON_OPTS --load-module $CLUSTER_LIB --cluster-name ssl_test.$HOSTNAME.$$ --cluster-url amqp:ssl:$TEST_HOSTNAME:$1 --port 0 --ssl-port $1 --transport ssl > /dev/null + # Wait for broker to be ready + qpid_ping -Pssl -b $TEST_HOSTNAME -qp $1 || { echo "Cannot connect to broker on $1"; exit 1; } + echo "Running SSL cluster broker on port $1" +} + +PORT1=`pick_port`; ssl_cluster_broker $PORT1 +PORT2=`pick_port`; ssl_cluster_broker $PORT2 + +# Pipe receive output to uniq to remove duplicates +./qpid_receive --connection-options "{reconnect-timeout:5}" --failover-updates -b amqp:ssl:$TEST_HOSTNAME:$PORT1 -a "foo;{create:always}" -f | uniq > ssl_test_receive.tmp & +./qpid_send -b amqp:ssl:$TEST_HOSTNAME:$PORT2 --content-string=one -a "foo;{create:always}" +../qpidd --no-module-dir -qp $PORT1 # Kill broker 1 receiver should fail-over. +./qpid_send -b amqp:ssl:$TEST_HOSTNAME:$PORT2 --content-string=two -a "foo;{create:always}" --send-eos 1 +wait # Wait for qpid_receive +{ echo one; echo two; } > ssl_test_receive.cmp +diff ssl_test_receive.tmp ssl_test_receive.cmp || { echo "Failover failed"; exit 1; } +rm -f ssl_test_receive.* + |