summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-11-03 17:21:38 +0000
committerGordon Sim <gsim@apache.org>2008-11-03 17:21:38 +0000
commita95eb74d7f806e3a60bbd61a042329bcfba9b21d (patch)
tree1f300ecd3d7dd2a39fdcd3882fb788b5dce28bf2 /cpp/src
parentc7ed94d2a123f3753a6d64eff5a83b742ce30163 (diff)
downloadqpid-python-a95eb74d7f806e3a60bbd61a042329bcfba9b21d.tar.gz
Various fixes arising from testing client failover:
* introduced new exception type for signalling connection failure (as distinct from any logical connection errors) * ConnectionImpl::closeInternal(): take copy of session map to prevent concurrent modification (by the same thread) as sessions are deleted and erase themselves. * ConnectionImpl::shutdown: hold lock before calling closeInternal(); mark handler failed before informing sessions of failure * SessionImpl::connectionBroker(): remove code as its rather meaningless * Don't swallow exceptions in Dispatcher * Handle exceptions in FailoverListener * Take weak_ptr to ConnectionImpl on constructor of Connector, then convert to shared_ptr when 'receiver' thread is started. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@710106 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/Exception.h7
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp2
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp23
-rw-r--r--cpp/src/qpid/client/Connector.cpp7
-rw-r--r--cpp/src/qpid/client/Dispatcher.cpp4
-rw-r--r--cpp/src/qpid/client/FailoverListener.cpp12
-rw-r--r--cpp/src/qpid/client/FailoverListener.h5
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp5
-rw-r--r--cpp/src/qpid/client/SessionImpl.h2
-rw-r--r--cpp/src/tests/exception_test.cpp9
10 files changed, 52 insertions, 24 deletions
diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h
index 9cf564104d..57e7c682eb 100644
--- a/cpp/src/qpid/Exception.h
+++ b/cpp/src/qpid/Exception.h
@@ -80,6 +80,13 @@ struct ClosedException : public Exception {
std::string getPrefix() const;
};
+/**
+ * Exception representing transport failure
+ */
+struct TransportFailure : public Exception {
+ TransportFailure(const std::string& msg=std::string()) : Exception(msg) {}
+};
+
} // namespace qpid
#endif /*!_Exception_*/
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index c957ac0fc5..efff4027aa 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -89,7 +89,7 @@ void ConnectionHandler::outgoing(AMQFrame& frame)
if (getState() == OPEN)
out(frame);
else
- throw Exception(errorText.empty() ? "Connection is not open." : errorText);
+ throw TransportFailure(errorText.empty() ? "Connection is not open." : errorText);
}
void ConnectionHandler::waitForOpen()
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index ca88de62dd..a3be69e017 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -129,17 +129,22 @@ void ConnectionImpl::close()
{
if (!handler.isOpen()) return;
handler.close();
- closed(CLOSE_CODE_NORMAL, "Closed by client");
}
template <class F> void ConnectionImpl::closeInternal(const F& f) {
connector->close();
- for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ //notifying sessions of failure can result in those session being
+ //deleted which in turn results in a call to erase(); this can
+ //even happen on this thread, when 's' goes out of scope
+ //below. Using a copy prevents the map being modified as we
+ //iterate through.
+ SessionMap copy;
+ sessions.swap(copy);
+ for (SessionMap::iterator i = copy.begin(); i != copy.end(); ++i) {
boost::shared_ptr<SessionImpl> s = i->second.lock();
if (s) f(s);
}
- sessions.clear();
}
void ConnectionImpl::closed(uint16_t code, const std::string& text) {
@@ -148,7 +153,7 @@ void ConnectionImpl::closed(uint16_t code, const std::string& text) {
closeInternal(boost::bind(&SessionImpl::connectionClosed, _1, code, text));
}
-static const std::string CONN_CLOSED("Connection closed by broker");
+static const std::string CONN_CLOSED("Connection closed");
void ConnectionImpl::shutdown() {
if ( failureCallback )
@@ -158,10 +163,12 @@ void ConnectionImpl::shutdown() {
// FIXME aconway 2008-06-06: exception use, amqp0-10 does not seem to have
// an appropriate close-code. connection-forced is not right.
- if (!handler.isClosing())
- closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED));
- setException(new ConnectionException(CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED));
- handler.fail(CONN_CLOSED);
+ bool isClosing = handler.isClosing();
+ handler.fail(CONN_CLOSED);//ensure connection is marked as failed before notifying sessions
+ Mutex::ScopedLock l(lock);
+ if (!isClosing)
+ closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CONN_CLOSED));
+ setException(new TransportFailure(CONN_CLOSED));
}
void ConnectionImpl::erase(uint16_t ch) {
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index f7a8d8b853..6509964fe8 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -35,6 +35,7 @@
#include <map>
#include <boost/bind.hpp>
#include <boost/format.hpp>
+#include <boost/weak_ptr.hpp>
namespace qpid {
namespace client {
@@ -140,7 +141,7 @@ class TCPConnector : public Connector, private sys::Runnable
std::string identifier;
- ConnectionImpl* impl;
+ boost::weak_ptr<ConnectionImpl> impl;
void connect(const std::string& host, int port);
void init();
@@ -183,7 +184,7 @@ TCPConnector::TCPConnector(ProtocolVersion ver,
shutdownHandler(0),
writer(maxFrameSize, cimpl),
aio(0),
- impl(cimpl)
+ impl(cimpl->shared_from_this())
{
QPID_LOG(debug, "TCPConnector created for " << version.toString());
settings.configureSocket(socket);
@@ -380,7 +381,7 @@ void TCPConnector::eof(AsynchIO&) {
// will never be called
void TCPConnector::run(){
// Keep the connection impl in memory until run() completes.
- boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this();
+ boost::shared_ptr<ConnectionImpl> protect = impl.lock();
assert(protect);
try {
Dispatcher d(poller);
diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp
index 0b7618eb4c..da6607fb9e 100644
--- a/cpp/src/qpid/client/Dispatcher.cpp
+++ b/cpp/src/qpid/client/Dispatcher.cpp
@@ -91,9 +91,9 @@ void Dispatcher::run()
if ( failoverHandler ) {
QPID_LOG(debug, QPID_MSG(session.getId() << " failover: " << e.what()));
failoverHandler();
- }
- else {
+ } else {
QPID_LOG(error, session.getId() << " error: " << e.what());
+ throw;
}
}
}
diff --git a/cpp/src/qpid/client/FailoverListener.cpp b/cpp/src/qpid/client/FailoverListener.cpp
index 772d9a4197..16370f8912 100644
--- a/cpp/src/qpid/client/FailoverListener.cpp
+++ b/cpp/src/qpid/client/FailoverListener.cpp
@@ -61,7 +61,17 @@ FailoverListener::FailoverListener(const boost::shared_ptr<ConnectionImpl>& c, c
session.queueDeclare(arg::queue=qname, arg::exclusive=true, arg::autoDelete=true);
session.exchangeBind(arg::queue=qname, arg::exchange=AMQ_FAILOVER);
subscriptions->subscribe(*this, qname, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE));
- thread = sys::Thread(*subscriptions);
+ thread = sys::Thread(*this);
+}
+
+void FailoverListener::run()
+{
+ try {
+ subscriptions->run();
+ } catch (const TransportFailure&) {
+ } catch (const std::exception& e) {
+ QPID_LOG(error, QPID_MSG(e.what()));
+ }
}
FailoverListener::~FailoverListener() {
diff --git a/cpp/src/qpid/client/FailoverListener.h b/cpp/src/qpid/client/FailoverListener.h
index fc0cca28f1..fe73a26611 100644
--- a/cpp/src/qpid/client/FailoverListener.h
+++ b/cpp/src/qpid/client/FailoverListener.h
@@ -25,6 +25,7 @@
#include "qpid/client/MessageListener.h"
#include "qpid/Url.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
#include <vector>
@@ -36,7 +37,8 @@ class SubscriptionManager;
/**
* @internal Listen for failover updates from the amq.failover exchange.
*/
-class FailoverListener : public MessageListener {
+class FailoverListener : public MessageListener, private qpid::sys::Runnable
+{
public:
FailoverListener(const boost::shared_ptr<ConnectionImpl>&, const std::vector<Url>& initUrls);
~FailoverListener();
@@ -44,6 +46,7 @@ class FailoverListener : public MessageListener {
std::vector<Url> getKnownBrokers() const;
void received(Message& msg);
+ void run();
private:
mutable sys::Mutex lock;
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 08e405565a..0f86f7ff0a 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -260,8 +260,9 @@ void SessionImpl::connectionClosed(uint16_t code, const std::string& text) {
* Called by ConnectionImpl to notify active sessions when connection
* is disconnected
*/
-void SessionImpl::connectionBroke(uint16_t _code, const std::string& _text) {
- connectionClosed(_code, _text);
+void SessionImpl::connectionBroke(const std::string& _text) {
+ setException(sys::ExceptionHolder(new TransportFailure(_text)));
+ handleClosed();
}
Future SessionImpl::send(const AMQBody& command)
diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h
index d56566ec14..1414862792 100644
--- a/cpp/src/qpid/client/SessionImpl.h
+++ b/cpp/src/qpid/client/SessionImpl.h
@@ -99,7 +99,7 @@ public:
//NOTE: these are called by the network thread when the connection is closed or dies
void connectionClosed(uint16_t code, const std::string& text);
- void connectionBroke(uint16_t code, const std::string& text);
+ void connectionBroke(const std::string& text);
/** Set timeout in seconds, returns actual timeout allowed by broker */
uint32_t setTimeout(uint32_t requestedSeconds);
diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp
index f3f5435699..e420bf2f0b 100644
--- a/cpp/src/tests/exception_test.cpp
+++ b/cpp/src/tests/exception_test.cpp
@@ -92,7 +92,7 @@ QPID_AUTO_TEST_CASE(DisconnectedPop) {
ProxyConnection c(fix.broker->getPort(Broker::TCP_TRANSPORT));
fix.session.queueDeclare(arg::queue="q");
fix.subs.subscribe(fix.lq, "q");
- Catcher<ConnectionException> pop(bind(&LocalQueue::pop, &fix.lq, sys::TIME_SEC));
+ Catcher<TransportFailure> pop(bind(&LocalQueue::pop, &fix.lq, sys::TIME_SEC));
fix.connection.proxy.close();
BOOST_CHECK(pop.join());
}
@@ -106,11 +106,10 @@ QPID_AUTO_TEST_CASE(DisconnectedListen) {
fix.session.queueDeclare(arg::queue="q");
fix.subs.subscribe(l, "q");
- ScopedSuppressLogging sl; // Suppress messages for expected errors.
- Thread t(fix.subs);
+ Catcher<TransportFailure> runner(bind(&SubscriptionManager::run, boost::ref(fix.subs)));
fix.connection.proxy.close();
- t.join();
- BOOST_CHECK_THROW(fix.session.close(), ConnectionException);
+ runner.join();
+ BOOST_CHECK_THROW(fix.session.close(), TransportFailure);
}
QPID_AUTO_TEST_CASE(NoSuchQueueTest) {