diff options
author | Alan Conway <aconway@apache.org> | 2013-12-20 15:06:35 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2013-12-20 15:06:35 +0000 |
commit | 829b44f73e0825f838099af4b683b1f744c0e2aa (patch) | |
tree | 50123bc073572ab16794cb5213fba18ab2b3c86c /qpid/cpp | |
parent | f87fbe5b1f0441c1066be6db0836097eda48b02c (diff) | |
download | qpid-python-829b44f73e0825f838099af4b683b1f744c0e2aa.tar.gz |
QPID-5431: Qpid c++ client hangs / crashes during reception failover in HA environment (mutual recursion)
Bug in AMQP 1.0 retry code caused an infinite recursion when failing over.
The recursion was in messaging::amqp::ConnectionContext, where the following
recursive cycle could occur:
check()->autoconnect()->tryConnect(Url)->tryConnect(Address)->wait()->check()->...
Re-organized the code to avoid the recursion, specifically avoid calling check()
in tryConnect(Address). A disconnect detected in tryConnect results in continuing
the retry rather than calling autoconnect again.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1552698 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r-- | qpid/cpp/examples/messaging/drain.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 74 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h | 2 |
4 files changed, 53 insertions, 32 deletions
diff --git a/qpid/cpp/examples/messaging/drain.cpp b/qpid/cpp/examples/messaging/drain.cpp index aa2d7b4964..bfd9bdccbe 100644 --- a/qpid/cpp/examples/messaging/drain.cpp +++ b/qpid/cpp/examples/messaging/drain.cpp @@ -82,8 +82,9 @@ int main(int argc, char** argv) { Options options; if (options.parse(argc, argv) && options.checkAddress()) { - Connection connection(options.url, options.connectionOptions); + Connection connection; try { + connection = Connection(options.url, options.connectionOptions); connection.open(); Session session = connection.createSession(); Receiver receiver = session.createReceiver(options.address); @@ -103,7 +104,7 @@ int main(int argc, char** argv) connection.close(); return 0; } catch(const std::exception& error) { - std::cout << error.what() << std::endl; + std::cout << "Error: " << error.what() << std::endl; connection.close(); } } diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 1587b5b33f..33b2d65ba6 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -438,7 +438,7 @@ void BrokerReplicator::route(Deliverable& msg) { if (msg.getMessage().getPropertyAsString(QMF_CONTENT) == EVENT) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { Variant::Map& map = i->asMap(); - QPID_LOG(debug, "Broker replicator event: " << map); + QPID_LOG(trace, "Broker replicator event: " << map); Variant::Map& schema = map[SCHEMA_ID].asMap(); Variant::Map& values = map[VALUES].asMap(); std::string key = (schema[PACKAGE_NAME].asString() + @@ -450,7 +450,7 @@ void BrokerReplicator::route(Deliverable& msg) { } else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { Variant::Map& map = i->asMap(); - QPID_LOG(debug, "Broker replicator response: " << map); + QPID_LOG(trace, "Broker replicator response: " << map); string type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString(); Variant::Map& values = map[VALUES].asMap(); framing::FieldTable args; diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 47f65328eb..c9cdd075bc 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -47,6 +47,18 @@ namespace qpid { namespace messaging { namespace amqp { namespace { + +std::string asString(const std::vector<std::string>& v) { + std::stringstream os; + os << "["; + for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); ++i ) { + if (i != v.begin()) os << ", "; + os << '"' << *i << '"'; + } + os << "]"; + return os.str(); +} + //remove conditional when 0.5 is no longer supported #ifdef HAVE_PROTON_TRACER void do_trace(pn_transport_t* transport, const char* message) @@ -437,27 +449,33 @@ void ConnectionContext::reset() pn_transport_bind(engine, connection); } -void ConnectionContext::check() -{ - if (state == DISCONNECTED) { +void ConnectionContext::check() { + if (checkDisconnected()) { if (ConnectionOptions::reconnect) { - reset(); autoconnect(); } else { throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)"); } } - if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { - pn_condition_t* error = pn_connection_remote_condition(connection); - std::stringstream text; - if (pn_condition_is_set(error)) { - text << "Connection closed by peer with " << pn_condition_get_name(error) << ": " << pn_condition_get_description(error); - } else { - text << "Connection closed by peer"; +} + +bool ConnectionContext::checkDisconnected() { + if (state == DISCONNECTED) { + reset(); + } else { + if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { + pn_condition_t* error = pn_connection_remote_condition(connection); + std::stringstream text; + if (pn_condition_is_set(error)) { + text << "Connection closed by peer with " << pn_condition_get_name(error) << ": " << pn_condition_get_description(error); + } else { + text << "Connection closed by peer"; + } + pn_connection_close(connection); + throw qpid::messaging::ConnectionError(text.str()); } - pn_connection_close(connection); - throw qpid::messaging::ConnectionError(text.str()); } + return state == DISCONNECTED; } void ConnectionContext::wait() @@ -843,16 +861,6 @@ void ConnectionContext::open() namespace { -std::string asString(const std::vector<std::string>& v) { - std::stringstream os; - os << "["; - for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); ++i ) { - if (i != v.begin()) os << ", "; - os << *i; - } - os << "]"; - return os.str(); -} double FOREVER(std::numeric_limits<double>::max()); bool expired(const sys::AbsTime& start, double timeout) { @@ -894,6 +902,7 @@ bool ConnectionContext::tryConnect() if (tryConnect(qpid::Url(*i, protocol.empty() ? qpid::Address::TCP : protocol))) { return true; } + QPID_LOG(info, "Failed to connect to " << *i); } catch (const qpid::messaging::TransportFailure& e) { QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what()); } @@ -923,6 +932,13 @@ void ConnectionContext::reconnect() } } +void ConnectionContext::waitNoReconnect() { + if (!checkDisconnected()) { + lock.wait(); + checkDisconnected(); + } +} + bool ConnectionContext::tryConnect(const Url& url) { if (url.getUser().size()) username = url.getUser(); @@ -934,10 +950,11 @@ bool ConnectionContext::tryConnect(const Url& url) setCurrentUrl(*i); if (sasl.get()) { wakeupDriver(); - while (!sasl->authenticated()) { + while (!sasl->authenticated() && state != DISCONNECTED) { QPID_LOG(debug, id << " Waiting to be authenticated..."); - wait(); + waitNoReconnect(); } + if (state == DISCONNECTED) continue; QPID_LOG(debug, id << " Authenticated"); } @@ -945,9 +962,10 @@ bool ConnectionContext::tryConnect(const Url& url) setProperties(); pn_connection_open(connection); wakeupDriver(); //want to write - while (pn_connection_state(connection) & PN_REMOTE_UNINIT) { - wait(); - } + while ((pn_connection_state(connection) & PN_REMOTE_UNINIT) && + state != DISCONNECTED) + waitNoReconnect(); + if (state == DISCONNECTED) continue; if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) { throw qpid::messaging::ConnectionError("Failed to open connection"); } diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h index e1e6957707..94cdefccfc 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -150,6 +150,8 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag CodecAdapter codecAdapter; void check(); + bool checkDisconnected(); + void waitNoReconnect(); void wait(); void waitUntil(qpid::sys::AbsTime until); void wait(boost::shared_ptr<SessionContext>); |