summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.cpp9
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.h7
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp27
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h4
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp30
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.h6
-rwxr-xr-xqpid/cpp/src/tests/cluster_test_logs.py3
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py40
-rw-r--r--qpid/cpp/xml/cluster.xml4
12 files changed, 135 insertions, 16 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 5911d916ad..7fbbf4e2c4 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -60,8 +60,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
(agent, this, link, id, args.i_durable, args.i_src, args.i_dest,
args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
- if (!args.i_durable)
- agent->addObject(mgmtObject);
+ agent->addObject(mgmtObject);
}
QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest);
}
@@ -167,10 +166,6 @@ void Bridge::destroy()
void Bridge::setPersistenceId(uint64_t pId) const
{
- if (mgmtObject != 0 && persistenceId == 0) {
- ManagementAgent* agent = link->getBroker()->getManagementAgent();
- agent->addObject (mgmtObject, pId);
- }
persistenceId = pId;
}
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 5a50d26c8c..e1091df724 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -30,6 +30,7 @@
#include "qpid/framing/enum.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/AclModule.h"
+#include "qpid/sys/ClusterSafe.h"
using namespace qpid::broker;
using qpid::framing::Buffer;
@@ -130,9 +131,12 @@ void Link::established ()
{
stringstream addr;
addr << host << ":" << port;
-
QPID_LOG (info, "Inter-broker link established to " << addr.str());
- agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
+
+ // Don't raise the management event in a cluster, other members wont't get this call.
+ if (!sys::isCluster())
+ agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
+
{
Mutex::ScopedLock mutex(lock);
setStateLH(STATE_OPERATIONAL);
@@ -150,11 +154,13 @@ void Link::closed (int, std::string text)
connection = 0;
+ // Don't raise the management event in a cluster, other members wont't get this call.
if (state == STATE_OPERATIONAL) {
stringstream addr;
addr << host << ":" << port;
QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str());
- agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
+ if (!sys::isCluster())
+ agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
}
for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
index ea14552cc1..82f1f0ea24 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -379,3 +379,12 @@ void LinkRegistry::setPassive(bool p)
passive = p;
//will activate or passivate links on maintenance visit
}
+
+void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) {
+ for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second);
+}
+
+void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f) {
+ for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second);
+}
+
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h
index a1931920d7..4c97e4f9d8 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.h
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h
@@ -31,6 +31,7 @@
#include "qpid/management/Manageable.h"
#include <boost/shared_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
+#include <boost/function.hpp>
namespace qpid {
namespace broker {
@@ -148,6 +149,12 @@ namespace broker {
* bridges won't therefore pull or push any messages.
*/
void setPassive(bool);
+
+
+ /** Iterate over each link in the registry. Used for cluster updates. */
+ void eachLink(boost::function<void(boost::shared_ptr<Link>)> f);
+ /** Iterate over each bridge in the registry. Used for cluster updates. */
+ void eachBridge(boost::function<void(boost::shared_ptr< Bridge>)> f);
};
}
}
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 5720f7fcc1..dd4882774b 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -198,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1045272;
+const uint32_t Cluster::CLUSTER_VERSION = 1058747;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 2797fdcf02..c7689577a7 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -32,6 +32,8 @@
#include "qpid/broker/RecoveredEnqueue.h"
#include "qpid/broker/RecoveredDequeue.h"
#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/Bridge.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/AMQFrame.h"
@@ -346,13 +348,12 @@ size_t Connection::decode(const char* data, size_t size) {
// returns true if the header is complete or already read.
bool Connection::checkProtocolHeader(const char*& data, size_t size) {
if (expectProtocolHeader) {
- //If this is an outgoing link, we will receive a protocol
- //header which needs to be decoded first
+ // This is an outgoing link connection, we will receive a protocol
+ // header which needs to be decoded first
framing::ProtocolInitiation pi;
Buffer buf(const_cast<char*&>(data), size);
if (pi.decode(buf)) {
//TODO: check the version is correct
- QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")");
expectProtocolHeader = false;
data += pi.encodedSize();
} else {
@@ -650,5 +651,25 @@ void Connection::managementSetupState(
agent->setUuid(id);
agent->setName(vendor, product, instance);
}
+
+void Connection::config(const std::string& encoded) {
+ Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+ string kind;
+ buf.getShortString (kind);
+ if (kind == "link") {
+ broker::Link::shared_ptr link =
+ broker::Link::decode(cluster.getBroker().getLinks(), buf);
+ QPID_LOG(debug, cluster << " updated link "
+ << link->getHost() << ":" << link->getPort());
+ }
+ else if (kind == "bridge") {
+ broker::Bridge::shared_ptr bridge =
+ broker::Bridge::decode(cluster.getBroker().getLinks(), buf);
+ QPID_LOG(debug, cluster << " updated bridge " << bridge->getName());
+ }
+ else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind));
+}
+
+
}} // Namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 69b8cb1450..d90cdd898b 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -183,7 +183,9 @@ class Connection :
const std::string& vendor,
const std::string& product,
const std::string& instance);
-
+
+ void config(const std::string& encoded);
+
void setSecureConnection ( broker::SecureConnection * sc );
private:
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index 59db4de526..e5d20c85e6 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -34,6 +34,9 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/LinkRegistry.h"
+#include "qpid/broker/Bridge.h"
+#include "qpid/broker/Link.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/ExchangeRegistry.h"
@@ -167,7 +170,7 @@ void UpdateClient::update() {
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
ClusterConnectionProxy(session).expiryId(expiry.getId());
-
+ updateLinks();
updateManagementAgent();
session.close();
@@ -199,6 +202,14 @@ template <class T> std::string encode(const T& t) {
t.encode(buf);
return encoded;
}
+
+template <class T> std::string encode(const T& t, bool encodeKind) {
+ std::string encoded;
+ encoded.resize(t.encodedSize());
+ framing::Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+ t.encode(buf, encodeKind);
+ return encoded;
+}
} // namespace
@@ -583,4 +594,21 @@ void UpdateClient::updateQueueListener(std::string& q,
ClusterConnectionProxy(session).addQueueListener(q, n);
}
+void UpdateClient::updateLinks() {
+ broker::LinkRegistry& links = updaterBroker.getLinks();
+ links.eachLink(boost::bind(&UpdateClient::updateLink, this, _1));
+ links.eachBridge(boost::bind(&UpdateClient::updateBridge, this, _1));
+}
+
+void UpdateClient::updateLink(const boost::shared_ptr<broker::Link>& link) {
+ QPID_LOG(debug, *this << " updating link "
+ << link->getHost() << ":" << link->getPort());
+ ClusterConnectionProxy(session).config(encode(*link));
+}
+
+void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge) {
+ QPID_LOG(debug, *this << " updating bridge " << bridge->getName());
+ ClusterConnectionProxy(session).config(encode(*bridge));
+}
+
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h
index 76621cd7ba..156fa112df 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h
@@ -49,6 +49,8 @@ class DeliveryRecord;
class SessionState;
class SemanticState;
class Decoder;
+class Link;
+class Bridge;
} // namespace broker
@@ -99,6 +101,10 @@ class UpdateClient : public sys::Runnable {
void updateQueueListener(std::string& q, const boost::shared_ptr<broker::Consumer>& c);
void updateManagementSetupState();
void updateManagementAgent();
+ void updateLinks();
+ void updateLink(const boost::shared_ptr<broker::Link>&);
+ void updateBridge(const boost::shared_ptr<broker::Bridge>&);
+
Numbering<broker::SemanticState::ConsumerImpl::shared_ptr> consumerNumbering;
MemberId updaterId;
diff --git a/qpid/cpp/src/tests/cluster_test_logs.py b/qpid/cpp/src/tests/cluster_test_logs.py
index 261b1d522b..eae28fc4e5 100755
--- a/qpid/cpp/src/tests/cluster_test_logs.py
+++ b/qpid/cpp/src/tests/cluster_test_logs.py
@@ -58,7 +58,8 @@ def filter_log(log):
'warning Broker closed connection: 200, OK',
'task late',
'task overran',
- 'warning CLOSING .* unsent data'
+ 'warning CLOSING .* unsent data',
+ 'Inter-broker link '
])
if re.compile(skip).search(l): continue
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 8bc89b2292..9bfd1b2d89 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -26,6 +26,7 @@ from qpid.messaging import Message, Empty
from threading import Thread, Lock
from logging import getLogger
from itertools import chain
+from tempfile import NamedTemporaryFile
log = getLogger("qpid.cluster_tests")
@@ -264,6 +265,45 @@ acl allow all all
cluster.start()
self.assertRaises(Empty, cluster[1].connect().session().receiver("q1").fetch,0)
+ def test_route_update(self):
+ """Regression test for https://issues.apache.org/jira/browse/QPID-2982
+ Links and bridges associated with routes were not replicated on update.
+ This meant extra management objects and caused an exit if a management
+ client was attached.
+ """
+ args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
+ cluster0 = self.cluster(1, args=args)
+ cluster1 = self.cluster(1, args=args)
+ assert 0 == subprocess.call(
+ ["qpid-route", "route", "add", cluster0[0].host_port(),
+ cluster1[0].host_port(), "dummy-exchange", "dummy-key", "-d"])
+ cluster0.start()
+
+ # Wait for qpid-tool:list on cluster0[0] to generate expected output.
+ pattern = re.compile("org.apache.qpid.broker.*link")
+ qpid_tool = subprocess.Popen(["qpid-tool", cluster0[0].host_port()],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+ class Scanner(Thread):
+ def __init__(self): self.found = False; Thread.__init__(self)
+ def run(self):
+ for l in qpid_tool.stdout:
+ if pattern.search(l): self.found = True; return
+ scanner = Scanner()
+ scanner.start()
+ start = time.time()
+ try:
+ # Wait up to 5 second timeout for scanner to find expected output
+ while not scanner.found and time.time() < start + 5:
+ qpid_tool.stdin.write("list\n") # Ask qpid-tool to list
+ for b in cluster0: b.ready() # Raise if any brokers are down
+ finally:
+ qpid_tool.stdin.write("quit\n")
+ qpid_tool.wait()
+ scanner.join()
+ assert scanner.found
+ # Verify logs are consistent
+ cluster_test_logs.verify_logs(glob.glob("*.log"))
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 0462838b1b..5e407a061f 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -272,5 +272,9 @@
<field name="product" type="str32"/>
<field name="instance" type="str32"/>
</control>
+
+ <!-- Replicate encoded config objects - e.g. links and bridges. -->
+ <control name="config" code="0x37"><field name="encoded" type="str32"/></control>
</class>
+
</amqp>