summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-01-18 20:43:41 +0000
committerAlan Conway <aconway@apache.org>2011-01-18 20:43:41 +0000
commitb4462bef74f9dfedf726f4eed29f9463825e2d7b (patch)
tree6fa6cc3550e2cffbbb6d98d65c16eb1189a2758b
parent54895c3ce0a66a4630289bfeb6ed4f86516e784e (diff)
downloadqpid-python-b4462bef74f9dfedf726f4eed29f9463825e2d7b.tar.gz
QPID-2982 Bug 669452 - Creating a route and using management tools can crash cluster members.
Cluster update did not include federation link and bridge objects. Fixed update to include them. Management linkUp and linkDown events were generated only on the broker receiving the link. Suppressed these events in a cluster. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1060568 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.cpp9
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.h7
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp27
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h4
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp30
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.h6
-rwxr-xr-xqpid/cpp/src/tests/cluster_test_logs.py3
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py40
-rw-r--r--qpid/cpp/xml/cluster.xml4
12 files changed, 135 insertions, 16 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 5911d916ad..7fbbf4e2c4 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -60,8 +60,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
(agent, this, link, id, args.i_durable, args.i_src, args.i_dest,
args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
- if (!args.i_durable)
- agent->addObject(mgmtObject);
+ agent->addObject(mgmtObject);
}
QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest);
}
@@ -167,10 +166,6 @@ void Bridge::destroy()
void Bridge::setPersistenceId(uint64_t pId) const
{
- if (mgmtObject != 0 && persistenceId == 0) {
- ManagementAgent* agent = link->getBroker()->getManagementAgent();
- agent->addObject (mgmtObject, pId);
- }
persistenceId = pId;
}
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 5a50d26c8c..e1091df724 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -30,6 +30,7 @@
#include "qpid/framing/enum.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/AclModule.h"
+#include "qpid/sys/ClusterSafe.h"
using namespace qpid::broker;
using qpid::framing::Buffer;
@@ -130,9 +131,12 @@ void Link::established ()
{
stringstream addr;
addr << host << ":" << port;
-
QPID_LOG (info, "Inter-broker link established to " << addr.str());
- agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
+
+ // Don't raise the management event in a cluster, other members wont't get this call.
+ if (!sys::isCluster())
+ agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
+
{
Mutex::ScopedLock mutex(lock);
setStateLH(STATE_OPERATIONAL);
@@ -150,11 +154,13 @@ void Link::closed (int, std::string text)
connection = 0;
+ // Don't raise the management event in a cluster, other members wont't get this call.
if (state == STATE_OPERATIONAL) {
stringstream addr;
addr << host << ":" << port;
QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str());
- agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
+ if (!sys::isCluster())
+ agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
}
for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
index ea14552cc1..82f1f0ea24 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -379,3 +379,12 @@ void LinkRegistry::setPassive(bool p)
passive = p;
//will activate or passivate links on maintenance visit
}
+
+void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) {
+ for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second);
+}
+
+void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f) {
+ for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second);
+}
+
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h
index a1931920d7..4c97e4f9d8 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.h
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h
@@ -31,6 +31,7 @@
#include "qpid/management/Manageable.h"
#include <boost/shared_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
+#include <boost/function.hpp>
namespace qpid {
namespace broker {
@@ -148,6 +149,12 @@ namespace broker {
* bridges won't therefore pull or push any messages.
*/
void setPassive(bool);
+
+
+ /** Iterate over each link in the registry. Used for cluster updates. */
+ void eachLink(boost::function<void(boost::shared_ptr<Link>)> f);
+ /** Iterate over each bridge in the registry. Used for cluster updates. */
+ void eachBridge(boost::function<void(boost::shared_ptr< Bridge>)> f);
};
}
}
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 5720f7fcc1..dd4882774b 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -198,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1045272;
+const uint32_t Cluster::CLUSTER_VERSION = 1058747;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 2797fdcf02..c7689577a7 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -32,6 +32,8 @@
#include "qpid/broker/RecoveredEnqueue.h"
#include "qpid/broker/RecoveredDequeue.h"
#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/Bridge.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/AMQFrame.h"
@@ -346,13 +348,12 @@ size_t Connection::decode(const char* data, size_t size) {
// returns true if the header is complete or already read.
bool Connection::checkProtocolHeader(const char*& data, size_t size) {
if (expectProtocolHeader) {
- //If this is an outgoing link, we will receive a protocol
- //header which needs to be decoded first
+ // This is an outgoing link connection, we will receive a protocol
+ // header which needs to be decoded first
framing::ProtocolInitiation pi;
Buffer buf(const_cast<char*&>(data), size);
if (pi.decode(buf)) {
//TODO: check the version is correct
- QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")");
expectProtocolHeader = false;
data += pi.encodedSize();
} else {
@@ -650,5 +651,25 @@ void Connection::managementSetupState(
agent->setUuid(id);
agent->setName(vendor, product, instance);
}
+
+void Connection::config(const std::string& encoded) {
+ Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+ string kind;
+ buf.getShortString (kind);
+ if (kind == "link") {
+ broker::Link::shared_ptr link =
+ broker::Link::decode(cluster.getBroker().getLinks(), buf);
+ QPID_LOG(debug, cluster << " updated link "
+ << link->getHost() << ":" << link->getPort());
+ }
+ else if (kind == "bridge") {
+ broker::Bridge::shared_ptr bridge =
+ broker::Bridge::decode(cluster.getBroker().getLinks(), buf);
+ QPID_LOG(debug, cluster << " updated bridge " << bridge->getName());
+ }
+ else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind));
+}
+
+
}} // Namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 69b8cb1450..d90cdd898b 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -183,7 +183,9 @@ class Connection :
const std::string& vendor,
const std::string& product,
const std::string& instance);
-
+
+ void config(const std::string& encoded);
+
void setSecureConnection ( broker::SecureConnection * sc );
private:
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index 59db4de526..e5d20c85e6 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -34,6 +34,9 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/LinkRegistry.h"
+#include "qpid/broker/Bridge.h"
+#include "qpid/broker/Link.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/ExchangeRegistry.h"
@@ -167,7 +170,7 @@ void UpdateClient::update() {
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
ClusterConnectionProxy(session).expiryId(expiry.getId());
-
+ updateLinks();
updateManagementAgent();
session.close();
@@ -199,6 +202,14 @@ template <class T> std::string encode(const T& t) {
t.encode(buf);
return encoded;
}
+
+template <class T> std::string encode(const T& t, bool encodeKind) {
+ std::string encoded;
+ encoded.resize(t.encodedSize());
+ framing::Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+ t.encode(buf, encodeKind);
+ return encoded;
+}
} // namespace
@@ -583,4 +594,21 @@ void UpdateClient::updateQueueListener(std::string& q,
ClusterConnectionProxy(session).addQueueListener(q, n);
}
+void UpdateClient::updateLinks() {
+ broker::LinkRegistry& links = updaterBroker.getLinks();
+ links.eachLink(boost::bind(&UpdateClient::updateLink, this, _1));
+ links.eachBridge(boost::bind(&UpdateClient::updateBridge, this, _1));
+}
+
+void UpdateClient::updateLink(const boost::shared_ptr<broker::Link>& link) {
+ QPID_LOG(debug, *this << " updating link "
+ << link->getHost() << ":" << link->getPort());
+ ClusterConnectionProxy(session).config(encode(*link));
+}
+
+void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge) {
+ QPID_LOG(debug, *this << " updating bridge " << bridge->getName());
+ ClusterConnectionProxy(session).config(encode(*bridge));
+}
+
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h
index 76621cd7ba..156fa112df 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h
@@ -49,6 +49,8 @@ class DeliveryRecord;
class SessionState;
class SemanticState;
class Decoder;
+class Link;
+class Bridge;
} // namespace broker
@@ -99,6 +101,10 @@ class UpdateClient : public sys::Runnable {
void updateQueueListener(std::string& q, const boost::shared_ptr<broker::Consumer>& c);
void updateManagementSetupState();
void updateManagementAgent();
+ void updateLinks();
+ void updateLink(const boost::shared_ptr<broker::Link>&);
+ void updateBridge(const boost::shared_ptr<broker::Bridge>&);
+
Numbering<broker::SemanticState::ConsumerImpl::shared_ptr> consumerNumbering;
MemberId updaterId;
diff --git a/qpid/cpp/src/tests/cluster_test_logs.py b/qpid/cpp/src/tests/cluster_test_logs.py
index 261b1d522b..eae28fc4e5 100755
--- a/qpid/cpp/src/tests/cluster_test_logs.py
+++ b/qpid/cpp/src/tests/cluster_test_logs.py
@@ -58,7 +58,8 @@ def filter_log(log):
'warning Broker closed connection: 200, OK',
'task late',
'task overran',
- 'warning CLOSING .* unsent data'
+ 'warning CLOSING .* unsent data',
+ 'Inter-broker link '
])
if re.compile(skip).search(l): continue
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 8bc89b2292..9bfd1b2d89 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -26,6 +26,7 @@ from qpid.messaging import Message, Empty
from threading import Thread, Lock
from logging import getLogger
from itertools import chain
+from tempfile import NamedTemporaryFile
log = getLogger("qpid.cluster_tests")
@@ -264,6 +265,45 @@ acl allow all all
cluster.start()
self.assertRaises(Empty, cluster[1].connect().session().receiver("q1").fetch,0)
+ def test_route_update(self):
+ """Regression test for https://issues.apache.org/jira/browse/QPID-2982
+ Links and bridges associated with routes were not replicated on update.
+ This meant extra management objects and caused an exit if a management
+ client was attached.
+ """
+ args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
+ cluster0 = self.cluster(1, args=args)
+ cluster1 = self.cluster(1, args=args)
+ assert 0 == subprocess.call(
+ ["qpid-route", "route", "add", cluster0[0].host_port(),
+ cluster1[0].host_port(), "dummy-exchange", "dummy-key", "-d"])
+ cluster0.start()
+
+ # Wait for qpid-tool:list on cluster0[0] to generate expected output.
+ pattern = re.compile("org.apache.qpid.broker.*link")
+ qpid_tool = subprocess.Popen(["qpid-tool", cluster0[0].host_port()],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+ class Scanner(Thread):
+ def __init__(self): self.found = False; Thread.__init__(self)
+ def run(self):
+ for l in qpid_tool.stdout:
+ if pattern.search(l): self.found = True; return
+ scanner = Scanner()
+ scanner.start()
+ start = time.time()
+ try:
+ # Wait up to 5 second timeout for scanner to find expected output
+ while not scanner.found and time.time() < start + 5:
+ qpid_tool.stdin.write("list\n") # Ask qpid-tool to list
+ for b in cluster0: b.ready() # Raise if any brokers are down
+ finally:
+ qpid_tool.stdin.write("quit\n")
+ qpid_tool.wait()
+ scanner.join()
+ assert scanner.found
+ # Verify logs are consistent
+ cluster_test_logs.verify_logs(glob.glob("*.log"))
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 0462838b1b..5e407a061f 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -272,5 +272,9 @@
<field name="product" type="str32"/>
<field name="instance" type="str32"/>
</control>
+
+ <!-- Replicate encoded config objects - e.g. links and bridges. -->
+ <control name="config" code="0x37"><field name="encoded" type="str32"/></control>
</class>
+
</amqp>