diff options
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 27 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 30 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.h | 6 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_test_logs.py | 3 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 40 | ||||
-rw-r--r-- | qpid/cpp/xml/cluster.xml | 4 |
12 files changed, 135 insertions, 16 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 5911d916ad..7fbbf4e2c4 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -60,8 +60,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, (agent, this, link, id, args.i_durable, args.i_src, args.i_dest, args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync); - if (!args.i_durable) - agent->addObject(mgmtObject); + agent->addObject(mgmtObject); } QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest); } @@ -167,10 +166,6 @@ void Bridge::destroy() void Bridge::setPersistenceId(uint64_t pId) const { - if (mgmtObject != 0 && persistenceId == 0) { - ManagementAgent* agent = link->getBroker()->getManagementAgent(); - agent->addObject (mgmtObject, pId); - } persistenceId = pId; } diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 5a50d26c8c..e1091df724 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -30,6 +30,7 @@ #include "qpid/framing/enum.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/broker/AclModule.h" +#include "qpid/sys/ClusterSafe.h" using namespace qpid::broker; using qpid::framing::Buffer; @@ -130,9 +131,12 @@ void Link::established () { stringstream addr; addr << host << ":" << port; - QPID_LOG (info, "Inter-broker link established to " << addr.str()); - agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str())); + + // Don't raise the management event in a cluster, other members wont't get this call. + if (!sys::isCluster()) + agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str())); + { Mutex::ScopedLock mutex(lock); setStateLH(STATE_OPERATIONAL); @@ -150,11 +154,13 @@ void Link::closed (int, std::string text) connection = 0; + // Don't raise the management event in a cluster, other members wont't get this call. if (state == STATE_OPERATIONAL) { stringstream addr; addr << host << ":" << port; QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str()); - agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); + if (!sys::isCluster()) + agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); } for (Bridges::iterator i = active.begin(); i != active.end(); i++) { diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp index ea14552cc1..82f1f0ea24 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp @@ -379,3 +379,12 @@ void LinkRegistry::setPassive(bool p) passive = p; //will activate or passivate links on maintenance visit } + +void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) { + for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second); +} + +void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f) { + for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second); +} + diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h index a1931920d7..4c97e4f9d8 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.h +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h @@ -31,6 +31,7 @@ #include "qpid/management/Manageable.h" #include <boost/shared_ptr.hpp> #include <boost/intrusive_ptr.hpp> +#include <boost/function.hpp> namespace qpid { namespace broker { @@ -148,6 +149,12 @@ namespace broker { * bridges won't therefore pull or push any messages. */ void setPassive(bool); + + + /** Iterate over each link in the registry. Used for cluster updates. */ + void eachLink(boost::function<void(boost::shared_ptr<Link>)> f); + /** Iterate over each bridge in the registry. Used for cluster updates. */ + void eachBridge(boost::function<void(boost::shared_ptr< Bridge>)> f); }; } } diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 5720f7fcc1..dd4882774b 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -198,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 1045272; +const uint32_t Cluster::CLUSTER_VERSION = 1058747; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 2797fdcf02..c7689577a7 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -32,6 +32,8 @@ #include "qpid/broker/RecoveredEnqueue.h" #include "qpid/broker/RecoveredDequeue.h" #include "qpid/broker/Exchange.h" +#include "qpid/broker/Link.h" +#include "qpid/broker/Bridge.h" #include "qpid/broker/Queue.h" #include "qpid/framing/enum.h" #include "qpid/framing/AMQFrame.h" @@ -346,13 +348,12 @@ size_t Connection::decode(const char* data, size_t size) { // returns true if the header is complete or already read. bool Connection::checkProtocolHeader(const char*& data, size_t size) { if (expectProtocolHeader) { - //If this is an outgoing link, we will receive a protocol - //header which needs to be decoded first + // This is an outgoing link connection, we will receive a protocol + // header which needs to be decoded first framing::ProtocolInitiation pi; Buffer buf(const_cast<char*&>(data), size); if (pi.decode(buf)) { //TODO: check the version is correct - QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")"); expectProtocolHeader = false; data += pi.encodedSize(); } else { @@ -650,5 +651,25 @@ void Connection::managementSetupState( agent->setUuid(id); agent->setName(vendor, product, instance); } + +void Connection::config(const std::string& encoded) { + Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); + string kind; + buf.getShortString (kind); + if (kind == "link") { + broker::Link::shared_ptr link = + broker::Link::decode(cluster.getBroker().getLinks(), buf); + QPID_LOG(debug, cluster << " updated link " + << link->getHost() << ":" << link->getPort()); + } + else if (kind == "bridge") { + broker::Bridge::shared_ptr bridge = + broker::Bridge::decode(cluster.getBroker().getLinks(), buf); + QPID_LOG(debug, cluster << " updated bridge " << bridge->getName()); + } + else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind)); +} + + }} // Namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 69b8cb1450..d90cdd898b 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -183,7 +183,9 @@ class Connection : const std::string& vendor, const std::string& product, const std::string& instance); - + + void config(const std::string& encoded); + void setSecureConnection ( broker::SecureConnection * sc ); private: diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index 59db4de526..e5d20c85e6 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -34,6 +34,9 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/LinkRegistry.h" +#include "qpid/broker/Bridge.h" +#include "qpid/broker/Link.h" #include "qpid/broker/Message.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/ExchangeRegistry.h" @@ -167,7 +170,7 @@ void UpdateClient::update() { b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1)); ClusterConnectionProxy(session).expiryId(expiry.getId()); - + updateLinks(); updateManagementAgent(); session.close(); @@ -199,6 +202,14 @@ template <class T> std::string encode(const T& t) { t.encode(buf); return encoded; } + +template <class T> std::string encode(const T& t, bool encodeKind) { + std::string encoded; + encoded.resize(t.encodedSize()); + framing::Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); + t.encode(buf, encodeKind); + return encoded; +} } // namespace @@ -583,4 +594,21 @@ void UpdateClient::updateQueueListener(std::string& q, ClusterConnectionProxy(session).addQueueListener(q, n); } +void UpdateClient::updateLinks() { + broker::LinkRegistry& links = updaterBroker.getLinks(); + links.eachLink(boost::bind(&UpdateClient::updateLink, this, _1)); + links.eachBridge(boost::bind(&UpdateClient::updateBridge, this, _1)); +} + +void UpdateClient::updateLink(const boost::shared_ptr<broker::Link>& link) { + QPID_LOG(debug, *this << " updating link " + << link->getHost() << ":" << link->getPort()); + ClusterConnectionProxy(session).config(encode(*link)); +} + +void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge) { + QPID_LOG(debug, *this << " updating bridge " << bridge->getName()); + ClusterConnectionProxy(session).config(encode(*bridge)); +} + }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h index 76621cd7ba..156fa112df 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.h +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h @@ -49,6 +49,8 @@ class DeliveryRecord; class SessionState; class SemanticState; class Decoder; +class Link; +class Bridge; } // namespace broker @@ -99,6 +101,10 @@ class UpdateClient : public sys::Runnable { void updateQueueListener(std::string& q, const boost::shared_ptr<broker::Consumer>& c); void updateManagementSetupState(); void updateManagementAgent(); + void updateLinks(); + void updateLink(const boost::shared_ptr<broker::Link>&); + void updateBridge(const boost::shared_ptr<broker::Bridge>&); + Numbering<broker::SemanticState::ConsumerImpl::shared_ptr> consumerNumbering; MemberId updaterId; diff --git a/qpid/cpp/src/tests/cluster_test_logs.py b/qpid/cpp/src/tests/cluster_test_logs.py index 261b1d522b..eae28fc4e5 100755 --- a/qpid/cpp/src/tests/cluster_test_logs.py +++ b/qpid/cpp/src/tests/cluster_test_logs.py @@ -58,7 +58,8 @@ def filter_log(log): 'warning Broker closed connection: 200, OK', 'task late', 'task overran', - 'warning CLOSING .* unsent data' + 'warning CLOSING .* unsent data', + 'Inter-broker link ' ]) if re.compile(skip).search(l): continue diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 8bc89b2292..9bfd1b2d89 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -26,6 +26,7 @@ from qpid.messaging import Message, Empty from threading import Thread, Lock from logging import getLogger from itertools import chain +from tempfile import NamedTemporaryFile log = getLogger("qpid.cluster_tests") @@ -264,6 +265,45 @@ acl allow all all cluster.start() self.assertRaises(Empty, cluster[1].connect().session().receiver("q1").fetch,0) + def test_route_update(self): + """Regression test for https://issues.apache.org/jira/browse/QPID-2982 + Links and bridges associated with routes were not replicated on update. + This meant extra management objects and caused an exit if a management + client was attached. + """ + args=["--mgmt-pub-interval=1","--log-enable=trace+:management"] + cluster0 = self.cluster(1, args=args) + cluster1 = self.cluster(1, args=args) + assert 0 == subprocess.call( + ["qpid-route", "route", "add", cluster0[0].host_port(), + cluster1[0].host_port(), "dummy-exchange", "dummy-key", "-d"]) + cluster0.start() + + # Wait for qpid-tool:list on cluster0[0] to generate expected output. + pattern = re.compile("org.apache.qpid.broker.*link") + qpid_tool = subprocess.Popen(["qpid-tool", cluster0[0].host_port()], + stdin=subprocess.PIPE, stdout=subprocess.PIPE) + class Scanner(Thread): + def __init__(self): self.found = False; Thread.__init__(self) + def run(self): + for l in qpid_tool.stdout: + if pattern.search(l): self.found = True; return + scanner = Scanner() + scanner.start() + start = time.time() + try: + # Wait up to 5 second timeout for scanner to find expected output + while not scanner.found and time.time() < start + 5: + qpid_tool.stdin.write("list\n") # Ask qpid-tool to list + for b in cluster0: b.ready() # Raise if any brokers are down + finally: + qpid_tool.stdin.write("quit\n") + qpid_tool.wait() + scanner.join() + assert scanner.found + # Verify logs are consistent + cluster_test_logs.verify_logs(glob.glob("*.log")) + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 0462838b1b..5e407a061f 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -272,5 +272,9 @@ <field name="product" type="str32"/> <field name="instance" type="str32"/> </control> + + <!-- Replicate encoded config objects - e.g. links and bridges. --> + <control name="config" code="0x37"><field name="encoded" type="str32"/></control> </class> + </amqp> |