From 54d9e7a6228bb1bd291a9aa4c156126359a6dbf0 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 25 Jan 2012 18:48:09 +0000 Subject: QPID-3603: HA backup rejects client connections. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1235867 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/Makefile.am | 2 + qpid/cpp/src/ha.mk | 3 +- qpid/cpp/src/qpid/broker/Broker.h | 3 ++ qpid/cpp/src/qpid/broker/Connection.cpp | 6 ++- qpid/cpp/src/qpid/broker/ConnectionObserver.h | 52 +++++++++++++++++++ qpid/cpp/src/qpid/broker/ConnectionObservers.h | 56 +++++++++++++++++++++ qpid/cpp/src/qpid/ha/ConnectionExcluder.h | 70 ++++++++++++++++++++++++++ qpid/cpp/src/qpid/ha/HaBroker.cpp | 16 ++++-- qpid/cpp/src/qpid/ha/HaBroker.h | 1 + qpid/cpp/src/qpid/ha/HaPlugin.cpp | 1 + qpid/cpp/src/qpid/ha/Settings.h | 3 +- qpid/cpp/src/tests/ha_tests.py | 32 +++++++++--- 12 files changed, 231 insertions(+), 14 deletions(-) create mode 100644 qpid/cpp/src/qpid/broker/ConnectionObserver.h create mode 100644 qpid/cpp/src/qpid/broker/ConnectionObservers.h create mode 100644 qpid/cpp/src/qpid/ha/ConnectionExcluder.h diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 5caf54e2ba..666f593f7d 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -541,6 +541,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/Credit.h \ qpid/broker/Credit.cpp \ qpid/broker/ConsumerFactory.h \ + qpid/broker/ConnectionObserver.h \ + qpid/broker/ConnectionObservers.h \ qpid/broker/Daemon.cpp \ qpid/broker/Daemon.h \ qpid/broker/Deliverable.h \ diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk index 272fdbf296..6874449cc5 100644 --- a/qpid/cpp/src/ha.mk +++ b/qpid/cpp/src/ha.mk @@ -34,7 +34,8 @@ ha_la_SOURCES = \ qpid/ha/ReplicatingSubscription.h \ qpid/ha/ReplicatingSubscription.cpp \ qpid/ha/BrokerReplicator.cpp \ - qpid/ha/BrokerReplicator.h + qpid/ha/BrokerReplicator.h \ + qpid/ha/ConnectionExcluder.h ha_la_LIBADD = libqpidbroker.la ha_la_LDFLAGS = $(PLUGINLDFLAGS) diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 11cf81ea9e..5d4cc99e66 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -38,6 +38,7 @@ #include "qpid/broker/System.h" #include "qpid/broker/ExpiryPolicy.h" #include "qpid/broker/ConsumerFactory.h" +#include "qpid/broker/ConnectionObservers.h" #include "qpid/management/Manageable.h" #include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/broker/Broker.h" @@ -200,6 +201,7 @@ public: boost::intrusive_ptr expiryPolicy; ConnectionCounter connectionCounter; ConsumerFactories consumerFactories; + ConnectionObservers connectionObservers; public: virtual ~Broker(); @@ -360,6 +362,7 @@ public: const std::string& connectionId); ConsumerFactories& getConsumerFactories() { return consumerFactories; } + ConnectionObservers& getConnectionObservers() { return connectionObservers; } }; }} diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 73584be8fd..97e8e8ca13 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/broker/Connection.h" +#include "qpid/broker/ConnectionObserver.h" #include "qpid/broker/SessionOutputException.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/Bridge.h" @@ -162,8 +163,11 @@ void Connection::received(framing::AMQFrame& frame) { recordFromServer(frame); else recordFromClient(frame); - if (!wasOpen && isOpen()) + if (!wasOpen && isOpen()) { doIoCallbacks(); // Do any callbacks registered before we opened. + // FIXME aconway 2012-01-18: generic observer points. + broker.getConnectionObservers().connect(*this); + } } void Connection::sent(const framing::AMQFrame& frame) diff --git a/qpid/cpp/src/qpid/broker/ConnectionObserver.h b/qpid/cpp/src/qpid/broker/ConnectionObserver.h new file mode 100644 index 0000000000..e59ec261bc --- /dev/null +++ b/qpid/cpp/src/qpid/broker/ConnectionObserver.h @@ -0,0 +1,52 @@ +#ifndef QPID_BROKER_CONNECTIONOBSERVER_H +#define QPID_BROKER_CONNECTIONOBSERVER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +namespace qpid { +namespace broker { + +class Connection; + +/** + * Observer that is informed of connection events. For use by + * plug-ins that want to be notified of, or influence, connection + * events. + */ +class ConnectionObserver +{ + public: + virtual ~ConnectionObserver() {} + + /** Called when a connection is opened and authentication has been + * performed. + * @exception Throwing an exception will abort the connection. + */ + virtual void connect(Connection& connection) = 0; + + /** Called when a connection is torn down. */ + virtual void disconnect(Connection& connection) = 0; +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_CONNECTIONOBSERVER_H*/ diff --git a/qpid/cpp/src/qpid/broker/ConnectionObservers.h b/qpid/cpp/src/qpid/broker/ConnectionObservers.h new file mode 100644 index 0000000000..bef40e26b4 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/ConnectionObservers.h @@ -0,0 +1,56 @@ +#ifndef QPID_BROKER_CONNECTIONOBSERVERS_H +#define QPID_BROKER_CONNECTIONOBSERVERS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ConnectionObserver.h" + +namespace qpid { +namespace broker { + +/** + * A collection of connection observers. + * Calling a ConnectionObserver function will call that function on each observer. + */ +class ConnectionObservers : public ConnectionObserver { + public: + // functions for managing the collection of observers + void add(boost::shared_ptr observer) { + observers.push_back(observer); + } + + // implementation of ConnectionObserver interface + void connect(Connection& c) { + std::for_each(observers.begin(), observers.end(), boost::bind(&ConnectionObserver::connect, _1, boost::ref(c))); + } + void disconnect(Connection& c) { + std::for_each(observers.begin(), observers.end(), boost::bind(&ConnectionObserver::disconnect, _1, boost::ref(c))); + } + + private: + typedef std::vector > Observers; + Observers observers; +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_CONNECTIONOBSERVERS_H*/ diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h new file mode 100644 index 0000000000..4a878c81c3 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h @@ -0,0 +1,70 @@ +#ifndef QPID_HA_CONNECTIONEXCLUDER_H +#define QPID_HA_CONNECTIONEXCLUDER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/ConnectionObserver.h" +#include "qpid/broker/Connection.h" +#include +#include + +namespace qpid { +namespace ha { + +/** + * Exclude normal connections to a backup broker. + * Connections as ha-admin user are allowed. + */ +class ConnectionExcluder : public broker::ConnectionObserver +{ + public: + typedef boost::function PrimaryTest; + + ConnectionExcluder(string adminUser_, PrimaryTest isPrimary_) + : adminUser(adminUser_), isPrimary(isPrimary_) {} + + void connect(broker::Connection& connection) { + if (!isPrimary() && !connection.isLink() + && !connection.isAuthenticatedUser(adminUser)) + { + throw Exception( + QPID_MSG( + "HA: Backup broker rejected connection " + << connection.getMgmtId() << " by user " << connection.getUserId() + << ". Only " << adminUser << " can connect to a backup.")); + } + else { + QPID_LOG(debug, "HA: Backup broker accepted connection" + << connection.getMgmtId() << " by user " + << connection.getUserId()); + } + } + + void disconnect(broker::Connection&) {} + + private: + string adminUser; + PrimaryTest isPrimary; +}; +}} // namespace qpid::ha + +#endif /*!QPID_HA_CONNECTIONEXCLUDER_H*/ diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 9b2598b3bf..21d918bed9 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -19,6 +19,7 @@ * */ #include "Backup.h" +#include "ConnectionExcluder.h" #include "HaBroker.h" #include "Settings.h" #include "ReplicatingSubscription.h" @@ -60,15 +61,20 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) ma->addObject(mgmtObject); } // FIXME aconway 2011-11-22: temporary hack to identify primary. - bool isPrimary = (s.brokerUrl == "primary"); - QPID_LOG(notice, "HA: " << (isPrimary ? "Primary" : "Backup") + bool primary = (s.brokerUrl == "primary"); + QPID_LOG(notice, "HA: " << (primary ? "Primary" : "Backup") << " initialized: client-url=" << clientUrl << " broker-url=" << brokerUrl); - if (!isPrimary) backup.reset(new Backup(broker, s)); + if (!primary) backup.reset(new Backup(broker, s)); // Register a factory for replicating subscriptions. broker.getConsumerFactories().add( boost::shared_ptr( new ReplicatingSubscription::Factory())); + // Register a connection excluder + broker.getConnectionObservers().add( + boost::shared_ptr( + new ConnectionExcluder( + s.adminUser, boost::bind(&HaBroker::isPrimary, this)))); } HaBroker::~HaBroker() {} @@ -87,4 +93,8 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, return Manageable::STATUS_OK; } +bool HaBroker::isPrimary() const { + return !backup.get(); // TODO aconway 2012-01-18: temporary test. +} + }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 0580a06202..18e6156850 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -52,6 +52,7 @@ class HaBroker : public management::Manageable management::Manageable::status_t ManagementMethod ( uint32_t methodId, management::Args& args, std::string& text); + bool isPrimary() const; private: broker::Broker& broker; Url clientUrl, brokerUrl; diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp index fc9e48411d..b7ab5ada2d 100644 --- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp +++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp @@ -37,6 +37,7 @@ struct Options : public qpid::Options { ("ha-username", optValue(settings.username, "USER"), "Username for connections between brokers") ("ha-password", optValue(settings.password, "PASS"), "Password for connections between brokers") ("ha-mechanism", optValue(settings.mechanism, "MECH"), "Authentication mechanism for connections between brokers") + ("ha-admin-user", optValue(settings.adminUser, "USER"), "User allowed to perform HA administration tasks") ; } }; diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h index a2d2e89d82..9d8821c571 100644 --- a/qpid/cpp/src/qpid/ha/Settings.h +++ b/qpid/cpp/src/qpid/ha/Settings.h @@ -35,11 +35,12 @@ using std::string; class Settings { public: - Settings() : enabled(false) {} + Settings() : enabled(false), adminUser("qpid-ha-admin") {} bool enabled; string clientUrl; string brokerUrl; string username, password, mechanism; + string adminUser; private: }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 38f243a5c1..bf044765b5 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -19,7 +19,7 @@ # import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil -from qpid.messaging import Message, NotFound +from qpid.messaging import Message, NotFound, ConnectionError from brokertest import * from threading import Thread, Lock, Condition from logging import getLogger @@ -54,6 +54,10 @@ class ShortTests(BrokerTest): self.fail("Should not have been replicated: %s"%(address)) except NotFound: pass + def connect_admin(self, backup, **kwargs): + """Connect to a backup broker as the admin user""" + return backup.connect(username="qpid-ha-admin", password="dummy", mechanism="PLAIN", **kwargs) + def test_replication(self): """Test basic replication of wiring and messages before and after backup has connected""" @@ -116,7 +120,7 @@ class ShortTests(BrokerTest): setup(p, "2", primary) # Verify the data on the backup - b = backup.connect().session() + b = self.connect_admin(backup, ).session() verify(b, "1", p) verify(b, "2", p) @@ -162,10 +166,10 @@ class ShortTests(BrokerTest): s.sync() msgs = [str(i) for i in range(30)] - b1 = backup1.connect().session() + b1 = self.connect_admin(backup1).session() self.wait(b1, "q"); self.assert_browse_retry(b1, "q", msgs) - b2 = backup2.connect().session() + b2 = self.connect_admin(backup2).session() self.wait(b2, "q"); self.assert_browse_retry(b2, "q", msgs) @@ -192,14 +196,26 @@ class ShortTests(BrokerTest): self.assertEqual(receiver.wait(), 0) expect = [long(i) for i in range(991, 1001)] sn = lambda m: m.properties["sn"] - self.assert_browse_retry(backup1.connect().session(), "q", expect, transform=sn) - self.assert_browse_retry(backup2.connect().session(), "q", expect, transform=sn) + self.assert_browse_retry(self.connect_admin(backup1).session(), "q", expect, transform=sn) + self.assert_browse_retry(self.connect_admin(backup2).session(), "q", expect, transform=sn) except: print self.browse(primary.connect().session(), "q", transform=sn) - print self.browse(backup1.connect().session(), "q", transform=sn) - print self.browse(backup2.connect().session(), "q", transform=sn) + print self.browse(self.connect_admin(backup1).session(), "q", transform=sn) + print self.browse(self.connect_admin(backup2).session(), "q", transform=sn) raise + def test_exclude(self): + """Verify that backup rejects connections""" + primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary + backup = self.ha_broker(name="backup", broker_url=primary.host_port()) + # Admin is allowed + self.connect_admin(backup) + # Others are not + try: + backup.connect() + self.fail("Expected connection to backup to fail") + except ConnectionError: pass + if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:]) -- cgit v1.2.1