summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-12-20 15:06:35 +0000
committerAlan Conway <aconway@apache.org>2013-12-20 15:06:35 +0000
commit829b44f73e0825f838099af4b683b1f744c0e2aa (patch)
tree50123bc073572ab16794cb5213fba18ab2b3c86c /qpid/cpp
parentf87fbe5b1f0441c1066be6db0836097eda48b02c (diff)
downloadqpid-python-829b44f73e0825f838099af4b683b1f744c0e2aa.tar.gz
QPID-5431: Qpid c++ client hangs / crashes during reception failover in HA environment (mutual recursion)
Bug in AMQP 1.0 retry code caused an infinite recursion when failing over. The recursion was in messaging::amqp::ConnectionContext, where the following recursive cycle could occur: check()->autoconnect()->tryConnect(Url)->tryConnect(Address)->wait()->check()->... Re-organized the code to avoid the recursion, specifically avoid calling check() in tryConnect(Address). A disconnect detected in tryConnect results in continuing the retry rather than calling autoconnect again. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1552698 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/examples/messaging/drain.cpp5
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp4
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp74
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h2
4 files changed, 53 insertions, 32 deletions
diff --git a/qpid/cpp/examples/messaging/drain.cpp b/qpid/cpp/examples/messaging/drain.cpp
index aa2d7b4964..bfd9bdccbe 100644
--- a/qpid/cpp/examples/messaging/drain.cpp
+++ b/qpid/cpp/examples/messaging/drain.cpp
@@ -82,8 +82,9 @@ int main(int argc, char** argv)
{
Options options;
if (options.parse(argc, argv) && options.checkAddress()) {
- Connection connection(options.url, options.connectionOptions);
+ Connection connection;
try {
+ connection = Connection(options.url, options.connectionOptions);
connection.open();
Session session = connection.createSession();
Receiver receiver = session.createReceiver(options.address);
@@ -103,7 +104,7 @@ int main(int argc, char** argv)
connection.close();
return 0;
} catch(const std::exception& error) {
- std::cout << error.what() << std::endl;
+ std::cout << "Error: " << error.what() << std::endl;
connection.close();
}
}
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 1587b5b33f..33b2d65ba6 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -438,7 +438,7 @@ void BrokerReplicator::route(Deliverable& msg) {
if (msg.getMessage().getPropertyAsString(QMF_CONTENT) == EVENT) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
- QPID_LOG(debug, "Broker replicator event: " << map);
+ QPID_LOG(trace, "Broker replicator event: " << map);
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
std::string key = (schema[PACKAGE_NAME].asString() +
@@ -450,7 +450,7 @@ void BrokerReplicator::route(Deliverable& msg) {
} else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
- QPID_LOG(debug, "Broker replicator response: " << map);
+ QPID_LOG(trace, "Broker replicator response: " << map);
string type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString();
Variant::Map& values = map[VALUES].asMap();
framing::FieldTable args;
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index 47f65328eb..c9cdd075bc 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -47,6 +47,18 @@ namespace qpid {
namespace messaging {
namespace amqp {
namespace {
+
+std::string asString(const std::vector<std::string>& v) {
+ std::stringstream os;
+ os << "[";
+ for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); ++i ) {
+ if (i != v.begin()) os << ", ";
+ os << '"' << *i << '"';
+ }
+ os << "]";
+ return os.str();
+}
+
//remove conditional when 0.5 is no longer supported
#ifdef HAVE_PROTON_TRACER
void do_trace(pn_transport_t* transport, const char* message)
@@ -437,27 +449,33 @@ void ConnectionContext::reset()
pn_transport_bind(engine, connection);
}
-void ConnectionContext::check()
-{
- if (state == DISCONNECTED) {
+void ConnectionContext::check() {
+ if (checkDisconnected()) {
if (ConnectionOptions::reconnect) {
- reset();
autoconnect();
} else {
throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)");
}
}
- if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
- pn_condition_t* error = pn_connection_remote_condition(connection);
- std::stringstream text;
- if (pn_condition_is_set(error)) {
- text << "Connection closed by peer with " << pn_condition_get_name(error) << ": " << pn_condition_get_description(error);
- } else {
- text << "Connection closed by peer";
+}
+
+bool ConnectionContext::checkDisconnected() {
+ if (state == DISCONNECTED) {
+ reset();
+ } else {
+ if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+ pn_condition_t* error = pn_connection_remote_condition(connection);
+ std::stringstream text;
+ if (pn_condition_is_set(error)) {
+ text << "Connection closed by peer with " << pn_condition_get_name(error) << ": " << pn_condition_get_description(error);
+ } else {
+ text << "Connection closed by peer";
+ }
+ pn_connection_close(connection);
+ throw qpid::messaging::ConnectionError(text.str());
}
- pn_connection_close(connection);
- throw qpid::messaging::ConnectionError(text.str());
}
+ return state == DISCONNECTED;
}
void ConnectionContext::wait()
@@ -843,16 +861,6 @@ void ConnectionContext::open()
namespace {
-std::string asString(const std::vector<std::string>& v) {
- std::stringstream os;
- os << "[";
- for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); ++i ) {
- if (i != v.begin()) os << ", ";
- os << *i;
- }
- os << "]";
- return os.str();
-}
double FOREVER(std::numeric_limits<double>::max());
bool expired(const sys::AbsTime& start, double timeout)
{
@@ -894,6 +902,7 @@ bool ConnectionContext::tryConnect()
if (tryConnect(qpid::Url(*i, protocol.empty() ? qpid::Address::TCP : protocol))) {
return true;
}
+ QPID_LOG(info, "Failed to connect to " << *i);
} catch (const qpid::messaging::TransportFailure& e) {
QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
}
@@ -923,6 +932,13 @@ void ConnectionContext::reconnect()
}
}
+void ConnectionContext::waitNoReconnect() {
+ if (!checkDisconnected()) {
+ lock.wait();
+ checkDisconnected();
+ }
+}
+
bool ConnectionContext::tryConnect(const Url& url)
{
if (url.getUser().size()) username = url.getUser();
@@ -934,10 +950,11 @@ bool ConnectionContext::tryConnect(const Url& url)
setCurrentUrl(*i);
if (sasl.get()) {
wakeupDriver();
- while (!sasl->authenticated()) {
+ while (!sasl->authenticated() && state != DISCONNECTED) {
QPID_LOG(debug, id << " Waiting to be authenticated...");
- wait();
+ waitNoReconnect();
}
+ if (state == DISCONNECTED) continue;
QPID_LOG(debug, id << " Authenticated");
}
@@ -945,9 +962,10 @@ bool ConnectionContext::tryConnect(const Url& url)
setProperties();
pn_connection_open(connection);
wakeupDriver(); //want to write
- while (pn_connection_state(connection) & PN_REMOTE_UNINIT) {
- wait();
- }
+ while ((pn_connection_state(connection) & PN_REMOTE_UNINIT) &&
+ state != DISCONNECTED)
+ waitNoReconnect();
+ if (state == DISCONNECTED) continue;
if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
throw qpid::messaging::ConnectionError("Failed to open connection");
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
index e1e6957707..94cdefccfc 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
@@ -150,6 +150,8 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
CodecAdapter codecAdapter;
void check();
+ bool checkDisconnected();
+ void waitNoReconnect();
void wait();
void waitUntil(qpid::sys::AbsTime until);
void wait(boost::shared_ptr<SessionContext>);