summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-10-26 20:11:08 +0000
committerAlan Conway <aconway@apache.org>2009-10-26 20:11:08 +0000
commitb6eb88609aea82e676f33ae8ff68918b68b81d33 (patch)
tree7180c10a249f84f635459086ffab7fe93ece3e01 /cpp
parentf5c7bf95dd04dc1cf0248511982a18a45847da14 (diff)
downloadqpid-python-b6eb88609aea82e676f33ae8ff68918b68b81d33.tar.gz
Separate FailoverListener from client::Connection.
client::ConnectionImpl used to contain a FailoverListener to subscribe for updates on the amq.failover exchange. This caused some lifecycle issues including memory leaks. Now FailoverListener is a public API class that the user must create associated with a session to get known-broker updates. Removed the weak_ptr logic in client::SessionImpl which was only required because of FailoverListener. Made SessionImpl::close() idempotent. Gets rid of spurious warning messages in some tests. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@829931 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/include/qpid/client/Connection.h6
-rw-r--r--cpp/include/qpid/client/FailoverListener.h (renamed from cpp/src/qpid/client/FailoverListener.h)43
-rw-r--r--cpp/include/qpid/client/FailoverManager.h8
-rw-r--r--cpp/src/Makefile.am2
-rw-r--r--cpp/src/qpid/client/Connection.cpp4
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp9
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.h5
-rw-r--r--cpp/src/qpid/client/FailoverListener.cpp95
-rw-r--r--cpp/src/qpid/client/FailoverManager.cpp5
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp29
-rw-r--r--cpp/src/qpid/client/SessionImpl.h9
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp5
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.h2
-rw-r--r--cpp/src/tests/BrokerFixture.h4
-rw-r--r--cpp/src/tests/ClusterFixture.cpp7
-rw-r--r--cpp/src/tests/cluster_test.cpp13
-rw-r--r--cpp/src/tests/exception_test.cpp4
17 files changed, 116 insertions, 134 deletions
diff --git a/cpp/include/qpid/client/Connection.h b/cpp/include/qpid/client/Connection.h
index 0f5999cdcc..bcf2962557 100644
--- a/cpp/include/qpid/client/Connection.h
+++ b/cpp/include/qpid/client/Connection.h
@@ -200,7 +200,11 @@ class Connection
QPID_CLIENT_EXTERN bool isOpen() const;
- QPID_CLIENT_EXTERN std::vector<Url> getKnownBrokers();
+ /** In a cluster, returns the initial set of known broker URLs
+ * at the time of connection.
+ */
+ QPID_CLIENT_EXTERN std::vector<Url> getInitialBrokers();
+
QPID_CLIENT_EXTERN void registerFailureCallback ( boost::function<void ()> fn );
/**
diff --git a/cpp/src/qpid/client/FailoverListener.h b/cpp/include/qpid/client/FailoverListener.h
index 7afee736ac..8414b80f2b 100644
--- a/cpp/src/qpid/client/FailoverListener.h
+++ b/cpp/include/qpid/client/FailoverListener.h
@@ -22,7 +22,11 @@
*
*/
+#include "qpid/client/ClientImportExport.h"
#include "qpid/client/MessageListener.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/SubscriptionManager.h"
#include "qpid/Url.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Runnable.h"
@@ -32,26 +36,43 @@
namespace qpid {
namespace client {
-class SubscriptionManager;
-class ConnectionImpl;
/**
- * @internal Listen for failover updates from the amq.failover exchange.
+ * Listen for updates from the amq.failover exchange.
+ *
+ * In a cluster, the amq.failover exchange provides updates whenever
+ * the cluster membership changes. This class subscribes to the
+ * failover exchange and providees the latest list of known brokers.
+ *
+ * You can also subscribe to amq.failover yourself and use
+ * FailoverListener::decode to extract a list of broker URLs from a
+ * failover exchange message.
*/
-class FailoverListener : public MessageListener, private qpid::sys::Runnable
+class FailoverListener : private MessageListener, private qpid::sys::Runnable
{
public:
- FailoverListener(const boost::shared_ptr<ConnectionImpl>&, const std::vector<Url>& initUrls);
- ~FailoverListener();
- void stop();
+ /** The name of the standard failover exchange amq.failover */
+ static QPID_CLIENT_EXTERN const std::string AMQ_FAILOVER;
- std::vector<Url> getKnownBrokers() const;
- void received(Message& msg);
- void run();
+ /** Extract the broker list from a failover exchange message */
+ static QPID_CLIENT_EXTERN std::vector<Url> getKnownBrokers(const Message& m);
+
+ /** Subscribe to amq.failover exchange. */
+ QPID_CLIENT_EXTERN FailoverListener(Connection);
+
+ QPID_CLIENT_EXTERN ~FailoverListener();
+
+ /** Returns the latest list of known broker URLs. */
+ QPID_CLIENT_EXTERN std::vector<Url> getKnownBrokers() const;
private:
+ void received(Message& msg);
+ void run();
+
mutable sys::Mutex lock;
- std::auto_ptr<SubscriptionManager> subscriptions;
+ Connection connection;
+ Session session;
+ SubscriptionManager subscriptions;
sys::Thread thread;
std::vector<Url> knownBrokers;
};
diff --git a/cpp/include/qpid/client/FailoverManager.h b/cpp/include/qpid/client/FailoverManager.h
index d50fcff8bb..0d30e2ed60 100644
--- a/cpp/include/qpid/client/FailoverManager.h
+++ b/cpp/include/qpid/client/FailoverManager.h
@@ -22,12 +22,13 @@
*
*/
-#include "qpid/client/Connection.h"
-#include "qpid/client/ConnectionSettings.h"
#include "qpid/Exception.h"
#include "qpid/client/AsyncSession.h"
-#include "qpid/sys/Monitor.h"
#include "qpid/client/ClientImportExport.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/client/FailoverListener.h"
+#include "qpid/sys/Monitor.h"
#include <vector>
namespace qpid {
@@ -123,6 +124,7 @@ class FailoverManager
qpid::sys::Monitor lock;
Connection connection;
+ std::auto_ptr<FailoverListener> failoverListener;
ConnectionSettings settings;
ReconnectionStrategy* strategy;
State state;
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index a6b90d7bde..6434b96e7b 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -652,7 +652,6 @@ libqpidclient_la_SOURCES = \
qpid/client/Dispatcher.h \
qpid/client/Execution.h \
qpid/client/FailoverListener.cpp \
- qpid/client/FailoverListener.h \
qpid/client/FailoverManager.cpp \
qpid/client/Future.cpp \
qpid/client/FutureCompletion.cpp \
@@ -744,6 +743,7 @@ nobase_include_HEADERS += \
../include/qpid/client/Completion.h \
../include/qpid/client/Connection.h \
../include/qpid/client/ConnectionSettings.h \
+ ../include/qpid/client/FailoverListener.h \
../include/qpid/client/FailoverManager.h \
../include/qpid/client/FlowControl.h \
../include/qpid/client/Future.h \
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp
index ae1cfa5815..32a01b2c40 100644
--- a/cpp/src/qpid/client/Connection.cpp
+++ b/cpp/src/qpid/client/Connection.cpp
@@ -152,8 +152,8 @@ void Connection::close() {
impl->close();
}
-std::vector<Url> Connection::getKnownBrokers() {
- return impl ? impl->getKnownBrokers() : std::vector<Url>();
+std::vector<Url> Connection::getInitialBrokers() {
+ return impl ? impl->getInitialBrokers() : std::vector<Url>();
}
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index 6a46cb6249..c48a580fe8 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -22,7 +22,6 @@
#include "qpid/client/Connector.h"
#include "qpid/client/ConnectionSettings.h"
#include "qpid/client/SessionImpl.h"
-#include "qpid/client/FailoverListener.h"
#include "qpid/log/Statement.h"
#include "qpid/Url.h"
@@ -88,7 +87,6 @@ ConnectionImpl::~ConnectionImpl() {
// Important to close the connector first, to ensure the
// connector thread does not call on us while the destructor
// is running.
- failover.reset();
if (connector) connector->close();
}
@@ -175,7 +173,6 @@ void ConnectionImpl::open()
} else {
QPID_LOG(debug, "No security layer in place");
}
- failover.reset(new FailoverListener(shared_from_this(), handler.knownBrokersUrls));
}
void ConnectionImpl::idleIn()
@@ -256,8 +253,8 @@ const ConnectionSettings& ConnectionImpl::getNegotiatedSettings()
return handler;
}
-std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() {
- return failover ? failover->getKnownBrokers() : handler.knownBrokersUrls;
+std::vector<qpid::Url> ConnectionImpl::getInitialBrokers() {
+ return handler.knownBrokersUrls;
}
boost::shared_ptr<SessionImpl> ConnectionImpl::newSession(const std::string& name, uint32_t timeout, uint16_t channel) {
@@ -267,6 +264,4 @@ boost::shared_ptr<SessionImpl> ConnectionImpl::newSession(const std::string& na
return simpl;
}
-void ConnectionImpl::stopFailoverListener() { failover->stop(); }
-
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h
index 8c82b6a3fe..2b32e1ccf0 100644
--- a/cpp/src/qpid/client/ConnectionImpl.h
+++ b/cpp/src/qpid/client/ConnectionImpl.h
@@ -42,7 +42,6 @@ namespace client {
class Connector;
struct ConnectionSettings;
class SessionImpl;
-class FailoverListener;
class ConnectionImpl : public Bounds,
public framing::FrameHandler,
@@ -58,7 +57,6 @@ class ConnectionImpl : public Bounds,
SessionMap sessions;
ConnectionHandler handler;
boost::scoped_ptr<Connector> connector;
- boost::scoped_ptr<FailoverListener> failover;
framing::ProtocolVersion version;
uint16_t nextChannel;
sys::Mutex lock;
@@ -90,9 +88,8 @@ class ConnectionImpl : public Bounds,
void erase(uint16_t channel);
const ConnectionSettings& getNegotiatedSettings();
- std::vector<Url> getKnownBrokers();
+ std::vector<Url> getInitialBrokers();
void registerFailureCallback ( boost::function<void ()> fn ) { failureCallback = fn; }
- void stopFailoverListener();
framing::ProtocolVersion getVersion() { return version; }
};
diff --git a/cpp/src/qpid/client/FailoverListener.cpp b/cpp/src/qpid/client/FailoverListener.cpp
index d84525ed89..3396f5598c 100644
--- a/cpp/src/qpid/client/FailoverListener.cpp
+++ b/cpp/src/qpid/client/FailoverListener.cpp
@@ -19,11 +19,7 @@
*
*/
#include "qpid/client/FailoverListener.h"
-#include "qpid/client/SessionBase_0_10Access.h"
-#include "qpid/client/SessionImpl.h"
-#include "qpid/client/ConnectionImpl.h"
-#include "qpid/client/SubscriptionImpl.h"
-#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Session.h"
#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
#include "qpid/log/Helpers.h"
@@ -31,83 +27,46 @@
namespace qpid {
namespace client {
-static const std::string AMQ_FAILOVER("amq.failover");
+const std::string FailoverListener::AMQ_FAILOVER("amq.failover");
-static Session makeSession(boost::shared_ptr<SessionImpl> si) {
- // Hold only a weak pointer to the ConnectionImpl so a
- // FailoverListener in a ConnectionImpl won't createa a shared_ptr
- // cycle.
- //
- si->setWeakPtr(true);
- Session s;
- SessionBase_0_10Access(s).set(si);
- return s;
-}
-
-FailoverListener::FailoverListener(const boost::shared_ptr<ConnectionImpl>& c, const std::vector<Url>& initUrls)
- : knownBrokers(initUrls)
- {
- // Special versions used to mark cluster catch-up connections
- // which do not need a FailoverListener
- if (c->getVersion().getMajor() >= 0x80) {
- QPID_LOG(debug, "No failover listener for catch-up connection.");
- return;
- }
-
- Session session = makeSession(c->newSession(AMQ_FAILOVER+framing::Uuid(true).str(), 0));
+FailoverListener::FailoverListener(Connection c) :
+ connection(c),
+ session(c.newSession(AMQ_FAILOVER+"."+framing::Uuid(true).str())),
+ subscriptions(session)
+{
+ knownBrokers = c.getInitialBrokers();
if (session.exchangeQuery(arg::name=AMQ_FAILOVER).getNotFound()) {
session.close();
return;
}
- subscriptions.reset(new SubscriptionManager(session));
std::string qname=session.getId().getName();
session.queueDeclare(arg::queue=qname, arg::exclusive=true, arg::autoDelete=true);
session.exchangeBind(arg::queue=qname, arg::exchange=AMQ_FAILOVER);
- subscriptions->subscribe(*this, qname, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE));
+ subscriptions.subscribe(*this, qname, SubscriptionSettings(FlowControl::unlimited(),
+ ACCEPT_MODE_NONE));
thread = sys::Thread(*this);
}
-void FailoverListener::run()
-{
+void FailoverListener::run() {
try {
- subscriptions->run();
- } catch (const TransportFailure&) {
- } catch (const std::exception& e) {
- QPID_LOG(error, QPID_MSG(e.what()));
- }
+ subscriptions.run();
+ } catch(...) {}
}
FailoverListener::~FailoverListener() {
- try { stop(); }
- catch (const std::exception& /*e*/) {}
-}
-
-void FailoverListener::stop() {
- if (subscriptions.get())
- subscriptions->stop();
-
- if (thread.id() == sys::Thread::current().id()) {
- // FIXME aconway 2008-10-16: this can happen if ConnectionImpl
- // dtor runs when my session drops its weak pointer lock.
- // For now, leak subscriptions to prevent a core if we delete
- // without joining.
- subscriptions.release();
- }
- else if (thread.id()) {
+ try {
+ subscriptions.stop();
thread.join();
- thread=sys::Thread();
- subscriptions.reset(); // Safe to delete after join.
- }
+ if (connection.isOpen()) {
+ session.sync();
+ session.close();
+ }
+ } catch (...) {}
}
void FailoverListener::received(Message& msg) {
sys::Mutex::ScopedLock l(lock);
- knownBrokers.clear();
- framing::Array urlArray;
- msg.getHeaders().getArray("amq.failover", urlArray);
- for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i != urlArray.end(); ++i )
- knownBrokers.push_back(Url((*i)->get<std::string>()));
- QPID_LOG(info, "Known-brokers update: " << log::formatList(knownBrokers));
+ knownBrokers = getKnownBrokers(msg);
}
std::vector<Url> FailoverListener::getKnownBrokers() const {
@@ -115,4 +74,16 @@ std::vector<Url> FailoverListener::getKnownBrokers() const {
return knownBrokers;
}
+std::vector<Url> FailoverListener::getKnownBrokers(const Message& msg) {
+ std::vector<Url> knownBrokers;
+ framing::Array urlArray;
+ msg.getHeaders().getArray("amq.failover", urlArray);
+ for (framing::Array::ValueVector::const_iterator i = urlArray.begin();
+ i != urlArray.end();
+ ++i )
+ knownBrokers.push_back(Url((*i)->get<std::string>()));
+ return knownBrokers;
+}
+
+
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/FailoverManager.cpp b/cpp/src/qpid/client/FailoverManager.cpp
index 967c3613c0..81f71eb7df 100644
--- a/cpp/src/qpid/client/FailoverManager.cpp
+++ b/cpp/src/qpid/client/FailoverManager.cpp
@@ -77,7 +77,9 @@ Connection& FailoverManager::connect(std::vector<Url> brokers)
} else {
state = CONNECTING;
Connection c;
- attempt(c, settings, brokers.empty() ? connection.getKnownBrokers() : brokers);
+ if (brokers.empty() && failoverListener.get())
+ brokers = failoverListener->getKnownBrokers();
+ attempt(c, settings, brokers);
if (c.isOpen()) state = IDLE;
else state = CANT_CONNECT;
connection = c;
@@ -118,6 +120,7 @@ void FailoverManager::attempt(Connection& c, ConnectionSettings s)
try {
QPID_LOG(info, "Attempting to connect to " << s.host << " on " << s.port << "...");
c.open(s);
+ failoverListener.reset(new FailoverListener(c));
QPID_LOG(info, "Connected to " << s.host << " on " << s.port);
} catch (const Exception& e) {
QPID_LOG(info, "Could not connect to " << s.host << " on " << s.port << ": " << e.what());
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index b3181bdb3c..7c807558f0 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -57,9 +57,7 @@ SessionImpl::SessionImpl(const std::string& name, boost::shared_ptr<ConnectionIm
detachedLifetime(0),
maxFrameSize(conn->getNegotiatedSettings().maxFrameSize),
id(conn->getNegotiatedSettings().username, name.empty() ? Uuid(true).str() : name),
- connectionShared(conn),
- connectionWeak(conn),
- weakPtr(false),
+ connection(conn),
ioHandler(*this),
proxy(ioHandler),
nextIn(0),
@@ -68,7 +66,7 @@ SessionImpl::SessionImpl(const std::string& name, boost::shared_ptr<ConnectionIm
doClearDeliveryPropertiesExchange(true),
autoDetach(true)
{
- channel.next = connectionShared.get();
+ channel.next = connection.get();
}
SessionImpl::~SessionImpl() {
@@ -87,8 +85,7 @@ SessionImpl::~SessionImpl() {
}
delete sendMsgCredit;
}
- boost::shared_ptr<ConnectionImpl> c = connectionWeak.lock();
- if (c) c->erase(channel);
+ connection->erase(channel);
}
@@ -122,6 +119,7 @@ void SessionImpl::open(uint32_t timeout) // user thread
void SessionImpl::close() //user thread
{
Lock l(state);
+ if (state == DETACHED || state == DETACHING) return;
if (detachedLifetime) setTimeout(0);
detach();
waitFor(DETACHED);
@@ -129,8 +127,6 @@ void SessionImpl::close() //user thread
void SessionImpl::resume(boost::shared_ptr<ConnectionImpl>) // user thread
{
- // weakPtr sessions should not be resumed.
- if (weakPtr) return;
throw NotImplementedException("Resume not yet implemented by client!");
}
@@ -509,11 +505,8 @@ void SessionImpl::proxyOut(AMQFrame& frame) // network thread
void SessionImpl::sendFrame(AMQFrame& frame, bool canBlock)
{
- boost::shared_ptr<ConnectionImpl> c = connectionWeak.lock();
- if (c) {
- channel.handle(frame);
- c->expand(frame.encodedSize(), canBlock);
- }
+ channel.handle(frame);
+ connection->expand(frame.encodedSize(), canBlock);
}
void SessionImpl::deliver(AMQFrame& frame) // network thread
@@ -809,17 +802,9 @@ uint32_t SessionImpl::getTimeout() const {
return detachedLifetime;
}
-void SessionImpl::setWeakPtr(bool weak) {
- weakPtr = weak;
- if (weakPtr)
- connectionShared.reset(); // Only keep weak pointer
- else
- connectionShared = connectionWeak.lock();
-}
-
boost::shared_ptr<ConnectionImpl> SessionImpl::getConnection()
{
- return connectionWeak.lock();
+ return connection;
}
void SessionImpl::disableAutoDetach() { autoDetach = false; }
diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h
index cbd0742045..2f35032c4e 100644
--- a/cpp/src/qpid/client/SessionImpl.h
+++ b/cpp/src/qpid/client/SessionImpl.h
@@ -120,11 +120,6 @@ public:
/** Get timeout in seconds. */
uint32_t getTimeout() const;
- /** Make this session use a weak_ptr to the ConnectionImpl.
- * Used for sessions created by the ConnectionImpl itself.
- */
- void setWeakPtr(bool weak=true);
-
/**
* get the Connection associated with this connection
*/
@@ -224,9 +219,7 @@ private:
const uint64_t maxFrameSize;
const SessionId id;
- boost::shared_ptr<ConnectionImpl> connectionShared;
- boost::weak_ptr<ConnectionImpl> connectionWeak;
- bool weakPtr;
+ boost::shared_ptr<ConnectionImpl> connection;
framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler;
framing::ChannelHandler channel;
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index 3a735b5698..1698f96caf 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -136,7 +136,9 @@ void ConnectionImpl::connect(const AbsTime& started)
bool ConnectionImpl::tryConnect()
{
- if (tryConnect(url) || tryConnect(connection.getKnownBrokers())) {
+ if (tryConnect(url) ||
+ (failoverListener.get() && tryConnect(failoverListener->getKnownBrokers())))
+ {
return resetSessions();
} else {
return false;
@@ -148,6 +150,7 @@ bool ConnectionImpl::tryConnect(const Url& u)
try {
QPID_LOG(info, "Trying to connect to " << url << "...");
connection.open(u, settings);
+ failoverListener.reset(new FailoverListener(connection));
return true;
} catch (const Exception& e) {
//TODO: need to fix timeout on open so that it throws TransportFailure
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
index 565f2ec7ec..f4bc09594d 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
@@ -25,6 +25,7 @@
#include "qpid/messaging/Variant.h"
#include "qpid/Url.h"
#include "qpid/client/Connection.h"
+#include "qpid/client/FailoverListener.h"
#include "qpid/client/ConnectionSettings.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Semaphore.h"
@@ -50,6 +51,7 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
qpid::sys::Mutex lock;//used to protect data structures
qpid::sys::Semaphore semaphore;//used to coordinate reconnection
qpid::client::Connection connection;
+ std::auto_ptr<FailoverListener> failoverListener;
qpid::Url url;
qpid::client::ConnectionSettings settings;
Sessions sessions;
diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h
index 861400f539..f56a925b81 100644
--- a/cpp/src/tests/BrokerFixture.h
+++ b/cpp/src/tests/BrokerFixture.h
@@ -126,8 +126,8 @@ struct ClientT {
ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name_=std::string())
: connection(settings), session(connection.newSession(name_)), subs(session), name(name_) {}
- ~ClientT() { connection.close(); }
- void close() { session.close(); connection.close(); }
+ ~ClientT() { close(); }
+ void close() { if (connection.isOpen()) { session.close(); connection.close(); } }
};
typedef ClientT<> Client;
diff --git a/cpp/src/tests/ClusterFixture.cpp b/cpp/src/tests/ClusterFixture.cpp
index 7c357c3cd1..fd90ed170e 100644
--- a/cpp/src/tests/ClusterFixture.cpp
+++ b/cpp/src/tests/ClusterFixture.cpp
@@ -141,13 +141,14 @@ void ClusterFixture::killWithSilencer(size_t n, client::Connection& c, int sig)
* Get the known broker ports from a Connection.
*@param n if specified wait for the cluster size to be n, up to a timeout.
*/
-std::set<int> knownBrokerPorts(qpid::client::Connection& source, int n) {
- std::vector<qpid::Url> urls = source.getKnownBrokers();
+std::set<int> knownBrokerPorts(qpid::client::Connection& c, int n) {
+ FailoverListener fl(c);
+ std::vector<qpid::Url> urls = fl.getKnownBrokers();
if (n >= 0 && unsigned(n) != urls.size()) {
// Retry up to 10 secs in .1 second intervals.
for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) {
qpid::sys::usleep(1000*100); // 0.1 secs
- urls = source.getKnownBrokers();
+ urls = fl.getKnownBrokers();
}
}
std::set<int> s;
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 247aef1b2a..de8ec49ea6 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -358,13 +358,15 @@ QPID_AUTO_TEST_CASE(testTxTransaction) {
rollbackSession.txRollback();
rollbackSession.messageRelease(rollbackMessage.getId());
-
// Verify queue status: just the comitted messages and dequeues should remain.
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u);
BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "B");
BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "a");
BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "b");
BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "c");
+
+ commitSession.close();
+ rollbackSession.close();
}
QPID_AUTO_TEST_CASE(testUnacked) {
@@ -859,9 +861,12 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover)
Receiver receiver(fmgr, "my-queue", "my-data");
qpid::sys::Thread runner(receiver);
receiver.waitForReady();
- cluster.kill(1);
- //sleep for 2 secs to allow the heartbeat task to fire on the now dead connection:
- ::usleep(2*1000*1000);
+ {
+ ScopedSuppressLogging allQuiet; // suppress connection closed messages
+ cluster.kill(1);
+ //sleep for 2 secs to allow the heartbeat task to fire on the now dead connection:
+ ::usleep(2*1000*1000);
+ }
fmgr.execute(sender);
runner.join();
BOOST_CHECK(!receiver.failed);
diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp
index 0e9a948f00..4dac8ee965 100644
--- a/cpp/src/tests/exception_test.cpp
+++ b/cpp/src/tests/exception_test.cpp
@@ -112,8 +112,8 @@ QPID_AUTO_TEST_CASE(DisconnectedListen) {
Catcher<TransportFailure> runner(bind(&SubscriptionManager::run, boost::ref(fix.subs)));
fix.connection.proxy.close();
- runner.join();
- BOOST_CHECK_THROW(fix.session.close(), TransportFailure);
+ runner.join();
+ BOOST_CHECK_THROW(fix.session.queueDeclare(arg::queue="x"), TransportFailure);
}
QPID_AUTO_TEST_CASE(NoSuchQueueTest) {