diff options
-rw-r--r-- | qpid/cpp/examples/failover/direct_producer.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/examples/failover/listener.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/ConnectionHandler.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Dispatcher.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/FailoverListener.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 15 |
6 files changed, 19 insertions, 27 deletions
diff --git a/qpid/cpp/examples/failover/direct_producer.cpp b/qpid/cpp/examples/failover/direct_producer.cpp index ef2231a7f0..1bee56e164 100644 --- a/qpid/cpp/examples/failover/direct_producer.cpp +++ b/qpid/cpp/examples/failover/direct_producer.cpp @@ -40,14 +40,13 @@ main ( int argc, char ** argv) int port = argc>2 ? atoi(argv[2]) : 5672; int count = argc>3 ? atoi(argv[3]) : 30; int delayMs = argc>4 ? atoi(argv[4]) : 1000; + string program_name = "PRODUCER"; try { FailoverConnection connection; FailoverSession * session; Message message; - string program_name = "PRODUCER"; - connection.open ( host, port ); session = connection.newSession(); int sent = 0; @@ -89,9 +88,10 @@ main ( int argc, char ** argv) session->sync(); connection.close(); + std::cout << program_name << ": " << " completed without error." << std::endl; return 0; } catch(const std::exception& error) { - std::cout << error.what() << std::endl; + std::cout << program_name << ": " << error.what() << std::endl; } return 1; } diff --git a/qpid/cpp/examples/failover/listener.cpp b/qpid/cpp/examples/failover/listener.cpp index c4c7d096b3..1c47127389 100644 --- a/qpid/cpp/examples/failover/listener.cpp +++ b/qpid/cpp/examples/failover/listener.cpp @@ -229,9 +229,9 @@ main ( int argc, char ** argv ) { const char* host = argc>1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; + string program_name = "LISTENER"; try { - string program_name = "LISTENER"; FailoverConnection connection; FailoverSession * session; @@ -250,10 +250,11 @@ main ( int argc, char ** argv ) subscriptions.run ( ); connection.close(); + std::cout << program_name << ": " << " completed without error." << std::endl; return 0; } catch(const std::exception& error) { - std::cout << error.what() << std::endl; + std::cout << program_name << ": " << error.what() << std::endl; } return 1; } diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp index 5fbe87878a..34bf8708cb 100644 --- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp @@ -166,7 +166,7 @@ void ConnectionHandler::openOk ( const framing::Array& knownBrokers ) for ( i = knownBrokers.begin(); i != knownBrokers.end(); ++i ) knownBrokersUrls.push_back(Url((*i)->get<std::string>())); setState(OPEN); - QPID_LOG(info, "Known-brokers for connection: " << log::formatList(knownBrokersUrls)); + QPID_LOG(debug, "Known-brokers for connection: " << log::formatList(knownBrokersUrls)); } diff --git a/qpid/cpp/src/qpid/client/Dispatcher.cpp b/qpid/cpp/src/qpid/client/Dispatcher.cpp index 08905bc96c..fd9d8a8ad1 100644 --- a/qpid/cpp/src/qpid/client/Dispatcher.cpp +++ b/qpid/cpp/src/qpid/client/Dispatcher.cpp @@ -38,7 +38,7 @@ namespace qpid { namespace client { Subscriber::Subscriber(const Session& s, MessageListener* l, AckPolicy a) - : session(s), listener(l), autoAck(a) {} + : session(s), listener(l), autoAck(a) {} void Subscriber::received(Message& msg) { @@ -96,18 +96,12 @@ void Dispatcher::run() } catch (const ClosedException& e) { - QPID_LOG(debug, "Ignored exception in client dispatch thread: " << e.what()); + QPID_LOG(debug, "Ignored exception in client dispatch thread: " << e.what()); } //ignore it and return catch (const std::exception& e) { QPID_LOG(error, "Exception in client dispatch thread: " << e.what()); - if ( failoverHandler ) - { - failoverHandler(); - } - else - { - QPID_LOG(info, "No dispatcher failover handler registered."); - } + if ( failoverHandler ) + failoverHandler(); } } diff --git a/qpid/cpp/src/qpid/client/FailoverListener.cpp b/qpid/cpp/src/qpid/client/FailoverListener.cpp index 591bea91e8..e13f240439 100644 --- a/qpid/cpp/src/qpid/client/FailoverListener.cpp +++ b/qpid/cpp/src/qpid/client/FailoverListener.cpp @@ -65,9 +65,7 @@ void FailoverListener::stop() { } FailoverListener::~FailoverListener() { try { stop(); } - catch (const std::exception& e) { - QPID_LOG(warning, QPID_MSG("Ignoring exception in destructor" << e.what())); - } + catch (const std::exception& e) {} } void FailoverListener::received(Message& msg) { diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index edf23683ae..811c1c9557 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -36,6 +36,7 @@ #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/log/Statement.h" +#include "qpid/log/Helpers.h" #include "qpid/sys/Thread.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" @@ -163,10 +164,8 @@ void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& con void Cluster::mcast(const Event& e) { Lock l(lock); mcast(e, l); } void Cluster::mcast(const Event& e, Lock&) { - if (state == LEFT) { - lock.notifyAll(); // threads waiting in getUrls() + if (state == LEFT) return; - } if (state < READY && e.isConnection()) { // Stall outgoing connection events. QPID_LOG(trace, *this << " MCAST deferred: " << e ); @@ -354,7 +353,6 @@ void Cluster::configChange ( map = ClusterMap(memberId, myUrl, true); memberUpdate(l); unstall(l); - lock.notifyAll(); // threads waiting in getUrls() } else { // Joining established group. state = NEWBIE; @@ -383,7 +381,7 @@ void Cluster::tryMakeOffer(const MemberId& id, Lock& l) { void Cluster::unstall(Lock& l) { // Called with lock held switch (state) { - case INIT: case DUMPEE: case DUMPER: + case INIT: case DUMPEE: case DUMPER: case READY: QPID_LOG(debug, *this << " unstall: deliver=" << deliverQueue.size() << " mcast=" << mcastQueue.size()); deliverQueue.start(); @@ -393,7 +391,7 @@ void Cluster::unstall(Lock& l) { if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); break; case LEFT: break; - case NEWBIE: case READY: case OFFER: + case NEWBIE: case OFFER: assert(0); } } @@ -422,7 +420,7 @@ void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) { void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { map.ready(id, Url(url)); if (id == memberId) - lock.notifyAll(); // threads waiting in getUrls() + unstall(l); memberUpdate(l); } @@ -474,7 +472,8 @@ void Cluster::checkDumpIn(Lock& l) { map = *dumpedMap; QPID_LOG(debug, *this << " incoming dump complete. Members: " << map); mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), 0, l); - unstall(l); + state = READY; + // unstall when ready control is self-delivered. } } |