diff options
author | Alan Conway <aconway@apache.org> | 2010-06-09 20:29:32 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-06-09 20:29:32 +0000 |
commit | 1951f9cf0c246b53d1cddf65dfdb059ad8662377 (patch) | |
tree | 2a8862e880777b61ecaae29bc335f18ade5c18c5 /cpp | |
parent | 81763331d2256790538f7003e8b98a9fee802881 (diff) | |
download | qpid-python-1951f9cf0c246b53d1cddf65dfdb059ad8662377.tar.gz |
Fix cluster-safe assertion in connection negotiation.
See https://bugzilla.redhat.com/show_bug.cgi?id=602347.
In a cluster, raise the management connect event when processing cluster.announce.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@953147 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ClusterSafe.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ClusterSafe.h | 17 | ||||
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 6 |
6 files changed, 39 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index ac574fc1a3..619f1a1bcb 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -24,6 +24,7 @@ #include "qpid/broker/Bridge.h" #include "qpid/broker/Broker.h" #include "qpid/sys/SecuritySettings.h" +#include "qpid/sys/ClusterSafe.h" #include "qpid/log/Statement.h" #include "qpid/ptr_map.h" @@ -121,7 +122,9 @@ Connection::~Connection() { if (mgmtObject != 0) { mgmtObject->resourceDestroy(); - if (!isLink) + // In a cluster, Connections destroyed during shutdown are in + // a cluster-unsafe context. Don't raise an event in that case. + if (!isLink && isClusterSafe()) agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId())); } if (isLink) @@ -202,6 +205,13 @@ void Connection::notifyConnectionForced(const string& text) void Connection::setUserId(const string& userId) { ConnectionState::setUserId(userId); + // In a cluster, the cluster code will raise the connect event + // when the connection is replicated to the cluster. + if (!sys::isCluster()) + raiseConnectEvent(); +} + +void Connection::raiseConnectEvent() { if (mgmtObject != 0) { mgmtObject->set_authIdentity(userId); agent->raiseEvent(_qmf::EventClientConnect(mgmtId, userId)); diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index ad9f786179..cf199fa831 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -111,6 +111,7 @@ class Connection : public sys::ConnectionInputHandler, std::string getAuthCredentials(); void notifyConnectionForced(const std::string& text); void setUserId(const string& uid); + void raiseConnectEvent(); const std::string& getUserId() const { return ConnectionState::getUserId(); } const std::string& getMgmtId() const { return mgmtId; } management::ManagementAgent* getAgent() const { return agent; } diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 08e31c184a..c402415fab 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -165,6 +165,8 @@ void Connection::announce( connection->received(frame); connection->setUserId(username); } + // Raise the connection management event now that the connection is replicated. + connection->raiseConnectEvent(); } Connection::~Connection() { diff --git a/cpp/src/qpid/sys/ClusterSafe.cpp b/cpp/src/qpid/sys/ClusterSafe.cpp index 498a46d865..e051591afd 100644 --- a/cpp/src/qpid/sys/ClusterSafe.cpp +++ b/cpp/src/qpid/sys/ClusterSafe.cpp @@ -32,8 +32,12 @@ bool inCluster = false; QPID_TSS bool inContext = false; } +bool isClusterSafe() { return !inCluster || inContext; } + +bool isCluster() { return inCluster; } + void assertClusterSafe() { - if (inCluster && !inContext) { + if (!isClusterSafe()) { QPID_LOG(critical, "Modified cluster state outside of cluster context"); ::abort(); } diff --git a/cpp/src/qpid/sys/ClusterSafe.h b/cpp/src/qpid/sys/ClusterSafe.h index abb9ad0fff..f3382307d0 100644 --- a/cpp/src/qpid/sys/ClusterSafe.h +++ b/cpp/src/qpid/sys/ClusterSafe.h @@ -42,6 +42,20 @@ namespace sys { QPID_COMMON_EXTERN void assertClusterSafe(); /** + * In a non-clustered broker, returns true. + * + * In a clustered broker returns true if we are in a context where it + * is safe to modify cluster state. + * + * This function is in the common library rather than the cluster + * library because it is called by code in the broker library. + */ +QPID_COMMON_EXTERN bool isClusterSafe(); + +/** Return true in a clustered broker */ +QPID_COMMON_EXTERN bool isCluster(); + +/** * Base class for classes that encapsulate state which is replicated * to all members of a cluster. Acts as a marker for clustered state * and provides functions to assist detecting bugs in cluster @@ -53,7 +67,8 @@ struct ClusterSafeScope { }; /** - * Enable cluster-safe assertions. By defaul they are no-ops. + * Enable cluster-safe assertions. By default they are no-ops. + * Called by cluster code. */ void enableClusterSafe(); diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index 02b3b29571..3fb184c282 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -223,6 +223,7 @@ class LongTests(BrokerTest): """Start ordinary clients for a broker. Start one client per broker. Round-robin on a colllection of different clients.""" cmds=[ + ["qpid-tool", "localhost:%s"%(broker.port())], ["qpid-perftest", "--count", 50000, "--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()], ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())], @@ -234,14 +235,15 @@ class LongTests(BrokerTest): cmd = ["qpid-stat", "-b", "localhost:%s" %(broker.port())] mclients.append(ClientLoop(broker, cmd)) - endtime = time.time() + self.duration() + duration = max(self.duration(), 5) + endtime = time.time() + duration alive = 0 # First live cluster member for i in range(len(cluster)): start_clients(cluster[i], i) start_mclients(cluster[alive]) while time.time() < endtime: - time.sleep(min(5,self.duration()/2)) + time.sleep(min(5,duration/2)) for b in cluster[alive:]: b.ready() # Check if a broker crashed. # Kill the first broker, expect the clients to fail. for c in clients[alive] + mclients: c.expect_fail() |