diff options
author | Alan Conway <aconway@apache.org> | 2008-10-10 17:44:50 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-10-10 17:44:50 +0000 |
commit | d1bc2d8115b9079be2baa61f2d60e36656fb5cf4 (patch) | |
tree | dbe771c3823884ae1726fcf5bf8dabe5194c0243 | |
parent | d7c677547f17a51f8c97ea10b7516fd44ef8d1d2 (diff) | |
download | qpid-python-d1bc2d8115b9079be2baa61f2d60e36656fb5cf4.tar.gz |
Failover client and example fixes & tidy up.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@703532 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/examples/failover/direct_producer.cpp | 244 | ||||
-rw-r--r-- | qpid/cpp/examples/failover/listener.cpp | 297 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Connection.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/ConnectionImpl.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/FailoverListener.cpp | 5 |
5 files changed, 281 insertions, 274 deletions
diff --git a/qpid/cpp/examples/failover/direct_producer.cpp b/qpid/cpp/examples/failover/direct_producer.cpp index 9a510b6fb3..d8e74cdc41 100644 --- a/qpid/cpp/examples/failover/direct_producer.cpp +++ b/qpid/cpp/examples/failover/direct_producer.cpp @@ -22,127 +22,129 @@ using namespace std; int main ( int argc, char ** argv) { - struct timeval broker_killed_time = {0,0}, - failover_complete_time = {0,0}, - duration = {0,0}; - - - if ( argc < 3 ) - { - std::cerr << "Usage: ./direct_producer host cluster_port_file_name\n"; - std::cerr << "i.e. for host: 127.0.0.1\n"; - exit(1); - } - - char const * host = argv[1]; - int port = atoi(argv[2]); - char const * broker_to_kill = 0; - - if ( argc > 3 ) - { - broker_to_kill = argv[3]; - std::cerr << "main: Broker marked for death is process ID " - << broker_to_kill - << endl; - } - else - { - std::cerr << "PRODUCER main: there is no broker to kill.\n"; - } - - FailoverConnection connection; - FailoverSession * session; - Message message; - - string program_name = "PRODUCER"; - - - connection.failoverCompleteTime = & failover_complete_time; - connection.name = program_name; - connection.open ( host, port ); - - session = connection.newSession(); - session->name = program_name; - - int send_this_many = 30, - messages_sent = 0; - - while ( messages_sent < send_this_many ) - { - if ( (messages_sent == 13) && broker_to_kill ) - { - char command[1000]; - std::cerr << program_name << " killing broker " << broker_to_kill << ".\n"; - sprintf(command, "kill -9 %s", broker_to_kill); - system ( command ); - gettimeofday ( & broker_killed_time, 0 ); - } - - message.getDeliveryProperties().setRoutingKey("routing_key"); - - std::cerr << "sending message " - << messages_sent - << " of " - << send_this_many - << ".\n"; - - stringstream message_data; - message_data << messages_sent; - message.setData(message_data.str()); - - try - { - /* MICK FIXME - session.messageTransfer ( arg::content=message, - arg::destination="amq.direct" - ); */ - session->messageTransfer ( "amq.direct", - 1, - 0, - message - ); - } - catch ( const std::exception& error) - { - cerr << program_name << " exception: " << error.what() << endl; - } - - sleep ( 1 ); - ++ messages_sent; - } - - message.setData ( "That's all, folks!" ); - - /* MICK FIXME - session.messageTransfer ( arg::content=message, - arg::destination="amq.direct" - ); - */ - session->messageTransfer ( "amq.direct", - 1, - 0, - message - ); - - session->sync(); - connection.close(); - - // This will be incorrect if you killed more than one... - if ( broker_to_kill ) - { - timersub ( & failover_complete_time, - & broker_killed_time, - & duration - ); - fprintf ( stderr, - "Failover time: %ld.%.6ld\n", - duration.tv_sec, - duration.tv_usec + try { + struct timeval broker_killed_time = {0,0}; + struct timeval failover_complete_time = {0,0}; + struct timeval duration = {0,0}; + + + if ( argc < 3 ) + { + std::cerr << "Usage: ./direct_producer host cluster_port_file_name\n"; + std::cerr << "i.e. for host: 127.0.0.1\n"; + exit(1); + } + + char const * host = argv[1]; + int port = atoi(argv[2]); + char const * broker_to_kill = 0; + + if ( argc > 3 ) + { + broker_to_kill = argv[3]; + std::cerr << "main: Broker marked for death is process ID " + << broker_to_kill + << endl; + } + else + { + std::cerr << "PRODUCER main: there is no broker to kill.\n"; + } + + FailoverConnection connection; + FailoverSession * session; + Message message; + + string program_name = "PRODUCER"; + + + connection.failoverCompleteTime = & failover_complete_time; + connection.name = program_name; + connection.open ( host, port ); + + session = connection.newSession(); + session->name = program_name; + + int send_this_many = 30, + messages_sent = 0; + + while ( messages_sent < send_this_many ) + { + if ( (messages_sent == 13) && broker_to_kill ) + { + char command[1000]; + std::cerr << program_name << " killing broker " << broker_to_kill << ".\n"; + sprintf(command, "kill -9 %s", broker_to_kill); + system ( command ); + gettimeofday ( & broker_killed_time, 0 ); + } + + message.getDeliveryProperties().setRoutingKey("routing_key"); + + std::cerr << "sending message " + << messages_sent + << " of " + << send_this_many + << ".\n"; + + stringstream message_data; + message_data << messages_sent; + message.setData(message_data.str()); + + try + { + /* MICK FIXME + session.messageTransfer ( arg::content=message, + arg::destination="amq.direct" + ); */ + session->messageTransfer ( "amq.direct", + 1, + 0, + message + ); + } + catch ( const std::exception& error) + { + cerr << program_name << " exception: " << error.what() << endl; + } + + sleep ( 1 ); + ++ messages_sent; + } + + message.setData ( "That's all, folks!" ); + + /* MICK FIXME + session.messageTransfer ( arg::content=message, + arg::destination="amq.direct" + ); + */ + session->messageTransfer ( "amq.direct", + 1, + 0, + message + ); + + session->sync(); + connection.close(); + + // This will be incorrect if you killed more than one... + if ( broker_to_kill ) + { + timersub ( & failover_complete_time, + & broker_killed_time, + & duration ); - } + fprintf ( stderr, + "Failover time: %ld.%.6ld\n", + duration.tv_sec, + duration.tv_usec + ); + } + return 0; - return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; } - - - diff --git a/qpid/cpp/examples/failover/listener.cpp b/qpid/cpp/examples/failover/listener.cpp index c58a3b5e71..d5ade9b9e2 100644 --- a/qpid/cpp/examples/failover/listener.cpp +++ b/qpid/cpp/examples/failover/listener.cpp @@ -20,119 +20,119 @@ using namespace std; struct Recorder { - unsigned int max_messages; - unsigned int * messages_received; + unsigned int max_messages; + unsigned int * messages_received; - Recorder ( ) - { - max_messages = 1000; - messages_received = new unsigned int [ max_messages ]; - memset ( messages_received, 0, max_messages * sizeof(int) ); - } + Recorder ( ) + { + max_messages = 1000; + messages_received = new unsigned int [ max_messages ]; + memset ( messages_received, 0, max_messages * sizeof(int) ); + } - void - received ( int i ) - { - messages_received[i] ++; - } + void + received ( int i ) + { + messages_received[i] ++; + } - void - report ( ) - { - int i; + void + report ( ) + { + int i; - int last_received_message = 0; + int last_received_message = 0; - vector<unsigned int> missed_messages, - multiple_messages; + vector<unsigned int> missed_messages, + multiple_messages; - /*---------------------------------------------------- - Collect indices of missed and multiple messages. - ----------------------------------------------------*/ - bool seen_first_message = false; - for ( i = max_messages - 1; i >= 0; -- i ) - { - if ( ! seen_first_message ) - { - if ( messages_received [i] > 0 ) + /*---------------------------------------------------- + Collect indices of missed and multiple messages. + ----------------------------------------------------*/ + bool seen_first_message = false; + for ( i = max_messages - 1; i >= 0; -- i ) { - seen_first_message = true; - last_received_message = i; + if ( ! seen_first_message ) + { + if ( messages_received [i] > 0 ) + { + seen_first_message = true; + last_received_message = i; + } + } + else + { + if ( messages_received [i] == 0 ) + missed_messages.push_back ( i ); + else + if ( messages_received [i] > 1 ) + { + multiple_messages.push_back ( i ); + } + } } - } - else - { - if ( messages_received [i] == 0 ) - missed_messages.push_back ( i ); - else - if ( messages_received [i] > 1 ) + + /*-------------------------------------------- + Report missed messages. + --------------------------------------------*/ + char const * verb = ( missed_messages.size() == 1 ) + ? " was " + : " were "; + + char const * plural = ( missed_messages.size() == 1 ) + ? "." + : "s."; + + std::cerr << "Listener::shutdown: There" + << verb + << missed_messages.size() + << " missed message" + << plural + << endl; + + for ( i = 0; i < int(missed_messages.size()); ++ i ) { - multiple_messages.push_back ( i ); + std::cerr << " " << i << " was missed.\n"; } - } - } - - /*-------------------------------------------- - Report missed messages. - --------------------------------------------*/ - char const * verb = ( missed_messages.size() == 1 ) - ? " was " - : " were "; - - char const * plural = ( missed_messages.size() == 1 ) - ? "." - : "s."; - - std::cerr << "Listener::shutdown: There" - << verb - << missed_messages.size() - << " missed message" - << plural - << endl; - - for ( i = 0; i < int(missed_messages.size()); ++ i ) - { - std::cerr << " " << i << " was missed.\n"; - } - /*-------------------------------------------- - Report multiple messages. - --------------------------------------------*/ - verb = ( multiple_messages.size() == 1 ) - ? " was " - : " were "; + /*-------------------------------------------- + Report multiple messages. + --------------------------------------------*/ + verb = ( multiple_messages.size() == 1 ) + ? " was " + : " were "; - plural = ( multiple_messages.size() == 1 ) - ? "." - : "s."; + plural = ( multiple_messages.size() == 1 ) + ? "." + : "s."; - std::cerr << "Listener::shutdown: There" - << verb - << multiple_messages.size() - << " multiple message" - << plural - << endl; + std::cerr << "Listener::shutdown: There" + << verb + << multiple_messages.size() + << " multiple message" + << plural + << endl; - for ( i = 0; i < int(multiple_messages.size()); ++ i ) - { - std::cerr << " " - << multiple_messages[i] - << " was received " - << messages_received [ multiple_messages[i] ] - << " times.\n"; - } + for ( i = 0; i < int(multiple_messages.size()); ++ i ) + { + std::cerr << " " + << multiple_messages[i] + << " was received " + << messages_received [ multiple_messages[i] ] + << " times.\n"; + } - /* - for ( i = 0; i < last_received_message; ++ i ) - { - std::cerr << "Message " << i << ": " << messages_received[i] << std::endl; + /* + for ( i = 0; i < last_received_message; ++ i ) + { + std::cerr << "Message " << i << ": " << messages_received[i] << std::endl; + } + */ } - */ - } }; @@ -141,18 +141,18 @@ struct Recorder struct Listener : public MessageListener { - FailoverSubscriptionManager & subscriptionManager; - Recorder & recorder; + FailoverSubscriptionManager & subscriptionManager; + Recorder & recorder; - Listener ( FailoverSubscriptionManager& subs, - Recorder & recorder - ); + Listener ( FailoverSubscriptionManager& subs, + Recorder & recorder + ); - void shutdown() { recorder.report(); } - void parse_message ( std::string const & msg ); + void shutdown() { recorder.report(); } + void parse_message ( std::string const & msg ); - virtual void received ( Message & message ); + virtual void received ( Message & message ); }; @@ -160,8 +160,8 @@ struct Listener : public MessageListener Listener::Listener ( FailoverSubscriptionManager & s, Recorder & r ) : - subscriptionManager(s), - recorder(r) + subscriptionManager(s), + recorder(r) { } @@ -172,19 +172,19 @@ Listener::Listener ( FailoverSubscriptionManager & s, Recorder & r ) : void Listener::received ( Message & message ) { - std::cerr << "Listener received: " << message.getData() << std::endl; - if (message.getData() == "That's all, folks!") - { - std::cout << "Shutting down listener for " << message.getDestination() - << std::endl; - subscriptionManager.cancel(message.getDestination()); - - shutdown(); - } - else - { - parse_message ( message.getData() ); - } + std::cerr << "Listener received: " << message.getData() << std::endl; + if (message.getData() == "That's all, folks!") + { + std::cout << "Shutting down listener for " << message.getDestination() + << std::endl; + subscriptionManager.cancel(message.getDestination()); + + shutdown(); + } + else + { + parse_message ( message.getData() ); + } } @@ -194,13 +194,13 @@ Listener::received ( Message & message ) void Listener::parse_message ( const std::string & msg ) { - int msg_number; - if(1 != sscanf ( msg.c_str(), "%d", & msg_number ) ) - { - std::cerr << "Listener::parse_message error: Can't read message number from this message: |" << msg_number << "|\n"; - return; - } - recorder.received ( msg_number ); + int msg_number; + if(1 != sscanf ( msg.c_str(), "%d", & msg_number ) ) + { + std::cerr << "Listener::parse_message error: Can't read message number from this message: |" << msg_number << "|\n"; + return; + } + recorder.received ( msg_number ); } @@ -211,37 +211,42 @@ Listener::parse_message ( const std::string & msg ) int main ( int argc, char ** argv ) { - string program_name = "LISTENER"; + try { + string program_name = "LISTENER"; - if ( argc < 3 ) - { - std::cerr << "Usage: ./listener host cluster_port_file_name\n"; - std::cerr << "i.e. for host: 127.0.0.1\n"; - exit(1); - } + if ( argc < 3 ) + { + std::cerr << "Usage: ./listener host cluster_port_file_name\n"; + std::cerr << "i.e. for host: 127.0.0.1\n"; + exit(1); + } - char const * host = argv[1]; - int port = atoi(argv[2]); + char const * host = argv[1]; + int port = atoi(argv[2]); - FailoverConnection connection; - FailoverSession * session; - Recorder recorder; + FailoverConnection connection; + FailoverSession * session; + Recorder recorder; - connection.name = program_name; + connection.name = program_name; - connection.open ( host, port ); - session = connection.newSession(); - session->name = program_name; + connection.open ( host, port ); + session = connection.newSession(); + session->name = program_name; - FailoverSubscriptionManager subscriptions ( session ); - subscriptions.name = program_name; - Listener listener ( subscriptions, recorder ); - subscriptions.subscribe ( listener, "message_queue" ); - subscriptions.run ( ); + FailoverSubscriptionManager subscriptions ( session ); + subscriptions.name = program_name; + Listener listener ( subscriptions, recorder ); + subscriptions.subscribe ( listener, "message_queue" ); + subscriptions.run ( ); - connection.close(); + connection.close(); + return 0; - return 1; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; } diff --git a/qpid/cpp/src/qpid/client/Connection.cpp b/qpid/cpp/src/qpid/client/Connection.cpp index 3ee70c222a..e393d135f6 100644 --- a/qpid/cpp/src/qpid/client/Connection.cpp +++ b/qpid/cpp/src/qpid/client/Connection.cpp @@ -139,7 +139,7 @@ void Connection::close() { } std::vector<Url> Connection::getKnownBrokers() { - return isOpen() ? impl->getKnownBrokers() : std::vector<Url>(); + return impl ? impl->getKnownBrokers() : std::vector<Url>(); } }} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp index 93eea50e43..5d4723b442 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp @@ -151,11 +151,8 @@ static const std::string CONN_CLOSED("Connection closed by broker"); void ConnectionImpl::shutdown() { Mutex::ScopedLock l(lock); - if (handler.isClosed()) - { - std::cerr << "MDEBUG ConnectionImpl::shutdown -- returning w/o failure callback!\n"; - return; - } + if (handler.isClosed()) return; + // FIXME aconway 2008-06-06: exception use, amqp0-10 does not seem to have // an appropriate close-code. connection-forced is not right. if (!handler.isClosing()) diff --git a/qpid/cpp/src/qpid/client/FailoverListener.cpp b/qpid/cpp/src/qpid/client/FailoverListener.cpp index 5ca617138c..591bea91e8 100644 --- a/qpid/cpp/src/qpid/client/FailoverListener.cpp +++ b/qpid/cpp/src/qpid/client/FailoverListener.cpp @@ -64,7 +64,10 @@ void FailoverListener::stop() { subscriptions.reset(); } FailoverListener::~FailoverListener() { - stop(); + try { stop(); } + catch (const std::exception& e) { + QPID_LOG(warning, QPID_MSG("Ignoring exception in destructor" << e.what())); + } } void FailoverListener::received(Message& msg) { |