diff options
author | Gordon Sim <gsim@apache.org> | 2008-11-03 17:21:38 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-11-03 17:21:38 +0000 |
commit | a95eb74d7f806e3a60bbd61a042329bcfba9b21d (patch) | |
tree | 1f300ecd3d7dd2a39fdcd3882fb788b5dce28bf2 /cpp/src | |
parent | c7ed94d2a123f3753a6d64eff5a83b742ce30163 (diff) | |
download | qpid-python-a95eb74d7f806e3a60bbd61a042329bcfba9b21d.tar.gz |
Various fixes arising from testing client failover:
* introduced new exception type for signalling connection failure (as distinct from any logical connection errors)
* ConnectionImpl::closeInternal(): take copy of session map to prevent concurrent modification (by the same thread) as sessions are deleted and erase themselves.
* ConnectionImpl::shutdown: hold lock before calling closeInternal(); mark handler failed before informing sessions of failure
* SessionImpl::connectionBroker(): remove code as its rather meaningless
* Don't swallow exceptions in Dispatcher
* Handle exceptions in FailoverListener
* Take weak_ptr to ConnectionImpl on constructor of Connector, then convert to shared_ptr when 'receiver' thread is started.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@710106 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/Exception.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 23 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/client/Dispatcher.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/FailoverListener.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/client/FailoverListener.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.h | 2 | ||||
-rw-r--r-- | cpp/src/tests/exception_test.cpp | 9 |
10 files changed, 52 insertions, 24 deletions
diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h index 9cf564104d..57e7c682eb 100644 --- a/cpp/src/qpid/Exception.h +++ b/cpp/src/qpid/Exception.h @@ -80,6 +80,13 @@ struct ClosedException : public Exception { std::string getPrefix() const; }; +/** + * Exception representing transport failure + */ +struct TransportFailure : public Exception { + TransportFailure(const std::string& msg=std::string()) : Exception(msg) {} +}; + } // namespace qpid #endif /*!_Exception_*/ diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index c957ac0fc5..efff4027aa 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -89,7 +89,7 @@ void ConnectionHandler::outgoing(AMQFrame& frame) if (getState() == OPEN) out(frame); else - throw Exception(errorText.empty() ? "Connection is not open." : errorText); + throw TransportFailure(errorText.empty() ? "Connection is not open." : errorText); } void ConnectionHandler::waitForOpen() diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index ca88de62dd..a3be69e017 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -129,17 +129,22 @@ void ConnectionImpl::close() { if (!handler.isOpen()) return; handler.close(); - closed(CLOSE_CODE_NORMAL, "Closed by client"); } template <class F> void ConnectionImpl::closeInternal(const F& f) { connector->close(); - for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { + //notifying sessions of failure can result in those session being + //deleted which in turn results in a call to erase(); this can + //even happen on this thread, when 's' goes out of scope + //below. Using a copy prevents the map being modified as we + //iterate through. + SessionMap copy; + sessions.swap(copy); + for (SessionMap::iterator i = copy.begin(); i != copy.end(); ++i) { boost::shared_ptr<SessionImpl> s = i->second.lock(); if (s) f(s); } - sessions.clear(); } void ConnectionImpl::closed(uint16_t code, const std::string& text) { @@ -148,7 +153,7 @@ void ConnectionImpl::closed(uint16_t code, const std::string& text) { closeInternal(boost::bind(&SessionImpl::connectionClosed, _1, code, text)); } -static const std::string CONN_CLOSED("Connection closed by broker"); +static const std::string CONN_CLOSED("Connection closed"); void ConnectionImpl::shutdown() { if ( failureCallback ) @@ -158,10 +163,12 @@ void ConnectionImpl::shutdown() { // FIXME aconway 2008-06-06: exception use, amqp0-10 does not seem to have // an appropriate close-code. connection-forced is not right. - if (!handler.isClosing()) - closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED)); - setException(new ConnectionException(CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED)); - handler.fail(CONN_CLOSED); + bool isClosing = handler.isClosing(); + handler.fail(CONN_CLOSED);//ensure connection is marked as failed before notifying sessions + Mutex::ScopedLock l(lock); + if (!isClosing) + closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CONN_CLOSED)); + setException(new TransportFailure(CONN_CLOSED)); } void ConnectionImpl::erase(uint16_t ch) { diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index f7a8d8b853..6509964fe8 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -35,6 +35,7 @@ #include <map> #include <boost/bind.hpp> #include <boost/format.hpp> +#include <boost/weak_ptr.hpp> namespace qpid { namespace client { @@ -140,7 +141,7 @@ class TCPConnector : public Connector, private sys::Runnable std::string identifier; - ConnectionImpl* impl; + boost::weak_ptr<ConnectionImpl> impl; void connect(const std::string& host, int port); void init(); @@ -183,7 +184,7 @@ TCPConnector::TCPConnector(ProtocolVersion ver, shutdownHandler(0), writer(maxFrameSize, cimpl), aio(0), - impl(cimpl) + impl(cimpl->shared_from_this()) { QPID_LOG(debug, "TCPConnector created for " << version.toString()); settings.configureSocket(socket); @@ -380,7 +381,7 @@ void TCPConnector::eof(AsynchIO&) { // will never be called void TCPConnector::run(){ // Keep the connection impl in memory until run() completes. - boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this(); + boost::shared_ptr<ConnectionImpl> protect = impl.lock(); assert(protect); try { Dispatcher d(poller); diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index 0b7618eb4c..da6607fb9e 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -91,9 +91,9 @@ void Dispatcher::run() if ( failoverHandler ) { QPID_LOG(debug, QPID_MSG(session.getId() << " failover: " << e.what())); failoverHandler(); - } - else { + } else { QPID_LOG(error, session.getId() << " error: " << e.what()); + throw; } } } diff --git a/cpp/src/qpid/client/FailoverListener.cpp b/cpp/src/qpid/client/FailoverListener.cpp index 772d9a4197..16370f8912 100644 --- a/cpp/src/qpid/client/FailoverListener.cpp +++ b/cpp/src/qpid/client/FailoverListener.cpp @@ -61,7 +61,17 @@ FailoverListener::FailoverListener(const boost::shared_ptr<ConnectionImpl>& c, c session.queueDeclare(arg::queue=qname, arg::exclusive=true, arg::autoDelete=true); session.exchangeBind(arg::queue=qname, arg::exchange=AMQ_FAILOVER); subscriptions->subscribe(*this, qname, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE)); - thread = sys::Thread(*subscriptions); + thread = sys::Thread(*this); +} + +void FailoverListener::run() +{ + try { + subscriptions->run(); + } catch (const TransportFailure&) { + } catch (const std::exception& e) { + QPID_LOG(error, QPID_MSG(e.what())); + } } FailoverListener::~FailoverListener() { diff --git a/cpp/src/qpid/client/FailoverListener.h b/cpp/src/qpid/client/FailoverListener.h index fc0cca28f1..fe73a26611 100644 --- a/cpp/src/qpid/client/FailoverListener.h +++ b/cpp/src/qpid/client/FailoverListener.h @@ -25,6 +25,7 @@ #include "qpid/client/MessageListener.h" #include "qpid/Url.h" #include "qpid/sys/Mutex.h" +#include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" #include <vector> @@ -36,7 +37,8 @@ class SubscriptionManager; /** * @internal Listen for failover updates from the amq.failover exchange. */ -class FailoverListener : public MessageListener { +class FailoverListener : public MessageListener, private qpid::sys::Runnable +{ public: FailoverListener(const boost::shared_ptr<ConnectionImpl>&, const std::vector<Url>& initUrls); ~FailoverListener(); @@ -44,6 +46,7 @@ class FailoverListener : public MessageListener { std::vector<Url> getKnownBrokers() const; void received(Message& msg); + void run(); private: mutable sys::Mutex lock; diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 08e405565a..0f86f7ff0a 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -260,8 +260,9 @@ void SessionImpl::connectionClosed(uint16_t code, const std::string& text) { * Called by ConnectionImpl to notify active sessions when connection * is disconnected */ -void SessionImpl::connectionBroke(uint16_t _code, const std::string& _text) { - connectionClosed(_code, _text); +void SessionImpl::connectionBroke(const std::string& _text) { + setException(sys::ExceptionHolder(new TransportFailure(_text))); + handleClosed(); } Future SessionImpl::send(const AMQBody& command) diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h index d56566ec14..1414862792 100644 --- a/cpp/src/qpid/client/SessionImpl.h +++ b/cpp/src/qpid/client/SessionImpl.h @@ -99,7 +99,7 @@ public: //NOTE: these are called by the network thread when the connection is closed or dies void connectionClosed(uint16_t code, const std::string& text); - void connectionBroke(uint16_t code, const std::string& text); + void connectionBroke(const std::string& text); /** Set timeout in seconds, returns actual timeout allowed by broker */ uint32_t setTimeout(uint32_t requestedSeconds); diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp index f3f5435699..e420bf2f0b 100644 --- a/cpp/src/tests/exception_test.cpp +++ b/cpp/src/tests/exception_test.cpp @@ -92,7 +92,7 @@ QPID_AUTO_TEST_CASE(DisconnectedPop) { ProxyConnection c(fix.broker->getPort(Broker::TCP_TRANSPORT)); fix.session.queueDeclare(arg::queue="q"); fix.subs.subscribe(fix.lq, "q"); - Catcher<ConnectionException> pop(bind(&LocalQueue::pop, &fix.lq, sys::TIME_SEC)); + Catcher<TransportFailure> pop(bind(&LocalQueue::pop, &fix.lq, sys::TIME_SEC)); fix.connection.proxy.close(); BOOST_CHECK(pop.join()); } @@ -106,11 +106,10 @@ QPID_AUTO_TEST_CASE(DisconnectedListen) { fix.session.queueDeclare(arg::queue="q"); fix.subs.subscribe(l, "q"); - ScopedSuppressLogging sl; // Suppress messages for expected errors. - Thread t(fix.subs); + Catcher<TransportFailure> runner(bind(&SubscriptionManager::run, boost::ref(fix.subs))); fix.connection.proxy.close(); - t.join(); - BOOST_CHECK_THROW(fix.session.close(), ConnectionException); + runner.join(); + BOOST_CHECK_THROW(fix.session.close(), TransportFailure); } QPID_AUTO_TEST_CASE(NoSuchQueueTest) { |