diff options
author | Alan Conway <aconway@apache.org> | 2008-10-17 16:45:24 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-10-17 16:45:24 +0000 |
commit | 7db0c0970eac260626263314c30f0e20d4ef6c21 (patch) | |
tree | 231024436b5b7185f63972d90318acce97816c22 /cpp | |
parent | a039e57108ed06586e73a255dc824ed27fc6de2a (diff) | |
download | qpid-python-7db0c0970eac260626263314c30f0e20d4ef6c21.tar.gz |
QPID-1367 Mick Goulish: improvements to client-side failover.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@705668 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/examples/direct/direct_producer.cpp | 6 | ||||
-rw-r--r-- | cpp/examples/failover/direct_producer.cpp | 33 | ||||
-rw-r--r-- | cpp/examples/failover/listener.cpp | 176 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/client/Dispatcher.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/FailoverConnection.cpp | 24 | ||||
-rw-r--r-- | cpp/src/qpid/client/FailoverConnection.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/FailoverSession.cpp | 28 | ||||
-rw-r--r-- | cpp/src/qpid/client/FailoverSession.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/FailoverSubscriptionManager.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/FailoverSubscriptionManager.h | 3 | ||||
-rwxr-xr-x | cpp/src/tests/ssl_test | 16 |
12 files changed, 102 insertions, 202 deletions
diff --git a/cpp/examples/direct/direct_producer.cpp b/cpp/examples/direct/direct_producer.cpp index 40fc644bf3..baa8d9092b 100644 --- a/cpp/examples/direct/direct_producer.cpp +++ b/cpp/examples/direct/direct_producer.cpp @@ -64,6 +64,7 @@ using std::string; int main(int argc, char** argv) { const char* host = argc>1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; + int count = argc>3 ? atoi(argv[3]) : 10; Connection connection; Message message; try { @@ -81,14 +82,15 @@ int main(int argc, char** argv) { // Now send some messages ... - for (int i=0; i<10; i++) { + for (int i=0; i<count; i++) { stringstream message_data; message_data << "Message " << i; message.setData(message_data.str()); // Asynchronous transfer sends messages as quickly as // possible without waiting for confirmation. - async(session).messageTransfer(arg::content=message, arg::destination="amq.direct"); + // async(session).messageTransfer(arg::content=message, arg::destination="amq.direct"); + session.messageTransfer(arg::content=message, arg::destination="amq.direct"); } // And send a final message to indicate termination. diff --git a/cpp/examples/failover/direct_producer.cpp b/cpp/examples/failover/direct_producer.cpp index 1bee56e164..513971197e 100644 --- a/cpp/examples/failover/direct_producer.cpp +++ b/cpp/examples/failover/direct_producer.cpp @@ -36,12 +36,13 @@ using namespace std; int main ( int argc, char ** argv) { + const char* host = argc>1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; int count = argc>3 ? atoi(argv[3]) : 30; - int delayMs = argc>4 ? atoi(argv[4]) : 1000; string program_name = "PRODUCER"; + try { FailoverConnection connection; FailoverSession * session; @@ -49,14 +50,23 @@ main ( int argc, char ** argv) connection.open ( host, port ); session = connection.newSession(); + bool report = true; int sent = 0; while ( sent < count ) { + message.getDeliveryProperties().setRoutingKey("routing_key"); - std::cout << "sending message " - << sent - << " of " - << count - << ".\n"; + + + if ( count > 1000 ) + report = !(sent % 1000); + + if ( report ) + { + std::cout << "sending message " + << sent + << ".\n"; + } + stringstream message_data; message_data << sent; message.setData(message_data.str()); @@ -70,12 +80,12 @@ main ( int argc, char ** argv) 0, message ); - usleep ( 1000*delayMs ); + ++ sent; } message.setData ( "That's all, folks!" ); - /* MICK FIXME + /* FIXME mgoulish 16 Oct 08 session.messageTransfer ( arg::content=message, arg::destination="amq.direct" ); @@ -88,10 +98,17 @@ main ( int argc, char ** argv) session->sync(); connection.close(); + std::cout << program_name + << " sent " + << sent + << " messages.\n"; + std::cout << program_name << ": " << " completed without error." << std::endl; return 0; } catch(const std::exception& error) { std::cout << program_name << ": " << error.what() << std::endl; + std::cout << program_name << "Exiting.\n"; + return 1; } return 1; } diff --git a/cpp/examples/failover/listener.cpp b/cpp/examples/failover/listener.cpp index 1c47127389..d8cb78c9ce 100644 --- a/cpp/examples/failover/listener.cpp +++ b/cpp/examples/failover/listener.cpp @@ -34,150 +34,26 @@ using namespace qpid::framing; using namespace std; -struct Recorder -{ - unsigned int max_messages; - unsigned int * messages_received; - - Recorder ( ) - { - max_messages = 1000; - messages_received = new unsigned int [ max_messages ]; - memset ( messages_received, 0, max_messages * sizeof(int) ); - } - - - void - received ( int i ) - { - messages_received[i] ++; - } - - - - void - report ( ) - { - int i; - - int last_received_message = 0; - - vector<unsigned int> missed_messages, - multiple_messages; - - /*---------------------------------------------------- - Collect indices of missed and multiple messages. - ----------------------------------------------------*/ - bool seen_first_message = false; - for ( i = max_messages - 1; i >= 0; -- i ) - { - if ( ! seen_first_message ) - { - if ( messages_received [i] > 0 ) - { - seen_first_message = true; - last_received_message = i; - } - } - else - { - if ( messages_received [i] == 0 ) - missed_messages.push_back ( i ); - else - if ( messages_received [i] > 1 ) - { - multiple_messages.push_back ( i ); - } - } - } - - /*-------------------------------------------- - Report missed messages. - --------------------------------------------*/ - char const * verb = ( missed_messages.size() == 1 ) - ? " was " - : " were "; - - char const * plural = ( missed_messages.size() == 1 ) - ? "." - : "s."; - - std::cerr << "Listener::shutdown: There" - << verb - << missed_messages.size() - << " missed message" - << plural - << endl; - - for ( i = 0; i < int(missed_messages.size()); ++ i ) - { - std::cerr << " " << i << " was missed.\n"; - } - - - /*-------------------------------------------- - Report multiple messages. - --------------------------------------------*/ - verb = ( multiple_messages.size() == 1 ) - ? " was " - : " were "; - - plural = ( multiple_messages.size() == 1 ) - ? "." - : "s."; - - std::cerr << "Listener::shutdown: There" - << verb - << multiple_messages.size() - << " multiple message" - << plural - << endl; - - for ( i = 0; i < int(multiple_messages.size()); ++ i ) - { - std::cerr << " " - << multiple_messages[i] - << " was received " - << messages_received [ multiple_messages[i] ] - << " times.\n"; - } - - /* - for ( i = 0; i < last_received_message; ++ i ) - { - std::cerr << "Message " << i << ": " << messages_received[i] << std::endl; - } - */ - } - -}; - - - - struct Listener : public MessageListener { FailoverSubscriptionManager & subscriptionManager; - Recorder & recorder; + Listener ( FailoverSubscriptionManager& subs ); - Listener ( FailoverSubscriptionManager& subs, - Recorder & recorder - ); - - void shutdown() { recorder.report(); } - void parse_message ( std::string const & msg ); + void shutdown() { subscriptionManager.stop(); } virtual void received ( Message & message ); + + int count; }; -Listener::Listener ( FailoverSubscriptionManager & s, Recorder & r ) : +Listener::Listener ( FailoverSubscriptionManager & s ) : subscriptionManager(s), - recorder(r) + count(0) { } @@ -188,18 +64,19 @@ Listener::Listener ( FailoverSubscriptionManager & s, Recorder & r ) : void Listener::received ( Message & message ) { - std::cerr << "Listener received: " << message.getData() << std::endl; + if(! (count%1000)) + std::cerr << "\t\tListener received: " << message.getData() << std::endl; + + ++ count; + if (message.getData() == "That's all, folks!") { std::cout << "Shutting down listener for " << message.getDestination() << std::endl; - subscriptionManager.cancel(message.getDestination()); - shutdown(); - } - else - { - parse_message ( message.getData() ); + std::cout << "Listener received " << count << " messages.\n"; + subscriptionManager.cancel(message.getDestination()); + shutdown ( ); } } @@ -207,21 +84,6 @@ Listener::received ( Message & message ) -void -Listener::parse_message ( const std::string & msg ) -{ - int msg_number; - if(1 != sscanf ( msg.c_str(), "%d", & msg_number ) ) - { - std::cerr << "Listener::parse_message error: Can't read message number from this message: |" << msg_number << "|\n"; - return; - } - recorder.received ( msg_number ); -} - - - - int @@ -235,17 +97,12 @@ main ( int argc, char ** argv ) FailoverConnection connection; FailoverSession * session; - Recorder recorder; - - connection.name = program_name; connection.open ( host, port ); session = connection.newSession(); - session->name = program_name; FailoverSubscriptionManager subscriptions ( session ); - subscriptions.name = program_name; - Listener listener ( subscriptions, recorder ); + Listener listener ( subscriptions ); subscriptions.subscribe ( listener, "message_queue" ); subscriptions.run ( ); @@ -256,7 +113,8 @@ main ( int argc, char ** argv ) } catch(const std::exception& error) { std::cout << program_name << ": " << error.what() << std::endl; } - return 1; + + return 0; } diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index 652d59f448..92cf756580 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -151,8 +151,11 @@ void ConnectionImpl::closed(uint16_t code, const std::string& text) { static const std::string CONN_CLOSED("Connection closed by broker"); void ConnectionImpl::shutdown() { - Mutex::ScopedLock l(lock); + + if ( failureCallback ) + failureCallback(); + if (handler.isClosed()) return; // FIXME aconway 2008-06-06: exception use, amqp0-10 does not seem to have @@ -161,9 +164,6 @@ void ConnectionImpl::shutdown() { closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED)); setException(new ConnectionException(CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED)); handler.fail(CONN_CLOSED); - - if ( failureCallback ) - failureCallback(); } void ConnectionImpl::erase(uint16_t ch) { diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index af805a3808..c26dba188d 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -42,6 +42,7 @@ Subscriber::Subscriber(const Session& s, MessageListener* l, AckPolicy a) void Subscriber::received(Message& msg) { + if (listener) { listener->received(msg); autoAck.ack(msg, session); diff --git a/cpp/src/qpid/client/FailoverConnection.cpp b/cpp/src/qpid/client/FailoverConnection.cpp index e98de868de..33b06a6a1a 100644 --- a/cpp/src/qpid/client/FailoverConnection.cpp +++ b/cpp/src/qpid/client/FailoverConnection.cpp @@ -36,7 +36,6 @@ namespace client { FailoverConnection::FailoverConnection ( ) : - name(), failoverCompleteTime(0) { connection.registerFailureCallback @@ -59,7 +58,6 @@ FailoverConnection::open ( const std::string& host, settings.host = host; settings.port = port; settings.username = uid; - settings.username = uid; settings.password = pwd; settings.virtualhost = virtualhost; settings.maxFrameSize = maxFrameSize; @@ -124,9 +122,19 @@ FailoverConnection::registerFailureCallback ( boost::function<void ()> /*fn*/ ) void FailoverConnection::failover ( ) { + std::vector<FailoverSession *>::iterator sessions_iterator; + + for ( sessions_iterator = sessions.begin(); + sessions_iterator != sessions.end(); + ++ sessions_iterator ) + { + FailoverSession * fs = * sessions_iterator; + fs->failover_in_progress = true; + } + std::vector<Url> knownBrokers = connection.getKnownBrokers(); if (knownBrokers.empty()) - throw Exception(QPID_MSG("FailoverConnection::failover " << name << " no known brokers.")); + throw Exception(QPID_MSG("FailoverConnection::failover no known brokers.")); Connection newConnection; for (std::vector<Url>::iterator i = knownBrokers.begin(); i != knownBrokers.end(); ++i) { @@ -148,7 +156,6 @@ FailoverConnection::failover ( ) */ // FIXME aconway 2008-10-10: thread unsafe, possible race with concurrent newSession - std::vector<FailoverSession *>::iterator sessions_iterator; for ( sessions_iterator = sessions.begin(); sessions_iterator < sessions.end(); ++ sessions_iterator @@ -173,6 +180,15 @@ FailoverConnection::failover ( ) FailoverSession * fs = * sessions_iterator; fs->failover ( ); } + + for ( sessions_iterator = sessions.begin(); + sessions_iterator < sessions.end(); + ++ sessions_iterator + ) + { + FailoverSession * fs = * sessions_iterator; + fs->failover_in_progress = false; + } } diff --git a/cpp/src/qpid/client/FailoverConnection.h b/cpp/src/qpid/client/FailoverConnection.h index 09e9c8bfa4..a84f0c2189 100644 --- a/cpp/src/qpid/client/FailoverConnection.h +++ b/cpp/src/qpid/client/FailoverConnection.h @@ -71,10 +71,6 @@ class FailoverConnection void registerFailureCallback ( boost::function<void ()> fn ); - // If you have more than 1 connection and you want to give them - // separate names for debugging... - std::string name; - void failover ( ); struct timeval * failoverCompleteTime; diff --git a/cpp/src/qpid/client/FailoverSession.cpp b/cpp/src/qpid/client/FailoverSession.cpp index a088a8c91b..25867c2a24 100644 --- a/cpp/src/qpid/client/FailoverSession.cpp +++ b/cpp/src/qpid/client/FailoverSession.cpp @@ -38,7 +38,7 @@ namespace qpid { namespace client { FailoverSession::FailoverSession ( ) : - name("no_name") + failover_in_progress(false) { // The session is created by FailoverConnection::newSession failoverSubscriptionManager = 0; @@ -170,11 +170,26 @@ FailoverSession::messageTransfer ( const string& destination, ) { - session.messageTransfer ( destination, - acceptMode, - acquireMode, - content - ); + while ( 1 ) + { + try + { + session.messageTransfer ( destination, + acceptMode, + acquireMode, + content + ); + break; + } + catch ( ... ) + { + // Take special action only if there is a failover in progress. + if ( ! failover_in_progress ) + break; + + usleep ( 1000 ); + } + } } @@ -583,7 +598,6 @@ FailoverSession::prepareForFailover ( Connection newConnection ) if ( failoverSubscriptionManager ) { - // failoverSubscriptionManager->prepareForFailover ( newSession ); } } diff --git a/cpp/src/qpid/client/FailoverSession.h b/cpp/src/qpid/client/FailoverSession.h index 7ff26f8079..b301353968 100644 --- a/cpp/src/qpid/client/FailoverSession.h +++ b/cpp/src/qpid/client/FailoverSession.h @@ -59,8 +59,6 @@ class FailoverSession FailoverSession ( ); ~FailoverSession ( ); - std::string name; - framing::FrameSet::shared_ptr get(); SessionId getId() const; @@ -82,6 +80,8 @@ class FailoverSession void sendCompletion ( ); + bool failover_in_progress; + // Wrapped functions from Session ---------------------------- diff --git a/cpp/src/qpid/client/FailoverSubscriptionManager.cpp b/cpp/src/qpid/client/FailoverSubscriptionManager.cpp index c1ef7d00c4..d12d976ef5 100644 --- a/cpp/src/qpid/client/FailoverSubscriptionManager.cpp +++ b/cpp/src/qpid/client/FailoverSubscriptionManager.cpp @@ -33,7 +33,6 @@ namespace client { FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) : - name("no_name"), newSessionIsValid(false), no_failover(false) { diff --git a/cpp/src/qpid/client/FailoverSubscriptionManager.h b/cpp/src/qpid/client/FailoverSubscriptionManager.h index 651e2549c2..b7631d3a98 100644 --- a/cpp/src/qpid/client/FailoverSubscriptionManager.h +++ b/cpp/src/qpid/client/FailoverSubscriptionManager.h @@ -114,9 +114,6 @@ class FailoverSubscriptionManager void prepareForFailover ( Session newSession ); void failover ( ); - std::string name; - - private: sys::Monitor lock; diff --git a/cpp/src/tests/ssl_test b/cpp/src/tests/ssl_test index ed06f19fd3..a357a38c0a 100755 --- a/cpp/src/tests/ssl_test +++ b/cpp/src/tests/ssl_test @@ -1,8 +1,8 @@ #!/bin/sh # Run a simple test over SSL -MY_DIR=$(dirname $(which $0)) -CERT_DIR=${MY_DIR}/test_cert_db -CERT_PW_FILE=${MY_DIR}/cert.password + +CERT_DIR=`pwd`/test_cert_db +CERT_PW_FILE=`pwd`/cert.password HOSTNAME=`hostname` COUNT=10000 @@ -18,13 +18,13 @@ create_certs() { } start_broker() { - ${MY_DIR}/../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no\ - --load-module ${MY_DIR}/../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE > qpidd.port + ../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no\ + --load-module ../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE > qpidd.port PORT=`cat qpidd.port` } stop_broker() { - ${MY_DIR}/../qpidd -q --port $PORT + ../qpidd -q --port $PORT } if [[ !(-e ${CERT_PW_FILE}) ]] ; then @@ -36,8 +36,8 @@ fi start_broker || error "Could not start broker" echo "Running SSL test on port $PORT" -export QPID_LOAD_MODULE=${MY_DIR}/../.libs/sslconnector.so +export QPID_LOAD_MODULE=../.libs/sslconnector.so export QPID_SSL_CERT_DB=${CERT_DIR} export QPID_SSL_CERT_PASSWORD_FILE=${CERT_PW_FILE} -${MY_DIR}/perftest --count ${COUNT} --port ${PORT} -P ssl -b $HOSTNAME --summary +./perftest --count ${COUNT} --port ${PORT} -P ssl -b $HOSTNAME --summary |