summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-17 16:45:24 +0000
committerAlan Conway <aconway@apache.org>2008-10-17 16:45:24 +0000
commit842bc4556204ea0ea4f048ed13bccb3545d22e76 (patch)
tree7f0fa62ff83d9705808a11083141aec00eeaad26 /qpid/cpp/src
parent3104d778e1352e9a3a142d8c133a115c847360c2 (diff)
downloadqpid-python-842bc4556204ea0ea4f048ed13bccb3545d22e76.tar.gz
QPID-1367 Mick Goulish: improvements to client-side failover.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@705668 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionImpl.cpp8
-rw-r--r--qpid/cpp/src/qpid/client/Dispatcher.cpp1
-rw-r--r--qpid/cpp/src/qpid/client/FailoverConnection.cpp24
-rw-r--r--qpid/cpp/src/qpid/client/FailoverConnection.h4
-rw-r--r--qpid/cpp/src/qpid/client/FailoverSession.cpp28
-rw-r--r--qpid/cpp/src/qpid/client/FailoverSession.h4
-rw-r--r--qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp1
-rw-r--r--qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h3
-rwxr-xr-xqpid/cpp/src/tests/ssl_test16
9 files changed, 56 insertions, 33 deletions
diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
index 652d59f448..92cf756580 100644
--- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -151,8 +151,11 @@ void ConnectionImpl::closed(uint16_t code, const std::string& text) {
static const std::string CONN_CLOSED("Connection closed by broker");
void ConnectionImpl::shutdown() {
-
Mutex::ScopedLock l(lock);
+
+ if ( failureCallback )
+ failureCallback();
+
if (handler.isClosed()) return;
// FIXME aconway 2008-06-06: exception use, amqp0-10 does not seem to have
@@ -161,9 +164,6 @@ void ConnectionImpl::shutdown() {
closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED));
setException(new ConnectionException(CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED));
handler.fail(CONN_CLOSED);
-
- if ( failureCallback )
- failureCallback();
}
void ConnectionImpl::erase(uint16_t ch) {
diff --git a/qpid/cpp/src/qpid/client/Dispatcher.cpp b/qpid/cpp/src/qpid/client/Dispatcher.cpp
index af805a3808..c26dba188d 100644
--- a/qpid/cpp/src/qpid/client/Dispatcher.cpp
+++ b/qpid/cpp/src/qpid/client/Dispatcher.cpp
@@ -42,6 +42,7 @@ Subscriber::Subscriber(const Session& s, MessageListener* l, AckPolicy a)
void Subscriber::received(Message& msg)
{
+
if (listener) {
listener->received(msg);
autoAck.ack(msg, session);
diff --git a/qpid/cpp/src/qpid/client/FailoverConnection.cpp b/qpid/cpp/src/qpid/client/FailoverConnection.cpp
index e98de868de..33b06a6a1a 100644
--- a/qpid/cpp/src/qpid/client/FailoverConnection.cpp
+++ b/qpid/cpp/src/qpid/client/FailoverConnection.cpp
@@ -36,7 +36,6 @@ namespace client {
FailoverConnection::FailoverConnection ( ) :
- name(),
failoverCompleteTime(0)
{
connection.registerFailureCallback
@@ -59,7 +58,6 @@ FailoverConnection::open ( const std::string& host,
settings.host = host;
settings.port = port;
settings.username = uid;
- settings.username = uid;
settings.password = pwd;
settings.virtualhost = virtualhost;
settings.maxFrameSize = maxFrameSize;
@@ -124,9 +122,19 @@ FailoverConnection::registerFailureCallback ( boost::function<void ()> /*fn*/ )
void
FailoverConnection::failover ( )
{
+ std::vector<FailoverSession *>::iterator sessions_iterator;
+
+ for ( sessions_iterator = sessions.begin();
+ sessions_iterator != sessions.end();
+ ++ sessions_iterator )
+ {
+ FailoverSession * fs = * sessions_iterator;
+ fs->failover_in_progress = true;
+ }
+
std::vector<Url> knownBrokers = connection.getKnownBrokers();
if (knownBrokers.empty())
- throw Exception(QPID_MSG("FailoverConnection::failover " << name << " no known brokers."));
+ throw Exception(QPID_MSG("FailoverConnection::failover no known brokers."));
Connection newConnection;
for (std::vector<Url>::iterator i = knownBrokers.begin(); i != knownBrokers.end(); ++i) {
@@ -148,7 +156,6 @@ FailoverConnection::failover ( )
*/
// FIXME aconway 2008-10-10: thread unsafe, possible race with concurrent newSession
- std::vector<FailoverSession *>::iterator sessions_iterator;
for ( sessions_iterator = sessions.begin();
sessions_iterator < sessions.end();
++ sessions_iterator
@@ -173,6 +180,15 @@ FailoverConnection::failover ( )
FailoverSession * fs = * sessions_iterator;
fs->failover ( );
}
+
+ for ( sessions_iterator = sessions.begin();
+ sessions_iterator < sessions.end();
+ ++ sessions_iterator
+ )
+ {
+ FailoverSession * fs = * sessions_iterator;
+ fs->failover_in_progress = false;
+ }
}
diff --git a/qpid/cpp/src/qpid/client/FailoverConnection.h b/qpid/cpp/src/qpid/client/FailoverConnection.h
index 09e9c8bfa4..a84f0c2189 100644
--- a/qpid/cpp/src/qpid/client/FailoverConnection.h
+++ b/qpid/cpp/src/qpid/client/FailoverConnection.h
@@ -71,10 +71,6 @@ class FailoverConnection
void registerFailureCallback ( boost::function<void ()> fn );
- // If you have more than 1 connection and you want to give them
- // separate names for debugging...
- std::string name;
-
void failover ( );
struct timeval * failoverCompleteTime;
diff --git a/qpid/cpp/src/qpid/client/FailoverSession.cpp b/qpid/cpp/src/qpid/client/FailoverSession.cpp
index a088a8c91b..25867c2a24 100644
--- a/qpid/cpp/src/qpid/client/FailoverSession.cpp
+++ b/qpid/cpp/src/qpid/client/FailoverSession.cpp
@@ -38,7 +38,7 @@ namespace qpid {
namespace client {
FailoverSession::FailoverSession ( ) :
- name("no_name")
+ failover_in_progress(false)
{
// The session is created by FailoverConnection::newSession
failoverSubscriptionManager = 0;
@@ -170,11 +170,26 @@ FailoverSession::messageTransfer ( const string& destination,
)
{
- session.messageTransfer ( destination,
- acceptMode,
- acquireMode,
- content
- );
+ while ( 1 )
+ {
+ try
+ {
+ session.messageTransfer ( destination,
+ acceptMode,
+ acquireMode,
+ content
+ );
+ break;
+ }
+ catch ( ... )
+ {
+ // Take special action only if there is a failover in progress.
+ if ( ! failover_in_progress )
+ break;
+
+ usleep ( 1000 );
+ }
+ }
}
@@ -583,7 +598,6 @@ FailoverSession::prepareForFailover ( Connection newConnection )
if ( failoverSubscriptionManager )
{
- //
failoverSubscriptionManager->prepareForFailover ( newSession );
}
}
diff --git a/qpid/cpp/src/qpid/client/FailoverSession.h b/qpid/cpp/src/qpid/client/FailoverSession.h
index 7ff26f8079..b301353968 100644
--- a/qpid/cpp/src/qpid/client/FailoverSession.h
+++ b/qpid/cpp/src/qpid/client/FailoverSession.h
@@ -59,8 +59,6 @@ class FailoverSession
FailoverSession ( );
~FailoverSession ( );
- std::string name;
-
framing::FrameSet::shared_ptr get();
SessionId getId() const;
@@ -82,6 +80,8 @@ class FailoverSession
void sendCompletion ( );
+ bool failover_in_progress;
+
// Wrapped functions from Session ----------------------------
diff --git a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
index c1ef7d00c4..d12d976ef5 100644
--- a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
+++ b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
@@ -33,7 +33,6 @@ namespace client {
FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) :
- name("no_name"),
newSessionIsValid(false),
no_failover(false)
{
diff --git a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
index 651e2549c2..b7631d3a98 100644
--- a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
+++ b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
@@ -114,9 +114,6 @@ class FailoverSubscriptionManager
void prepareForFailover ( Session newSession );
void failover ( );
- std::string name;
-
-
private:
sys::Monitor lock;
diff --git a/qpid/cpp/src/tests/ssl_test b/qpid/cpp/src/tests/ssl_test
index ed06f19fd3..a357a38c0a 100755
--- a/qpid/cpp/src/tests/ssl_test
+++ b/qpid/cpp/src/tests/ssl_test
@@ -1,8 +1,8 @@
#!/bin/sh
# Run a simple test over SSL
-MY_DIR=$(dirname $(which $0))
-CERT_DIR=${MY_DIR}/test_cert_db
-CERT_PW_FILE=${MY_DIR}/cert.password
+
+CERT_DIR=`pwd`/test_cert_db
+CERT_PW_FILE=`pwd`/cert.password
HOSTNAME=`hostname`
COUNT=10000
@@ -18,13 +18,13 @@ create_certs() {
}
start_broker() {
- ${MY_DIR}/../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no\
- --load-module ${MY_DIR}/../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE > qpidd.port
+ ../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no\
+ --load-module ../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE > qpidd.port
PORT=`cat qpidd.port`
}
stop_broker() {
- ${MY_DIR}/../qpidd -q --port $PORT
+ ../qpidd -q --port $PORT
}
if [[ !(-e ${CERT_PW_FILE}) ]] ; then
@@ -36,8 +36,8 @@ fi
start_broker || error "Could not start broker"
echo "Running SSL test on port $PORT"
-export QPID_LOAD_MODULE=${MY_DIR}/../.libs/sslconnector.so
+export QPID_LOAD_MODULE=../.libs/sslconnector.so
export QPID_SSL_CERT_DB=${CERT_DIR}
export QPID_SSL_CERT_PASSWORD_FILE=${CERT_PW_FILE}
-${MY_DIR}/perftest --count ${COUNT} --port ${PORT} -P ssl -b $HOSTNAME --summary
+./perftest --count ${COUNT} --port ${PORT} -P ssl -b $HOSTNAME --summary