summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-14 19:35:33 +0000
committerAlan Conway <aconway@apache.org>2008-10-14 19:35:33 +0000
commit4b8296548e36c271f90f9cc249220a9145ca9eaa (patch)
tree362a6c2b8f5262022e2be3da04280a0bfe569c66 /qpid
parent11db2ff9108e93a3146aa4608c3c355363bd5ad9 (diff)
downloadqpid-python-4b8296548e36c271f90f9cc249220a9145ca9eaa.tar.gz
Minor cleanup for client failover.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@704637 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/cpp/examples/failover/direct_producer.cpp6
-rw-r--r--qpid/cpp/examples/failover/listener.cpp5
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionHandler.cpp2
-rw-r--r--qpid/cpp/src/qpid/client/Dispatcher.cpp14
-rw-r--r--qpid/cpp/src/qpid/client/FailoverListener.cpp4
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp15
6 files changed, 19 insertions, 27 deletions
diff --git a/qpid/cpp/examples/failover/direct_producer.cpp b/qpid/cpp/examples/failover/direct_producer.cpp
index ef2231a7f0..1bee56e164 100644
--- a/qpid/cpp/examples/failover/direct_producer.cpp
+++ b/qpid/cpp/examples/failover/direct_producer.cpp
@@ -40,14 +40,13 @@ main ( int argc, char ** argv)
int port = argc>2 ? atoi(argv[2]) : 5672;
int count = argc>3 ? atoi(argv[3]) : 30;
int delayMs = argc>4 ? atoi(argv[4]) : 1000;
+ string program_name = "PRODUCER";
try {
FailoverConnection connection;
FailoverSession * session;
Message message;
- string program_name = "PRODUCER";
-
connection.open ( host, port );
session = connection.newSession();
int sent = 0;
@@ -89,9 +88,10 @@ main ( int argc, char ** argv)
session->sync();
connection.close();
+ std::cout << program_name << ": " << " completed without error." << std::endl;
return 0;
} catch(const std::exception& error) {
- std::cout << error.what() << std::endl;
+ std::cout << program_name << ": " << error.what() << std::endl;
}
return 1;
}
diff --git a/qpid/cpp/examples/failover/listener.cpp b/qpid/cpp/examples/failover/listener.cpp
index c4c7d096b3..1c47127389 100644
--- a/qpid/cpp/examples/failover/listener.cpp
+++ b/qpid/cpp/examples/failover/listener.cpp
@@ -229,9 +229,9 @@ main ( int argc, char ** argv )
{
const char* host = argc>1 ? argv[1] : "127.0.0.1";
int port = argc>2 ? atoi(argv[2]) : 5672;
+ string program_name = "LISTENER";
try {
- string program_name = "LISTENER";
FailoverConnection connection;
FailoverSession * session;
@@ -250,10 +250,11 @@ main ( int argc, char ** argv )
subscriptions.run ( );
connection.close();
+ std::cout << program_name << ": " << " completed without error." << std::endl;
return 0;
} catch(const std::exception& error) {
- std::cout << error.what() << std::endl;
+ std::cout << program_name << ": " << error.what() << std::endl;
}
return 1;
}
diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
index 5fbe87878a..34bf8708cb 100644
--- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -166,7 +166,7 @@ void ConnectionHandler::openOk ( const framing::Array& knownBrokers )
for ( i = knownBrokers.begin(); i != knownBrokers.end(); ++i )
knownBrokersUrls.push_back(Url((*i)->get<std::string>()));
setState(OPEN);
- QPID_LOG(info, "Known-brokers for connection: " << log::formatList(knownBrokersUrls));
+ QPID_LOG(debug, "Known-brokers for connection: " << log::formatList(knownBrokersUrls));
}
diff --git a/qpid/cpp/src/qpid/client/Dispatcher.cpp b/qpid/cpp/src/qpid/client/Dispatcher.cpp
index 08905bc96c..fd9d8a8ad1 100644
--- a/qpid/cpp/src/qpid/client/Dispatcher.cpp
+++ b/qpid/cpp/src/qpid/client/Dispatcher.cpp
@@ -38,7 +38,7 @@ namespace qpid {
namespace client {
Subscriber::Subscriber(const Session& s, MessageListener* l, AckPolicy a)
- : session(s), listener(l), autoAck(a) {}
+ : session(s), listener(l), autoAck(a) {}
void Subscriber::received(Message& msg)
{
@@ -96,18 +96,12 @@ void Dispatcher::run()
}
catch (const ClosedException& e)
{
- QPID_LOG(debug, "Ignored exception in client dispatch thread: " << e.what());
+ QPID_LOG(debug, "Ignored exception in client dispatch thread: " << e.what());
} //ignore it and return
catch (const std::exception& e) {
QPID_LOG(error, "Exception in client dispatch thread: " << e.what());
- if ( failoverHandler )
- {
- failoverHandler();
- }
- else
- {
- QPID_LOG(info, "No dispatcher failover handler registered.");
- }
+ if ( failoverHandler )
+ failoverHandler();
}
}
diff --git a/qpid/cpp/src/qpid/client/FailoverListener.cpp b/qpid/cpp/src/qpid/client/FailoverListener.cpp
index 591bea91e8..e13f240439 100644
--- a/qpid/cpp/src/qpid/client/FailoverListener.cpp
+++ b/qpid/cpp/src/qpid/client/FailoverListener.cpp
@@ -65,9 +65,7 @@ void FailoverListener::stop() {
}
FailoverListener::~FailoverListener() {
try { stop(); }
- catch (const std::exception& e) {
- QPID_LOG(warning, QPID_MSG("Ignoring exception in destructor" << e.what()));
- }
+ catch (const std::exception& e) {}
}
void FailoverListener::received(Message& msg) {
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index edf23683ae..811c1c9557 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -36,6 +36,7 @@
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/log/Statement.h"
+#include "qpid/log/Helpers.h"
#include "qpid/sys/Thread.h"
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
@@ -163,10 +164,8 @@ void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& con
void Cluster::mcast(const Event& e) { Lock l(lock); mcast(e, l); }
void Cluster::mcast(const Event& e, Lock&) {
- if (state == LEFT) {
- lock.notifyAll(); // threads waiting in getUrls()
+ if (state == LEFT)
return;
- }
if (state < READY && e.isConnection()) {
// Stall outgoing connection events.
QPID_LOG(trace, *this << " MCAST deferred: " << e );
@@ -354,7 +353,6 @@ void Cluster::configChange (
map = ClusterMap(memberId, myUrl, true);
memberUpdate(l);
unstall(l);
- lock.notifyAll(); // threads waiting in getUrls()
}
else { // Joining established group.
state = NEWBIE;
@@ -383,7 +381,7 @@ void Cluster::tryMakeOffer(const MemberId& id, Lock& l) {
void Cluster::unstall(Lock& l) {
// Called with lock held
switch (state) {
- case INIT: case DUMPEE: case DUMPER:
+ case INIT: case DUMPEE: case DUMPER: case READY:
QPID_LOG(debug, *this << " unstall: deliver=" << deliverQueue.size()
<< " mcast=" << mcastQueue.size());
deliverQueue.start();
@@ -393,7 +391,7 @@ void Cluster::unstall(Lock& l) {
if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
break;
case LEFT: break;
- case NEWBIE: case READY: case OFFER:
+ case NEWBIE: case OFFER:
assert(0);
}
}
@@ -422,7 +420,7 @@ void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) {
void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
map.ready(id, Url(url));
if (id == memberId)
- lock.notifyAll(); // threads waiting in getUrls()
+ unstall(l);
memberUpdate(l);
}
@@ -474,7 +472,8 @@ void Cluster::checkDumpIn(Lock& l) {
map = *dumpedMap;
QPID_LOG(debug, *this << " incoming dump complete. Members: " << map);
mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), 0, l);
- unstall(l);
+ state = READY;
+ // unstall when ready control is self-delivered.
}
}