diff options
author | Ted Ross <tross@apache.org> | 2008-10-16 19:45:14 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-10-16 19:45:14 +0000 |
commit | 879ef8cf4fa5d0c935d52d0fcb1d7e81929cf2d5 (patch) | |
tree | 10dfe6ac67860a559e9b66680ec4a2c8715691c5 /cpp/src | |
parent | 92b8daec32ca76cbfdd02558e45d41ff7373f6ef (diff) | |
download | qpid-python-879ef8cf4fa5d0c935d52d0fcb1d7e81929cf2d5.tar.gz |
QPID-1366 - implementation of automatic anti-looping for federation
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@705337 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Bridge.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 48 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionHandler.cpp | 19 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionState.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Vhost.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Vhost.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementBroker.h | 1 | ||||
-rwxr-xr-x | cpp/src/tests/federation.py | 59 |
9 files changed, 119 insertions, 28 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index 65ed38b731..5064320efb 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -91,11 +91,21 @@ void Bridge::create(ConnectionState& c) string queue = "bridge_queue_"; queue += Uuid(true).str(); FieldTable queueSettings; + if (args.i_tag.size()) { queueSettings.setString("qpid.trace.id", args.i_tag); + } else { + const string& localTag = link->getBroker()->getFederationTag(); + if (localTag.size()) + queueSettings.setString("qpid.trace.id", localTag); } + if (args.i_excludes.size()) { queueSettings.setString("qpid.trace.exclude", args.i_excludes); + } else { + const string& peerTag = c.getFederationPeerTag(); + if (peerTag.size()) + queueSettings.setString("qpid.trace.exclude", peerTag); } bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues? diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index d401436d38..910c774958 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -37,6 +37,7 @@ #include "qpid/log/Statement.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ProtocolInitiation.h" +#include "qpid/framing/Uuid.h" #include "qpid/sys/ProtocolFactory.h" #include "qpid/sys/Poller.h" #include "qpid/sys/Dispatcher.h" @@ -136,7 +137,7 @@ Broker::Broker(const Broker::Options& conf) : managementAgentSingleton(!config.enableMgmt), store(0), acl(0), - dataDir(conf.noDataDir ? std::string () : conf.dataDir), + dataDir(conf.noDataDir ? std::string() : conf.dataDir), links(this), factory(new ConnectionFactory(*this)), dtxManager(timer), @@ -148,40 +149,43 @@ Broker::Broker(const Broker::Options& conf) : queueCleaner(queues, timer), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) { - if(conf.enableMgmt){ + if (conf.enableMgmt) { QPID_LOG(info, "Management enabled"); managementAgent = managementAgentSingleton.getInstance(); ((ManagementBroker*) managementAgent)->configure - (dataDir.isEnabled () ? dataDir.getPath () : string (), + (dataDir.isEnabled() ? dataDir.getPath() : string(), conf.mgmtPubInterval, this, conf.workerThreads + 3); - _qmf::Package packageInitializer (managementAgent); - - System* system = new System (dataDir.isEnabled () ? dataDir.getPath () : string ()); - systemObject = System::shared_ptr (system); - - mgmtObject = new _qmf::Broker (managementAgent, this, system, conf.port); - mgmtObject->set_workerThreads (conf.workerThreads); - mgmtObject->set_maxConns (conf.maxConnections); - mgmtObject->set_connBacklog (conf.connectionBacklog); - mgmtObject->set_stagingThreshold (conf.stagingThreshold); - mgmtObject->set_mgmtPubInterval (conf.mgmtPubInterval); - mgmtObject->set_version (qpid::version); + _qmf::Package packageInitializer(managementAgent); + + System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string()); + systemObject = System::shared_ptr(system); + + mgmtObject = new _qmf::Broker(managementAgent, this, system, conf.port); + mgmtObject->set_workerThreads(conf.workerThreads); + mgmtObject->set_maxConns(conf.maxConnections); + mgmtObject->set_connBacklog(conf.connectionBacklog); + mgmtObject->set_stagingThreshold(conf.stagingThreshold); + mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval); + mgmtObject->set_version(qpid::version); if (dataDir.isEnabled()) mgmtObject->set_dataDir(dataDir.getPath()); else mgmtObject->clr_dataDir(); - managementAgent->addObject (mgmtObject, 0x1000000000000002LL); + managementAgent->addObject(mgmtObject, 0x1000000000000002LL); // Since there is currently no support for virtual hosts, a placeholder object // representing the implied single virtual host is added here to keep the // management schema correct. - Vhost* vhost = new Vhost (this); - vhostObject = Vhost::shared_ptr (vhost); - - queues.setParent (vhost); - exchanges.setParent (vhost); - links.setParent (vhost); + Vhost* vhost = new Vhost(this); + vhostObject = Vhost::shared_ptr(vhost); + framing::Uuid uuid(((ManagementBroker*) managementAgent)->getUuid()); + federationTag = uuid.str(); + vhostObject->setFederationTag(federationTag); + + queues.setParent(vhost); + exchanges.setParent(vhost); + links.setParent(vhost); } QueuePolicy::setDefaultMaxSize(conf.queueLimit); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index a7496f1510..cdb4c4a034 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -128,6 +128,7 @@ class Broker : public sys::Runnable, public Plugin::Target, std::vector<Url> knownBrokers; std::vector<Url> getKnownBrokersImpl(); + std::string federationTag; public: @@ -168,6 +169,7 @@ class Broker : public sys::Runnable, public Plugin::Target, Options& getOptions() { return config; } SessionManager& getSessionManager() { return sessionManager; } + const std::string& getFederationTag() const { return federationTag; } management::ManagementObject* GetManagementObject (void) const; management::Manageable* GetVhostObject (void) const; diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index c47037cf9c..dd1ac20dd2 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -42,6 +42,7 @@ const std::string ANONYMOUS = "ANONYMOUS"; const std::string PLAIN = "PLAIN"; const std::string en_US = "en_US"; const std::string QPID_FED_LINK = "qpid.fed_link"; +const std::string QPID_FED_TAG = "qpid.federation_tag"; } void ConnectionHandler::close(ReplyCode code, const string& text, ClassId, MethodId) @@ -83,6 +84,8 @@ ConnectionHandler::Handler::Handler(Connection& c, bool isClient) : FieldTable properties; Array mechanisms(0x95); + properties.setString(QPID_FED_TAG, connection.getBroker().getFederationTag()); + authenticator = SaslAuthenticator::createAuthenticator(c); authenticator->getMechanisms(mechanisms); @@ -104,12 +107,13 @@ void ConnectionHandler::Handler::startOk(const framing::FieldTable& clientProper { authenticator->start(mechanism, response); connection.setFederationLink(clientProperties.get(QPID_FED_LINK)); - if (connection.isFederationLink()){ + connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG)); + if (connection.isFederationLink()) { if (acl && !acl->authorise(connection.getUserId(),acl::CREATE,acl::LINK,"")){ - client.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied creating a federation link"); - return; - } - QPID_LOG(info, "Connection is a federation link"); + client.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied creating a federation link"); + return; + } + QPID_LOG(info, "Connection is a federation link"); } } @@ -154,15 +158,18 @@ void ConnectionHandler::Handler::closeOk(){ } -void ConnectionHandler::Handler::start(const FieldTable& /*serverProperties*/, +void ConnectionHandler::Handler::start(const FieldTable& serverProperties, const framing::Array& /*mechanisms*/, const framing::Array& /*locales*/) { string mechanism = connection.getAuthMechanism(); string response = connection.getAuthCredentials(); + connection.setFederationPeerTag(serverProperties.getAsString(QPID_FED_TAG)); + FieldTable ft; ft.setInt(QPID_FED_LINK,1); + ft.setString(QPID_FED_TAG, connection.getBroker().getFederationTag()); server.startOk(ft, mechanism, response, en_US); } diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h index c04bd46f72..fd69157dbd 100644 --- a/cpp/src/qpid/broker/ConnectionState.h +++ b/cpp/src/qpid/broker/ConnectionState.h @@ -68,6 +68,8 @@ class ConnectionState : public ConnectionToken, public management::Manageable void setFederationLink(bool b) { federationLink = b; } bool isFederationLink() const { return federationLink; } + void setFederationPeerTag(const string& tag) { federationPeerTag = string(tag); } + const string& getFederationPeerTag() const { return federationPeerTag; } Broker& getBroker() { return broker; } @@ -90,6 +92,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable string userId; string url; bool federationLink; + string federationPeerTag; }; }} diff --git a/cpp/src/qpid/broker/Vhost.cpp b/cpp/src/qpid/broker/Vhost.cpp index 4d9d3bb604..c030d4c51f 100644 --- a/cpp/src/qpid/broker/Vhost.cpp +++ b/cpp/src/qpid/broker/Vhost.cpp @@ -38,3 +38,7 @@ Vhost::Vhost (management::Manageable* parentBroker) : mgmtObject(0) } } +void Vhost::setFederationTag(const std::string& tag) +{ + mgmtObject->set_federationTag(tag); +} diff --git a/cpp/src/qpid/broker/Vhost.h b/cpp/src/qpid/broker/Vhost.h index 59c0d4f959..ef59362e4d 100644 --- a/cpp/src/qpid/broker/Vhost.h +++ b/cpp/src/qpid/broker/Vhost.h @@ -41,6 +41,7 @@ class Vhost : public management::Manageable management::ManagementObject* GetManagementObject (void) const { return mgmtObject; } + void setFederationTag(const std::string& tag); }; }} diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h index 86aa31b8ed..3564d462df 100644 --- a/cpp/src/qpid/management/ManagementBroker.h +++ b/cpp/src/qpid/management/ManagementBroker.h @@ -67,6 +67,7 @@ public: bool dispatchCommand (qpid::broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); + const framing::Uuid& getUuid() const { return uuid; } // Stubs for remote management agent calls void init (std::string, uint16_t, uint16_t, bool, std::string) { assert(0); } diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py index 7e7caeeec6..d9bafd9d88 100755 --- a/cpp/src/tests/federation.py +++ b/cpp/src/tests/federation.py @@ -245,6 +245,65 @@ class FederationTests(TestBase010): mgmt.shutdown () + def test_tracing_automatic(self): + remoteUrl = "%s:%d" % (remote_host(), remote_port()) + self.startQmf() + l_broker = self.qmf_broker + r_broker = self.qmf.addBroker(remoteUrl) + + l_brokerObj = self.qmf.getObjects(_class="broker", _broker=l_broker)[0] + r_brokerObj = self.qmf.getObjects(_class="broker", _broker=r_broker)[0] + + l_res = l_brokerObj.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp") + r_res = r_brokerObj.connect(testrunner.host, testrunner.port, False, "PLAIN", "guest", "guest", "tcp") + + self.assertEqual(l_res.status, 0) + self.assertEqual(r_res.status, 0) + + l_link = self.qmf.getObjects(_class="link", _broker=l_broker)[0] + r_link = self.qmf.getObjects(_class="link", _broker=r_broker)[0] + + l_res = l_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False) + r_res = r_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False) + + self.assertEqual(l_res.status, 0) + self.assertEqual(r_res.status, 0) + + count = 0 + while l_link.state != "Operational" or r_link.state != "Operational": + count += 1 + if count > 10: + self.fail("Fed links didn't become operational after 10 seconds") + sleep(1) + l_link = self.qmf.getObjects(_class="link", _broker=l_broker)[0] + r_link = self.qmf.getObjects(_class="link", _broker=r_broker)[0] + sleep(3) + + #setup queue to receive messages from local broker + session = self.session + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="amq.direct", binding_key="key") + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + #setup queue on remote broker and add some messages + r_conn = self.connect(host=remote_host(), port=remote_port()) + r_session = r_conn.session("test_trace") + for i in range(1, 11): + dp = r_session.delivery_properties(routing_key="key") + r_session.message_transfer(destination="amq.direct", message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + try: + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + except Empty: + self.fail("Failed to find expected message containing 'Message %d'" % i) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + def test_tracing(self): session = self.session |