summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-06-09 20:29:32 +0000
committerAlan Conway <aconway@apache.org>2010-06-09 20:29:32 +0000
commit1951f9cf0c246b53d1cddf65dfdb059ad8662377 (patch)
tree2a8862e880777b61ecaae29bc335f18ade5c18c5 /cpp
parent81763331d2256790538f7003e8b98a9fee802881 (diff)
downloadqpid-python-1951f9cf0c246b53d1cddf65dfdb059ad8662377.tar.gz
Fix cluster-safe assertion in connection negotiation.
See https://bugzilla.redhat.com/show_bug.cgi?id=602347. In a cluster, raise the management connect event when processing cluster.announce. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@953147 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/Connection.cpp12
-rw-r--r--cpp/src/qpid/broker/Connection.h1
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp2
-rw-r--r--cpp/src/qpid/sys/ClusterSafe.cpp6
-rw-r--r--cpp/src/qpid/sys/ClusterSafe.h17
-rwxr-xr-xcpp/src/tests/cluster_tests.py6
6 files changed, 39 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index ac574fc1a3..619f1a1bcb 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -24,6 +24,7 @@
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
#include "qpid/sys/SecuritySettings.h"
+#include "qpid/sys/ClusterSafe.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
@@ -121,7 +122,9 @@ Connection::~Connection()
{
if (mgmtObject != 0) {
mgmtObject->resourceDestroy();
- if (!isLink)
+ // In a cluster, Connections destroyed during shutdown are in
+ // a cluster-unsafe context. Don't raise an event in that case.
+ if (!isLink && isClusterSafe())
agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId()));
}
if (isLink)
@@ -202,6 +205,13 @@ void Connection::notifyConnectionForced(const string& text)
void Connection::setUserId(const string& userId)
{
ConnectionState::setUserId(userId);
+ // In a cluster, the cluster code will raise the connect event
+ // when the connection is replicated to the cluster.
+ if (!sys::isCluster())
+ raiseConnectEvent();
+}
+
+void Connection::raiseConnectEvent() {
if (mgmtObject != 0) {
mgmtObject->set_authIdentity(userId);
agent->raiseEvent(_qmf::EventClientConnect(mgmtId, userId));
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index ad9f786179..cf199fa831 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -111,6 +111,7 @@ class Connection : public sys::ConnectionInputHandler,
std::string getAuthCredentials();
void notifyConnectionForced(const std::string& text);
void setUserId(const string& uid);
+ void raiseConnectEvent();
const std::string& getUserId() const { return ConnectionState::getUserId(); }
const std::string& getMgmtId() const { return mgmtId; }
management::ManagementAgent* getAgent() const { return agent; }
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 08e31c184a..c402415fab 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -165,6 +165,8 @@ void Connection::announce(
connection->received(frame);
connection->setUserId(username);
}
+ // Raise the connection management event now that the connection is replicated.
+ connection->raiseConnectEvent();
}
Connection::~Connection() {
diff --git a/cpp/src/qpid/sys/ClusterSafe.cpp b/cpp/src/qpid/sys/ClusterSafe.cpp
index 498a46d865..e051591afd 100644
--- a/cpp/src/qpid/sys/ClusterSafe.cpp
+++ b/cpp/src/qpid/sys/ClusterSafe.cpp
@@ -32,8 +32,12 @@ bool inCluster = false;
QPID_TSS bool inContext = false;
}
+bool isClusterSafe() { return !inCluster || inContext; }
+
+bool isCluster() { return inCluster; }
+
void assertClusterSafe() {
- if (inCluster && !inContext) {
+ if (!isClusterSafe()) {
QPID_LOG(critical, "Modified cluster state outside of cluster context");
::abort();
}
diff --git a/cpp/src/qpid/sys/ClusterSafe.h b/cpp/src/qpid/sys/ClusterSafe.h
index abb9ad0fff..f3382307d0 100644
--- a/cpp/src/qpid/sys/ClusterSafe.h
+++ b/cpp/src/qpid/sys/ClusterSafe.h
@@ -42,6 +42,20 @@ namespace sys {
QPID_COMMON_EXTERN void assertClusterSafe();
/**
+ * In a non-clustered broker, returns true.
+ *
+ * In a clustered broker returns true if we are in a context where it
+ * is safe to modify cluster state.
+ *
+ * This function is in the common library rather than the cluster
+ * library because it is called by code in the broker library.
+ */
+QPID_COMMON_EXTERN bool isClusterSafe();
+
+/** Return true in a clustered broker */
+QPID_COMMON_EXTERN bool isCluster();
+
+/**
* Base class for classes that encapsulate state which is replicated
* to all members of a cluster. Acts as a marker for clustered state
* and provides functions to assist detecting bugs in cluster
@@ -53,7 +67,8 @@ struct ClusterSafeScope {
};
/**
- * Enable cluster-safe assertions. By defaul they are no-ops.
+ * Enable cluster-safe assertions. By default they are no-ops.
+ * Called by cluster code.
*/
void enableClusterSafe();
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 02b3b29571..3fb184c282 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -223,6 +223,7 @@ class LongTests(BrokerTest):
"""Start ordinary clients for a broker. Start one client per broker.
Round-robin on a colllection of different clients."""
cmds=[
+ ["qpid-tool", "localhost:%s"%(broker.port())],
["qpid-perftest", "--count", 50000,
"--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()],
["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())],
@@ -234,14 +235,15 @@ class LongTests(BrokerTest):
cmd = ["qpid-stat", "-b", "localhost:%s" %(broker.port())]
mclients.append(ClientLoop(broker, cmd))
- endtime = time.time() + self.duration()
+ duration = max(self.duration(), 5)
+ endtime = time.time() + duration
alive = 0 # First live cluster member
for i in range(len(cluster)):
start_clients(cluster[i], i)
start_mclients(cluster[alive])
while time.time() < endtime:
- time.sleep(min(5,self.duration()/2))
+ time.sleep(min(5,duration/2))
for b in cluster[alive:]: b.ready() # Check if a broker crashed.
# Kill the first broker, expect the clients to fail.
for c in clients[alive] + mclients: c.expect_fail()