summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-10-16 19:45:14 +0000
committerTed Ross <tross@apache.org>2008-10-16 19:45:14 +0000
commit879ef8cf4fa5d0c935d52d0fcb1d7e81929cf2d5 (patch)
tree10dfe6ac67860a559e9b66680ec4a2c8715691c5 /cpp/src
parent92b8daec32ca76cbfdd02558e45d41ff7373f6ef (diff)
downloadqpid-python-879ef8cf4fa5d0c935d52d0fcb1d7e81929cf2d5.tar.gz
QPID-1366 - implementation of automatic anti-looping for federation
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@705337 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp10
-rw-r--r--cpp/src/qpid/broker/Broker.cpp48
-rw-r--r--cpp/src/qpid/broker/Broker.h2
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.cpp19
-rw-r--r--cpp/src/qpid/broker/ConnectionState.h3
-rw-r--r--cpp/src/qpid/broker/Vhost.cpp4
-rw-r--r--cpp/src/qpid/broker/Vhost.h1
-rw-r--r--cpp/src/qpid/management/ManagementBroker.h1
-rwxr-xr-xcpp/src/tests/federation.py59
9 files changed, 119 insertions, 28 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 65ed38b731..5064320efb 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -91,11 +91,21 @@ void Bridge::create(ConnectionState& c)
string queue = "bridge_queue_";
queue += Uuid(true).str();
FieldTable queueSettings;
+
if (args.i_tag.size()) {
queueSettings.setString("qpid.trace.id", args.i_tag);
+ } else {
+ const string& localTag = link->getBroker()->getFederationTag();
+ if (localTag.size())
+ queueSettings.setString("qpid.trace.id", localTag);
}
+
if (args.i_excludes.size()) {
queueSettings.setString("qpid.trace.exclude", args.i_excludes);
+ } else {
+ const string& peerTag = c.getFederationPeerTag();
+ if (peerTag.size())
+ queueSettings.setString("qpid.trace.exclude", peerTag);
}
bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues?
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index d401436d38..910c774958 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -37,6 +37,7 @@
#include "qpid/log/Statement.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/framing/Uuid.h"
#include "qpid/sys/ProtocolFactory.h"
#include "qpid/sys/Poller.h"
#include "qpid/sys/Dispatcher.h"
@@ -136,7 +137,7 @@ Broker::Broker(const Broker::Options& conf) :
managementAgentSingleton(!config.enableMgmt),
store(0),
acl(0),
- dataDir(conf.noDataDir ? std::string () : conf.dataDir),
+ dataDir(conf.noDataDir ? std::string() : conf.dataDir),
links(this),
factory(new ConnectionFactory(*this)),
dtxManager(timer),
@@ -148,40 +149,43 @@ Broker::Broker(const Broker::Options& conf) :
queueCleaner(queues, timer),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
- if(conf.enableMgmt){
+ if (conf.enableMgmt) {
QPID_LOG(info, "Management enabled");
managementAgent = managementAgentSingleton.getInstance();
((ManagementBroker*) managementAgent)->configure
- (dataDir.isEnabled () ? dataDir.getPath () : string (),
+ (dataDir.isEnabled() ? dataDir.getPath() : string(),
conf.mgmtPubInterval, this, conf.workerThreads + 3);
- _qmf::Package packageInitializer (managementAgent);
-
- System* system = new System (dataDir.isEnabled () ? dataDir.getPath () : string ());
- systemObject = System::shared_ptr (system);
-
- mgmtObject = new _qmf::Broker (managementAgent, this, system, conf.port);
- mgmtObject->set_workerThreads (conf.workerThreads);
- mgmtObject->set_maxConns (conf.maxConnections);
- mgmtObject->set_connBacklog (conf.connectionBacklog);
- mgmtObject->set_stagingThreshold (conf.stagingThreshold);
- mgmtObject->set_mgmtPubInterval (conf.mgmtPubInterval);
- mgmtObject->set_version (qpid::version);
+ _qmf::Package packageInitializer(managementAgent);
+
+ System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string());
+ systemObject = System::shared_ptr(system);
+
+ mgmtObject = new _qmf::Broker(managementAgent, this, system, conf.port);
+ mgmtObject->set_workerThreads(conf.workerThreads);
+ mgmtObject->set_maxConns(conf.maxConnections);
+ mgmtObject->set_connBacklog(conf.connectionBacklog);
+ mgmtObject->set_stagingThreshold(conf.stagingThreshold);
+ mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval);
+ mgmtObject->set_version(qpid::version);
if (dataDir.isEnabled())
mgmtObject->set_dataDir(dataDir.getPath());
else
mgmtObject->clr_dataDir();
- managementAgent->addObject (mgmtObject, 0x1000000000000002LL);
+ managementAgent->addObject(mgmtObject, 0x1000000000000002LL);
// Since there is currently no support for virtual hosts, a placeholder object
// representing the implied single virtual host is added here to keep the
// management schema correct.
- Vhost* vhost = new Vhost (this);
- vhostObject = Vhost::shared_ptr (vhost);
-
- queues.setParent (vhost);
- exchanges.setParent (vhost);
- links.setParent (vhost);
+ Vhost* vhost = new Vhost(this);
+ vhostObject = Vhost::shared_ptr(vhost);
+ framing::Uuid uuid(((ManagementBroker*) managementAgent)->getUuid());
+ federationTag = uuid.str();
+ vhostObject->setFederationTag(federationTag);
+
+ queues.setParent(vhost);
+ exchanges.setParent(vhost);
+ links.setParent(vhost);
}
QueuePolicy::setDefaultMaxSize(conf.queueLimit);
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index a7496f1510..cdb4c4a034 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -128,6 +128,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
std::vector<Url> knownBrokers;
std::vector<Url> getKnownBrokersImpl();
+ std::string federationTag;
public:
@@ -168,6 +169,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
Options& getOptions() { return config; }
SessionManager& getSessionManager() { return sessionManager; }
+ const std::string& getFederationTag() const { return federationTag; }
management::ManagementObject* GetManagementObject (void) const;
management::Manageable* GetVhostObject (void) const;
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp
index c47037cf9c..dd1ac20dd2 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -42,6 +42,7 @@ const std::string ANONYMOUS = "ANONYMOUS";
const std::string PLAIN = "PLAIN";
const std::string en_US = "en_US";
const std::string QPID_FED_LINK = "qpid.fed_link";
+const std::string QPID_FED_TAG = "qpid.federation_tag";
}
void ConnectionHandler::close(ReplyCode code, const string& text, ClassId, MethodId)
@@ -83,6 +84,8 @@ ConnectionHandler::Handler::Handler(Connection& c, bool isClient) :
FieldTable properties;
Array mechanisms(0x95);
+ properties.setString(QPID_FED_TAG, connection.getBroker().getFederationTag());
+
authenticator = SaslAuthenticator::createAuthenticator(c);
authenticator->getMechanisms(mechanisms);
@@ -104,12 +107,13 @@ void ConnectionHandler::Handler::startOk(const framing::FieldTable& clientProper
{
authenticator->start(mechanism, response);
connection.setFederationLink(clientProperties.get(QPID_FED_LINK));
- if (connection.isFederationLink()){
+ connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG));
+ if (connection.isFederationLink()) {
if (acl && !acl->authorise(connection.getUserId(),acl::CREATE,acl::LINK,"")){
- client.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied creating a federation link");
- return;
- }
- QPID_LOG(info, "Connection is a federation link");
+ client.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied creating a federation link");
+ return;
+ }
+ QPID_LOG(info, "Connection is a federation link");
}
}
@@ -154,15 +158,18 @@ void ConnectionHandler::Handler::closeOk(){
}
-void ConnectionHandler::Handler::start(const FieldTable& /*serverProperties*/,
+void ConnectionHandler::Handler::start(const FieldTable& serverProperties,
const framing::Array& /*mechanisms*/,
const framing::Array& /*locales*/)
{
string mechanism = connection.getAuthMechanism();
string response = connection.getAuthCredentials();
+ connection.setFederationPeerTag(serverProperties.getAsString(QPID_FED_TAG));
+
FieldTable ft;
ft.setInt(QPID_FED_LINK,1);
+ ft.setString(QPID_FED_TAG, connection.getBroker().getFederationTag());
server.startOk(ft, mechanism, response, en_US);
}
diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h
index c04bd46f72..fd69157dbd 100644
--- a/cpp/src/qpid/broker/ConnectionState.h
+++ b/cpp/src/qpid/broker/ConnectionState.h
@@ -68,6 +68,8 @@ class ConnectionState : public ConnectionToken, public management::Manageable
void setFederationLink(bool b) { federationLink = b; }
bool isFederationLink() const { return federationLink; }
+ void setFederationPeerTag(const string& tag) { federationPeerTag = string(tag); }
+ const string& getFederationPeerTag() const { return federationPeerTag; }
Broker& getBroker() { return broker; }
@@ -90,6 +92,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable
string userId;
string url;
bool federationLink;
+ string federationPeerTag;
};
}}
diff --git a/cpp/src/qpid/broker/Vhost.cpp b/cpp/src/qpid/broker/Vhost.cpp
index 4d9d3bb604..c030d4c51f 100644
--- a/cpp/src/qpid/broker/Vhost.cpp
+++ b/cpp/src/qpid/broker/Vhost.cpp
@@ -38,3 +38,7 @@ Vhost::Vhost (management::Manageable* parentBroker) : mgmtObject(0)
}
}
+void Vhost::setFederationTag(const std::string& tag)
+{
+ mgmtObject->set_federationTag(tag);
+}
diff --git a/cpp/src/qpid/broker/Vhost.h b/cpp/src/qpid/broker/Vhost.h
index 59c0d4f959..ef59362e4d 100644
--- a/cpp/src/qpid/broker/Vhost.h
+++ b/cpp/src/qpid/broker/Vhost.h
@@ -41,6 +41,7 @@ class Vhost : public management::Manageable
management::ManagementObject* GetManagementObject (void) const
{ return mgmtObject; }
+ void setFederationTag(const std::string& tag);
};
}}
diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h
index 86aa31b8ed..3564d462df 100644
--- a/cpp/src/qpid/management/ManagementBroker.h
+++ b/cpp/src/qpid/management/ManagementBroker.h
@@ -67,6 +67,7 @@ public:
bool dispatchCommand (qpid::broker::Deliverable& msg,
const std::string& routingKey,
const framing::FieldTable* args);
+ const framing::Uuid& getUuid() const { return uuid; }
// Stubs for remote management agent calls
void init (std::string, uint16_t, uint16_t, bool, std::string) { assert(0); }
diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py
index 7e7caeeec6..d9bafd9d88 100755
--- a/cpp/src/tests/federation.py
+++ b/cpp/src/tests/federation.py
@@ -245,6 +245,65 @@ class FederationTests(TestBase010):
mgmt.shutdown ()
+ def test_tracing_automatic(self):
+ remoteUrl = "%s:%d" % (remote_host(), remote_port())
+ self.startQmf()
+ l_broker = self.qmf_broker
+ r_broker = self.qmf.addBroker(remoteUrl)
+
+ l_brokerObj = self.qmf.getObjects(_class="broker", _broker=l_broker)[0]
+ r_brokerObj = self.qmf.getObjects(_class="broker", _broker=r_broker)[0]
+
+ l_res = l_brokerObj.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ r_res = r_brokerObj.connect(testrunner.host, testrunner.port, False, "PLAIN", "guest", "guest", "tcp")
+
+ self.assertEqual(l_res.status, 0)
+ self.assertEqual(r_res.status, 0)
+
+ l_link = self.qmf.getObjects(_class="link", _broker=l_broker)[0]
+ r_link = self.qmf.getObjects(_class="link", _broker=r_broker)[0]
+
+ l_res = l_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False)
+ r_res = r_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False)
+
+ self.assertEqual(l_res.status, 0)
+ self.assertEqual(r_res.status, 0)
+
+ count = 0
+ while l_link.state != "Operational" or r_link.state != "Operational":
+ count += 1
+ if count > 10:
+ self.fail("Fed links didn't become operational after 10 seconds")
+ sleep(1)
+ l_link = self.qmf.getObjects(_class="link", _broker=l_broker)[0]
+ r_link = self.qmf.getObjects(_class="link", _broker=r_broker)[0]
+ sleep(3)
+
+ #setup queue to receive messages from local broker
+ session = self.session
+ session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed1", exchange="amq.direct", binding_key="key")
+ self.subscribe(queue="fed1", destination="f1")
+ queue = session.incoming("f1")
+
+ #setup queue on remote broker and add some messages
+ r_conn = self.connect(host=remote_host(), port=remote_port())
+ r_session = r_conn.session("test_trace")
+ for i in range(1, 11):
+ dp = r_session.delivery_properties(routing_key="key")
+ r_session.message_transfer(destination="amq.direct", message=Message(dp, "Message %d" % i))
+
+ for i in range(1, 11):
+ try:
+ msg = queue.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+ except Empty:
+ self.fail("Failed to find expected message containing 'Message %d'" % i)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
def test_tracing(self):
session = self.session