summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-05-13 18:54:50 +0000
committerAlan Conway <aconway@apache.org>2010-05-13 18:54:50 +0000
commit87fb41d2994771d993debdb786c9697d16da4a0e (patch)
tree55b16a654fc1be7ed4a37d9c3b47e22f2d4da298 /cpp
parent7e13897c9238d0e5a6a64df64eeddceb14c36002 (diff)
downloadqpid-python-87fb41d2994771d993debdb786c9697d16da4a0e.tar.gz
New API clients failover in a cluster with SSL connections.
- Fix setting of reconnect URLs on messaging::Connection. - Added SSL failover test. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@943974 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp56
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.h5
-rw-r--r--cpp/src/tests/qpid_ping.cpp8
-rwxr-xr-xcpp/src/tests/ssl_test52
4 files changed, 83 insertions, 38 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index 3ebc5f17ad..777a952bae 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -1,4 +1,4 @@
- /*
+/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -29,6 +29,7 @@
#include "qpid/Url.h"
#include <boost/intrusive_ptr.hpp>
#include <vector>
+#include <sstream>
namespace qpid {
namespace client {
@@ -53,15 +54,26 @@ template <class T> bool setIfFound(const Variant::Map& map, const std::string& k
QPID_LOG(debug, "option " << key << " specified as " << i->second);
return true;
} else {
- QPID_LOG(debug, "option " << key << " not specified");
return false;
}
}
-template <>
-bool setIfFound< std::vector<std::string> >(const Variant::Map& map,
- const std::string& key,
- std::vector<std::string>& value)
+namespace {
+std::string asString(const std::vector<std::string>& v) {
+ std::stringstream os;
+ os << "[";
+ for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); ++i ) {
+ if (i != v.begin()) os << ", ";
+ os << *i;
+ }
+ os << "]";
+ return os.str();
+}
+}
+
+template <> bool setIfFound< std::vector<std::string> >(const Variant::Map& map,
+ const std::string& key,
+ std::vector<std::string>& value)
{
Variant::Map::const_iterator i = map.find(key);
if (i != map.end()) {
@@ -71,6 +83,7 @@ bool setIfFound< std::vector<std::string> >(const Variant::Map& map,
} else {
value.push_back(i->second.asString());
}
+ QPID_LOG(debug, "option " << key << " specified as " << asString(value));
return true;
} else {
return false;
@@ -102,9 +115,9 @@ ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& optio
minReconnectInterval(3), maxReconnectInterval(60),
retries(0), reconnectOnLimitExceeded(true)
{
- QPID_LOG(debug, "Created connection with " << options);
setOptions(options);
urls.insert(urls.begin(), url);
+ QPID_LOG(debug, "Created connection " << url << " with " << options);
}
void ConnectionImpl::setOptions(const Variant::Map& options)
@@ -127,17 +140,12 @@ void ConnectionImpl::setOptions(const Variant::Map& options)
void ConnectionImpl::setOption(const std::string& name, const Variant& value)
{
- if (name == "url") {
- if (urls.size()) urls[0] = value.asString();
- else urls.insert(urls.begin(), value.asString());
- } else {
- Variant::Map options;
- options[name] = value;
- setOptions(options);
- QPID_LOG(debug, "Set " << name << " to " << value);
- }
+ Variant::Map options;
+ options[name] = value;
+ setOptions(options);
}
+
void ConnectionImpl::close()
{
while(true) {
@@ -246,6 +254,17 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
retries = 0;
}
+void ConnectionImpl::mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&) {
+ if (more.size()) {
+ for (size_t i = 0; i < more.size(); ++i) {
+ if (std::find(urls.begin(), urls.end(), more[i].str()) == urls.end()) {
+ urls.push_back(more[i].str());
+ }
+ }
+ QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls));
+ }
+}
+
bool ConnectionImpl::tryConnect()
{
sys::Mutex::ScopedLock l(lock);
@@ -260,13 +279,14 @@ bool ConnectionImpl::tryConnect()
SimpleUrlParser::parse(*i, settings);
connection.open(settings);
}
- QPID_LOG(info, "Connected to " << *i);
+ QPID_LOG(info, "Connected to " << *i);
+ mergeUrls(connection.getInitialBrokers(), l);
return resetSessions(l);
} catch (const qpid::ConnectionException& e) {
//TODO: need to fix timeout on
//qpid::client::Connection::open() so that it throws
//TransportFailure rather than a ConnectionException
- QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
+ QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
}
}
return false;
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
index 93929a6034..f32a07569b 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
@@ -28,8 +28,11 @@
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Semaphore.h"
#include <map>
+#include <vector>
namespace qpid {
+class Url;
+
namespace client {
namespace amqp0_10 {
@@ -69,7 +72,7 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
void connect(const qpid::sys::AbsTime& started);
bool tryConnect();
bool resetSessions(const sys::Mutex::ScopedLock&); // dummy parameter indicates call with lock held.
-
+ void mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&);
};
}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/tests/qpid_ping.cpp b/cpp/src/tests/qpid_ping.cpp
index b046fdf54b..95c6914b5c 100644
--- a/cpp/src/tests/qpid_ping.cpp
+++ b/cpp/src/tests/qpid_ping.cpp
@@ -81,8 +81,7 @@ class Ping : public Runnable {
status = SUCCESS;
lock.notifyAll();
} catch (const exception& e) {
- if (!opts.quiet)
- cerr << "Unexpected exception: " << e.what() << endl;
+ cerr << "Unexpected exception: " << e.what() << endl;
Mutex::ScopedLock l(lock);
status = ERROR;
lock.notifyAll();
@@ -112,12 +111,11 @@ int main(int argc, char** argv) {
opts.parse(argc, argv);
Ping ping;
ping.start();
- if (!ping.wait()) exit(1);
+ if (!ping.wait()) return 1;
if (!opts.quiet) cout << "Success!" << endl;
return 0;
} catch (const exception& e) {
- if (!opts.quiet)
- cerr << "Unexpected exception: " << e.what() << endl;
+ cerr << "Unexpected exception: " << e.what() << endl;
return 1;
}
}
diff --git a/cpp/src/tests/ssl_test b/cpp/src/tests/ssl_test
index 1564abb5f5..b813233533 100755
--- a/cpp/src/tests/ssl_test
+++ b/cpp/src/tests/ssl_test
@@ -45,18 +45,12 @@ delete_certs() {
fi
}
-start_broker() {
- PORT=`../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no --config $CONFIG --load-module $SSL_LIB --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE --ssl-cert-name $TEST_HOSTNAME --require-encryption`
-}
-
-stop_broker() {
- if [[ $PORT ]] ; then
- $QPIDD_EXEC --no-module-dir -q --port $PORT
- fi
-}
+COMMON_OPTS="--daemon --no-data-dir --no-module-dir --auth no --config $CONFIG --load-module $SSL_LIB --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE --ssl-cert-name $TEST_HOSTNAME --require-encryption"
+start_broker() { ../qpidd --transport ssl --port 0 --ssl-port 0 $COMMON_OPTS; }
cleanup() {
- stop_broker
+ test -n "$PORT" && ../qpidd --no-module-dir -qp $PORT
+ test -n "$PORT2" && ../qpidd --no-module-dir -qp $PORT2
delete_certs
}
@@ -71,18 +65,48 @@ if [[ !(-e ${CERT_PW_FILE}) ]] ; then
fi
delete_certs
create_certs || error "Could not create test certificate"
-
-start_broker || error "Could not start broker"
+PORT=`start_broker` || error "Could not start broker"
echo "Running SSL test on port $PORT"
export QPID_NO_MODULE_DIR=1
export QPID_LOAD_MODULE=$SSLCONNECTOR_LIB
export QPID_SSL_CERT_DB=${CERT_DIR}
export QPID_SSL_CERT_PASSWORD_FILE=${CERT_PW_FILE}
-# Test connection via connection settings
+
+## Test connection via connection settings
./perftest --count ${COUNT} --port ${PORT} -P ssl -b $TEST_HOSTNAME --summary
-# Test connection with a URL
+## Test connection with a URL
URL=amqp:ssl:$TEST_HOSTNAME:$PORT
./qpid_send -b $URL --content-string=hello -a "foo;{create:always}"
MSG=`./qpid_receive -b $URL -a "foo;{create:always}" --messages 1`
test "$MSG" = "hello" || { echo "receive failed '$MSG' != 'hello'"; exit 1; }
+
+test -z $CLUSTER_LIB && exit 0 # Exit if cluster not supported.
+
+## Test failover in a cluster using SSL only
+pick_port() {
+ # We need a fixed port to set --cluster-url. Use qpidd to pick a free port.
+ PICK=`../qpidd --no-module-dir -dp0`
+ ../qpidd --no-module-dir -qp $PICK
+ echo $PICK
+}
+ssl_cluster_broker() { # $1 = port
+ ../qpidd $COMMON_OPTS --load-module $CLUSTER_LIB --cluster-name ssl_test.$HOSTNAME.$$ --cluster-url amqp:ssl:$TEST_HOSTNAME:$1 --port 0 --ssl-port $1 --transport ssl > /dev/null
+ # Wait for broker to be ready
+ qpid_ping -Pssl -b $TEST_HOSTNAME -qp $1 || { echo "Cannot connect to broker on $1"; exit 1; }
+ echo "Running SSL cluster broker on port $1"
+}
+
+PORT1=`pick_port`; ssl_cluster_broker $PORT1
+PORT2=`pick_port`; ssl_cluster_broker $PORT2
+
+# Pipe receive output to uniq to remove duplicates
+./qpid_receive --connection-options "{reconnect-timeout:5}" --failover-updates -b amqp:ssl:$TEST_HOSTNAME:$PORT1 -a "foo;{create:always}" -f | uniq > ssl_test_receive.tmp &
+./qpid_send -b amqp:ssl:$TEST_HOSTNAME:$PORT2 --content-string=one -a "foo;{create:always}"
+../qpidd --no-module-dir -qp $PORT1 # Kill broker 1 receiver should fail-over.
+./qpid_send -b amqp:ssl:$TEST_HOSTNAME:$PORT2 --content-string=two -a "foo;{create:always}" --send-eos 1
+wait # Wait for qpid_receive
+{ echo one; echo two; } > ssl_test_receive.cmp
+diff ssl_test_receive.tmp ssl_test_receive.cmp || { echo "Failover failed"; exit 1; }
+rm -f ssl_test_receive.*
+