From f0a31beb7a609591e7b34e60ddfd85e9e183fbc0 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 24 Jan 2008 22:26:12 +0000 Subject: Improved/additional client API tests. - Replaced InProcessBroker with a more accurate loopback BrokerFixture. - Added asserts for mutex/condition/thread errors in debug build. - Added client tests for several exception conditions. - Added peer address to log ouput, client/server distinguished by (addr) or [addr] - Fixed various deadlocks & races exposed by the new asserts & tests. File-by-file: New BrokerFixture replaces InProcessBroker D src/tests/InProcessBroker.h M src/tests/BrokerFixture.h M src/tests/SocketProxy.h M src/tests/Makefile.am Made it run a bit faster. M src/tests/quick_perftest Redundant D src/tests/APRBaseTest.cpp Updated tests to use BrokerFixture M src/tests/ClientChannelTest.cpp M src/tests/exception_test.cpp M src/tests/ClientSessionTest.cpp Print thread IDs in decimal, same as GDB. M src/qpid/log/Logger.cpp Assert mutex/condition ops in debug build. M src/qpid/sys/posix/check.h M src/qpid/sys/posix/Mutex.h M src/qpid/sys/posix/Condition.h M src/qpid/sys/posix/Thread.h Added toFd() so SocketProxy can use ::select() M src/qpid/sys/Socket.h M src/qpid/sys/posix/Socket.cpp Fixes for races & deadlocks shown up by new tests & asserts. Mostly shutdown/close issues. M src/qpid/client/ConnectionHandler.h M src/qpid/client/ConnectionImpl.cpp M src/qpid/client/Demux.h M src/qpid/client/SessionCore.cpp M src/qpid/client/ConnectionHandler.cpp M src/qpid/client/Connector.h M src/qpid/client/Demux.cpp M src/qpid/client/Dispatcher.cpp M src/qpid/client/ConnectionImpl.h Logging peer address. M src/qpid/sys/AsynchIOAcceptor.cpp git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@615063 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/client/ConnectionImpl.cpp | 84 ++++++++++++++++++---------------- 1 file changed, 45 insertions(+), 39 deletions(-) (limited to 'cpp/src/qpid/client/ConnectionImpl.cpp') diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index dd986deec4..b248de8744 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include "qpid/log/Statement.h" #include "qpid/framing/constants.h" #include "qpid/framing/reply_exceptions.h" @@ -44,14 +45,18 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr c) connector->setShutdownHandler(this); } -ConnectionImpl::~ConnectionImpl() { close(); } +ConnectionImpl::~ConnectionImpl() { + // Important to close the connector first, to ensure the + // connector thread does not call on us while the destructor + // is running. + connector->close(); +} void ConnectionImpl::addSession(const boost::shared_ptr& session) { Mutex::ScopedLock l(lock); boost::weak_ptr& s = sessions[session->getChannel()]; - if (s.lock()) - throw ChannelBusyException(); + if (s.lock()) throw ChannelBusyException(); s = session; } @@ -81,31 +86,15 @@ void ConnectionImpl::open(const std::string& host, int port, handler.pwd = pwd; handler.vhost = vhost; + QPID_LOG(info, "Connecting to " << host << ":" << port); connector->connect(host, port); connector->init(); handler.waitForOpen(); } -bool ConnectionImpl::setClosing() -{ - Mutex::ScopedLock l(lock); - if (isClosing || isClosed) { - return false; - } - isClosing = true; - return true; -} - -void ConnectionImpl::close() -{ - if (setClosing()) { - handler.close(); - } -} - void ConnectionImpl::idleIn() { - connector->close(); + close(); } void ConnectionImpl::idleOut() @@ -114,35 +103,52 @@ void ConnectionImpl::idleOut() connector->send(frame); } -template -void ConnectionImpl::forChannels(F functor) { - for (SessionMap::iterator i = sessions.begin(); - i != sessions.end(); ++i) { - try { - boost::shared_ptr s = i->second.lock(); - if (s) functor(*s); - } catch (...) { assert(0); } +void ConnectionImpl::close() +{ + Mutex::ScopedLock l(lock); + if (isClosing || isClosed) return; + isClosing = true; + { + Mutex::ScopedUnlock u(lock); + handler.close(); + } + closed(REPLY_SUCCESS, "Closed by client"); +} + +// Set closed flags and erase the sessions map, but keep the contents +// so sessions can be updated outside the lock. +ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const Mutex::ScopedLock&) { + isClosed = true; + connector->close(); + SessionVector save; + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { + boost::shared_ptr s = i->second.lock(); + if (s) save.push_back(s); } + sessions.clear(); + return save; } -void ConnectionImpl::shutdown() +void ConnectionImpl::closed(uint16_t code, const std::string& text) { Mutex::ScopedLock l(lock); if (isClosed) return; - forChannels(boost::bind(&SessionCore::connectionBroke, _1, - INTERNAL_ERROR, "Unexpected socket closure.")); - sessions.clear(); - isClosed = true; + SessionVector save(closeInternal(l)); + Mutex::ScopedUnlock u(lock); + std::for_each(save.begin(), save.end(), boost::bind(&SessionCore::connectionClosed, _1, code, text)); } -void ConnectionImpl::closed(uint16_t code, const std::string& text) +static const std::string CONN_CLOSED("Connection closed by broker"); + +void ConnectionImpl::shutdown() { Mutex::ScopedLock l(lock); if (isClosed) return; - forChannels(boost::bind(&SessionCore::connectionClosed, _1, code, text)); - sessions.clear(); - isClosed = true; - connector->close(); + SessionVector save(closeInternal(l)); + handler.fail(CONN_CLOSED); + Mutex::ScopedUnlock u(lock); + std::for_each(save.begin(), save.end(), + boost::bind(&SessionCore::connectionBroke, _1, INTERNAL_ERROR, CONN_CLOSED)); } void ConnectionImpl::erase(uint16_t ch) { -- cgit v1.2.1