diff options
author | Alan Conway <aconway@apache.org> | 2011-01-18 20:43:41 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-01-18 20:43:41 +0000 |
commit | b4462bef74f9dfedf726f4eed29f9463825e2d7b (patch) | |
tree | 6fa6cc3550e2cffbbb6d98d65c16eb1189a2758b | |
parent | 54895c3ce0a66a4630289bfeb6ed4f86516e784e (diff) | |
download | qpid-python-b4462bef74f9dfedf726f4eed29f9463825e2d7b.tar.gz |
QPID-2982 Bug 669452 - Creating a route and using management tools can crash cluster members.
Cluster update did not include federation link and bridge
objects. Fixed update to include them.
Management linkUp and linkDown events were generated only on the
broker receiving the link. Suppressed these events in a cluster.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1060568 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 27 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 30 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.h | 6 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_test_logs.py | 3 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 40 | ||||
-rw-r--r-- | qpid/cpp/xml/cluster.xml | 4 |
12 files changed, 135 insertions, 16 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 5911d916ad..7fbbf4e2c4 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -60,8 +60,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, (agent, this, link, id, args.i_durable, args.i_src, args.i_dest, args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync); - if (!args.i_durable) - agent->addObject(mgmtObject); + agent->addObject(mgmtObject); } QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest); } @@ -167,10 +166,6 @@ void Bridge::destroy() void Bridge::setPersistenceId(uint64_t pId) const { - if (mgmtObject != 0 && persistenceId == 0) { - ManagementAgent* agent = link->getBroker()->getManagementAgent(); - agent->addObject (mgmtObject, pId); - } persistenceId = pId; } diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 5a50d26c8c..e1091df724 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -30,6 +30,7 @@ #include "qpid/framing/enum.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/broker/AclModule.h" +#include "qpid/sys/ClusterSafe.h" using namespace qpid::broker; using qpid::framing::Buffer; @@ -130,9 +131,12 @@ void Link::established () { stringstream addr; addr << host << ":" << port; - QPID_LOG (info, "Inter-broker link established to " << addr.str()); - agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str())); + + // Don't raise the management event in a cluster, other members wont't get this call. + if (!sys::isCluster()) + agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str())); + { Mutex::ScopedLock mutex(lock); setStateLH(STATE_OPERATIONAL); @@ -150,11 +154,13 @@ void Link::closed (int, std::string text) connection = 0; + // Don't raise the management event in a cluster, other members wont't get this call. if (state == STATE_OPERATIONAL) { stringstream addr; addr << host << ":" << port; QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str()); - agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); + if (!sys::isCluster()) + agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); } for (Bridges::iterator i = active.begin(); i != active.end(); i++) { diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp index ea14552cc1..82f1f0ea24 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp @@ -379,3 +379,12 @@ void LinkRegistry::setPassive(bool p) passive = p; //will activate or passivate links on maintenance visit } + +void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) { + for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second); +} + +void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f) { + for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second); +} + diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h index a1931920d7..4c97e4f9d8 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.h +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h @@ -31,6 +31,7 @@ #include "qpid/management/Manageable.h" #include <boost/shared_ptr.hpp> #include <boost/intrusive_ptr.hpp> +#include <boost/function.hpp> namespace qpid { namespace broker { @@ -148,6 +149,12 @@ namespace broker { * bridges won't therefore pull or push any messages. */ void setPassive(bool); + + + /** Iterate over each link in the registry. Used for cluster updates. */ + void eachLink(boost::function<void(boost::shared_ptr<Link>)> f); + /** Iterate over each bridge in the registry. Used for cluster updates. */ + void eachBridge(boost::function<void(boost::shared_ptr< Bridge>)> f); }; } } diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 5720f7fcc1..dd4882774b 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -198,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 1045272; +const uint32_t Cluster::CLUSTER_VERSION = 1058747; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 2797fdcf02..c7689577a7 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -32,6 +32,8 @@ #include "qpid/broker/RecoveredEnqueue.h" #include "qpid/broker/RecoveredDequeue.h" #include "qpid/broker/Exchange.h" +#include "qpid/broker/Link.h" +#include "qpid/broker/Bridge.h" #include "qpid/broker/Queue.h" #include "qpid/framing/enum.h" #include "qpid/framing/AMQFrame.h" @@ -346,13 +348,12 @@ size_t Connection::decode(const char* data, size_t size) { // returns true if the header is complete or already read. bool Connection::checkProtocolHeader(const char*& data, size_t size) { if (expectProtocolHeader) { - //If this is an outgoing link, we will receive a protocol - //header which needs to be decoded first + // This is an outgoing link connection, we will receive a protocol + // header which needs to be decoded first framing::ProtocolInitiation pi; Buffer buf(const_cast<char*&>(data), size); if (pi.decode(buf)) { //TODO: check the version is correct - QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")"); expectProtocolHeader = false; data += pi.encodedSize(); } else { @@ -650,5 +651,25 @@ void Connection::managementSetupState( agent->setUuid(id); agent->setName(vendor, product, instance); } + +void Connection::config(const std::string& encoded) { + Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); + string kind; + buf.getShortString (kind); + if (kind == "link") { + broker::Link::shared_ptr link = + broker::Link::decode(cluster.getBroker().getLinks(), buf); + QPID_LOG(debug, cluster << " updated link " + << link->getHost() << ":" << link->getPort()); + } + else if (kind == "bridge") { + broker::Bridge::shared_ptr bridge = + broker::Bridge::decode(cluster.getBroker().getLinks(), buf); + QPID_LOG(debug, cluster << " updated bridge " << bridge->getName()); + } + else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind)); +} + + }} // Namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 69b8cb1450..d90cdd898b 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -183,7 +183,9 @@ class Connection : const std::string& vendor, const std::string& product, const std::string& instance); - + + void config(const std::string& encoded); + void setSecureConnection ( broker::SecureConnection * sc ); private: diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index 59db4de526..e5d20c85e6 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -34,6 +34,9 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/LinkRegistry.h" +#include "qpid/broker/Bridge.h" +#include "qpid/broker/Link.h" #include "qpid/broker/Message.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/ExchangeRegistry.h" @@ -167,7 +170,7 @@ void UpdateClient::update() { b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1)); ClusterConnectionProxy(session).expiryId(expiry.getId()); - + updateLinks(); updateManagementAgent(); session.close(); @@ -199,6 +202,14 @@ template <class T> std::string encode(const T& t) { t.encode(buf); return encoded; } + +template <class T> std::string encode(const T& t, bool encodeKind) { + std::string encoded; + encoded.resize(t.encodedSize()); + framing::Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); + t.encode(buf, encodeKind); + return encoded; +} } // namespace @@ -583,4 +594,21 @@ void UpdateClient::updateQueueListener(std::string& q, ClusterConnectionProxy(session).addQueueListener(q, n); } +void UpdateClient::updateLinks() { + broker::LinkRegistry& links = updaterBroker.getLinks(); + links.eachLink(boost::bind(&UpdateClient::updateLink, this, _1)); + links.eachBridge(boost::bind(&UpdateClient::updateBridge, this, _1)); +} + +void UpdateClient::updateLink(const boost::shared_ptr<broker::Link>& link) { + QPID_LOG(debug, *this << " updating link " + << link->getHost() << ":" << link->getPort()); + ClusterConnectionProxy(session).config(encode(*link)); +} + +void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge) { + QPID_LOG(debug, *this << " updating bridge " << bridge->getName()); + ClusterConnectionProxy(session).config(encode(*bridge)); +} + }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h index 76621cd7ba..156fa112df 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.h +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h @@ -49,6 +49,8 @@ class DeliveryRecord; class SessionState; class SemanticState; class Decoder; +class Link; +class Bridge; } // namespace broker @@ -99,6 +101,10 @@ class UpdateClient : public sys::Runnable { void updateQueueListener(std::string& q, const boost::shared_ptr<broker::Consumer>& c); void updateManagementSetupState(); void updateManagementAgent(); + void updateLinks(); + void updateLink(const boost::shared_ptr<broker::Link>&); + void updateBridge(const boost::shared_ptr<broker::Bridge>&); + Numbering<broker::SemanticState::ConsumerImpl::shared_ptr> consumerNumbering; MemberId updaterId; diff --git a/qpid/cpp/src/tests/cluster_test_logs.py b/qpid/cpp/src/tests/cluster_test_logs.py index 261b1d522b..eae28fc4e5 100755 --- a/qpid/cpp/src/tests/cluster_test_logs.py +++ b/qpid/cpp/src/tests/cluster_test_logs.py @@ -58,7 +58,8 @@ def filter_log(log): 'warning Broker closed connection: 200, OK', 'task late', 'task overran', - 'warning CLOSING .* unsent data' + 'warning CLOSING .* unsent data', + 'Inter-broker link ' ]) if re.compile(skip).search(l): continue diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 8bc89b2292..9bfd1b2d89 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -26,6 +26,7 @@ from qpid.messaging import Message, Empty from threading import Thread, Lock from logging import getLogger from itertools import chain +from tempfile import NamedTemporaryFile log = getLogger("qpid.cluster_tests") @@ -264,6 +265,45 @@ acl allow all all cluster.start() self.assertRaises(Empty, cluster[1].connect().session().receiver("q1").fetch,0) + def test_route_update(self): + """Regression test for https://issues.apache.org/jira/browse/QPID-2982 + Links and bridges associated with routes were not replicated on update. + This meant extra management objects and caused an exit if a management + client was attached. + """ + args=["--mgmt-pub-interval=1","--log-enable=trace+:management"] + cluster0 = self.cluster(1, args=args) + cluster1 = self.cluster(1, args=args) + assert 0 == subprocess.call( + ["qpid-route", "route", "add", cluster0[0].host_port(), + cluster1[0].host_port(), "dummy-exchange", "dummy-key", "-d"]) + cluster0.start() + + # Wait for qpid-tool:list on cluster0[0] to generate expected output. + pattern = re.compile("org.apache.qpid.broker.*link") + qpid_tool = subprocess.Popen(["qpid-tool", cluster0[0].host_port()], + stdin=subprocess.PIPE, stdout=subprocess.PIPE) + class Scanner(Thread): + def __init__(self): self.found = False; Thread.__init__(self) + def run(self): + for l in qpid_tool.stdout: + if pattern.search(l): self.found = True; return + scanner = Scanner() + scanner.start() + start = time.time() + try: + # Wait up to 5 second timeout for scanner to find expected output + while not scanner.found and time.time() < start + 5: + qpid_tool.stdin.write("list\n") # Ask qpid-tool to list + for b in cluster0: b.ready() # Raise if any brokers are down + finally: + qpid_tool.stdin.write("quit\n") + qpid_tool.wait() + scanner.join() + assert scanner.found + # Verify logs are consistent + cluster_test_logs.verify_logs(glob.glob("*.log")) + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 0462838b1b..5e407a061f 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -272,5 +272,9 @@ <field name="product" type="str32"/> <field name="instance" type="str32"/> </control> + + <!-- Replicate encoded config objects - e.g. links and bridges. --> + <control name="config" code="0x37"><field name="encoded" type="str32"/></control> </class> + </amqp> |