summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-25 18:48:09 +0000
committerAlan Conway <aconway@apache.org>2012-01-25 18:48:09 +0000
commit54d9e7a6228bb1bd291a9aa4c156126359a6dbf0 (patch)
treec14df3610c0eef2f6d8b75636a58249d508d5345
parent41be211a5ed3cffedac0a72c6e49172a97999061 (diff)
downloadqpid-python-54d9e7a6228bb1bd291a9aa4c156126359a6dbf0.tar.gz
QPID-3603: HA backup rejects client connections.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1235867 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/Makefile.am2
-rw-r--r--qpid/cpp/src/ha.mk3
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h3
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionObserver.h52
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionObservers.h56
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionExcluder.h70
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp16
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h1
-rw-r--r--qpid/cpp/src/qpid/ha/HaPlugin.cpp1
-rw-r--r--qpid/cpp/src/qpid/ha/Settings.h3
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py32
12 files changed, 231 insertions, 14 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 5caf54e2ba..666f593f7d 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -541,6 +541,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/Credit.h \
qpid/broker/Credit.cpp \
qpid/broker/ConsumerFactory.h \
+ qpid/broker/ConnectionObserver.h \
+ qpid/broker/ConnectionObservers.h \
qpid/broker/Daemon.cpp \
qpid/broker/Daemon.h \
qpid/broker/Deliverable.h \
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
index 272fdbf296..6874449cc5 100644
--- a/qpid/cpp/src/ha.mk
+++ b/qpid/cpp/src/ha.mk
@@ -34,7 +34,8 @@ ha_la_SOURCES = \
qpid/ha/ReplicatingSubscription.h \
qpid/ha/ReplicatingSubscription.cpp \
qpid/ha/BrokerReplicator.cpp \
- qpid/ha/BrokerReplicator.h
+ qpid/ha/BrokerReplicator.h \
+ qpid/ha/ConnectionExcluder.h
ha_la_LIBADD = libqpidbroker.la
ha_la_LDFLAGS = $(PLUGINLDFLAGS)
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 11cf81ea9e..5d4cc99e66 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -38,6 +38,7 @@
#include "qpid/broker/System.h"
#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/broker/ConsumerFactory.h"
+#include "qpid/broker/ConnectionObservers.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/broker/Broker.h"
@@ -200,6 +201,7 @@ public:
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
ConnectionCounter connectionCounter;
ConsumerFactories consumerFactories;
+ ConnectionObservers connectionObservers;
public:
virtual ~Broker();
@@ -360,6 +362,7 @@ public:
const std::string& connectionId);
ConsumerFactories& getConsumerFactories() { return consumerFactories; }
+ ConnectionObservers& getConnectionObservers() { return connectionObservers; }
};
}}
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 73584be8fd..97e8e8ca13 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -19,6 +19,7 @@
*
*/
#include "qpid/broker/Connection.h"
+#include "qpid/broker/ConnectionObserver.h"
#include "qpid/broker/SessionOutputException.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Bridge.h"
@@ -162,8 +163,11 @@ void Connection::received(framing::AMQFrame& frame) {
recordFromServer(frame);
else
recordFromClient(frame);
- if (!wasOpen && isOpen())
+ if (!wasOpen && isOpen()) {
doIoCallbacks(); // Do any callbacks registered before we opened.
+ // FIXME aconway 2012-01-18: generic observer points.
+ broker.getConnectionObservers().connect(*this);
+ }
}
void Connection::sent(const framing::AMQFrame& frame)
diff --git a/qpid/cpp/src/qpid/broker/ConnectionObserver.h b/qpid/cpp/src/qpid/broker/ConnectionObserver.h
new file mode 100644
index 0000000000..e59ec261bc
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/ConnectionObserver.h
@@ -0,0 +1,52 @@
+#ifndef QPID_BROKER_CONNECTIONOBSERVER_H
+#define QPID_BROKER_CONNECTIONOBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+namespace qpid {
+namespace broker {
+
+class Connection;
+
+/**
+ * Observer that is informed of connection events. For use by
+ * plug-ins that want to be notified of, or influence, connection
+ * events.
+ */
+class ConnectionObserver
+{
+ public:
+ virtual ~ConnectionObserver() {}
+
+ /** Called when a connection is opened and authentication has been
+ * performed.
+ * @exception Throwing an exception will abort the connection.
+ */
+ virtual void connect(Connection& connection) = 0;
+
+ /** Called when a connection is torn down. */
+ virtual void disconnect(Connection& connection) = 0;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_CONNECTIONOBSERVER_H*/
diff --git a/qpid/cpp/src/qpid/broker/ConnectionObservers.h b/qpid/cpp/src/qpid/broker/ConnectionObservers.h
new file mode 100644
index 0000000000..bef40e26b4
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/ConnectionObservers.h
@@ -0,0 +1,56 @@
+#ifndef QPID_BROKER_CONNECTIONOBSERVERS_H
+#define QPID_BROKER_CONNECTIONOBSERVERS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConnectionObserver.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * A collection of connection observers.
+ * Calling a ConnectionObserver function will call that function on each observer.
+ */
+class ConnectionObservers : public ConnectionObserver {
+ public:
+ // functions for managing the collection of observers
+ void add(boost::shared_ptr<ConnectionObserver> observer) {
+ observers.push_back(observer);
+ }
+
+ // implementation of ConnectionObserver interface
+ void connect(Connection& c) {
+ std::for_each(observers.begin(), observers.end(), boost::bind(&ConnectionObserver::connect, _1, boost::ref(c)));
+ }
+ void disconnect(Connection& c) {
+ std::for_each(observers.begin(), observers.end(), boost::bind(&ConnectionObserver::disconnect, _1, boost::ref(c)));
+ }
+
+ private:
+ typedef std::vector<boost::shared_ptr<ConnectionObserver> > Observers;
+ Observers observers;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_CONNECTIONOBSERVERS_H*/
diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
new file mode 100644
index 0000000000..4a878c81c3
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
@@ -0,0 +1,70 @@
+#ifndef QPID_HA_CONNECTIONEXCLUDER_H
+#define QPID_HA_CONNECTIONEXCLUDER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/ConnectionObserver.h"
+#include "qpid/broker/Connection.h"
+#include <boost/function.hpp>
+#include <sstream>
+
+namespace qpid {
+namespace ha {
+
+/**
+ * Exclude normal connections to a backup broker.
+ * Connections as ha-admin user are allowed.
+ */
+class ConnectionExcluder : public broker::ConnectionObserver
+{
+ public:
+ typedef boost::function<bool()> PrimaryTest;
+
+ ConnectionExcluder(string adminUser_, PrimaryTest isPrimary_)
+ : adminUser(adminUser_), isPrimary(isPrimary_) {}
+
+ void connect(broker::Connection& connection) {
+ if (!isPrimary() && !connection.isLink()
+ && !connection.isAuthenticatedUser(adminUser))
+ {
+ throw Exception(
+ QPID_MSG(
+ "HA: Backup broker rejected connection "
+ << connection.getMgmtId() << " by user " << connection.getUserId()
+ << ". Only " << adminUser << " can connect to a backup."));
+ }
+ else {
+ QPID_LOG(debug, "HA: Backup broker accepted connection"
+ << connection.getMgmtId() << " by user "
+ << connection.getUserId());
+ }
+ }
+
+ void disconnect(broker::Connection&) {}
+
+ private:
+ string adminUser;
+ PrimaryTest isPrimary;
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_CONNECTIONEXCLUDER_H*/
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 9b2598b3bf..21d918bed9 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -19,6 +19,7 @@
*
*/
#include "Backup.h"
+#include "ConnectionExcluder.h"
#include "HaBroker.h"
#include "Settings.h"
#include "ReplicatingSubscription.h"
@@ -60,15 +61,20 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
ma->addObject(mgmtObject);
}
// FIXME aconway 2011-11-22: temporary hack to identify primary.
- bool isPrimary = (s.brokerUrl == "primary");
- QPID_LOG(notice, "HA: " << (isPrimary ? "Primary" : "Backup")
+ bool primary = (s.brokerUrl == "primary");
+ QPID_LOG(notice, "HA: " << (primary ? "Primary" : "Backup")
<< " initialized: client-url=" << clientUrl
<< " broker-url=" << brokerUrl);
- if (!isPrimary) backup.reset(new Backup(broker, s));
+ if (!primary) backup.reset(new Backup(broker, s));
// Register a factory for replicating subscriptions.
broker.getConsumerFactories().add(
boost::shared_ptr<ReplicatingSubscription::Factory>(
new ReplicatingSubscription::Factory()));
+ // Register a connection excluder
+ broker.getConnectionObservers().add(
+ boost::shared_ptr<broker::ConnectionObserver>(
+ new ConnectionExcluder(
+ s.adminUser, boost::bind(&HaBroker::isPrimary, this))));
}
HaBroker::~HaBroker() {}
@@ -87,4 +93,8 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
return Manageable::STATUS_OK;
}
+bool HaBroker::isPrimary() const {
+ return !backup.get(); // TODO aconway 2012-01-18: temporary test.
+}
+
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 0580a06202..18e6156850 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -52,6 +52,7 @@ class HaBroker : public management::Manageable
management::Manageable::status_t ManagementMethod (
uint32_t methodId, management::Args& args, std::string& text);
+ bool isPrimary() const;
private:
broker::Broker& broker;
Url clientUrl, brokerUrl;
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
index fc9e48411d..b7ab5ada2d 100644
--- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
@@ -37,6 +37,7 @@ struct Options : public qpid::Options {
("ha-username", optValue(settings.username, "USER"), "Username for connections between brokers")
("ha-password", optValue(settings.password, "PASS"), "Password for connections between brokers")
("ha-mechanism", optValue(settings.mechanism, "MECH"), "Authentication mechanism for connections between brokers")
+ ("ha-admin-user", optValue(settings.adminUser, "USER"), "User allowed to perform HA administration tasks")
;
}
};
diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h
index a2d2e89d82..9d8821c571 100644
--- a/qpid/cpp/src/qpid/ha/Settings.h
+++ b/qpid/cpp/src/qpid/ha/Settings.h
@@ -35,11 +35,12 @@ using std::string;
class Settings
{
public:
- Settings() : enabled(false) {}
+ Settings() : enabled(false), adminUser("qpid-ha-admin") {}
bool enabled;
string clientUrl;
string brokerUrl;
string username, password, mechanism;
+ string adminUser;
private:
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 38f243a5c1..bf044765b5 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -19,7 +19,7 @@
#
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil
-from qpid.messaging import Message, NotFound
+from qpid.messaging import Message, NotFound, ConnectionError
from brokertest import *
from threading import Thread, Lock, Condition
from logging import getLogger
@@ -54,6 +54,10 @@ class ShortTests(BrokerTest):
self.fail("Should not have been replicated: %s"%(address))
except NotFound: pass
+ def connect_admin(self, backup, **kwargs):
+ """Connect to a backup broker as the admin user"""
+ return backup.connect(username="qpid-ha-admin", password="dummy", mechanism="PLAIN", **kwargs)
+
def test_replication(self):
"""Test basic replication of wiring and messages before and
after backup has connected"""
@@ -116,7 +120,7 @@ class ShortTests(BrokerTest):
setup(p, "2", primary)
# Verify the data on the backup
- b = backup.connect().session()
+ b = self.connect_admin(backup, ).session()
verify(b, "1", p)
verify(b, "2", p)
@@ -162,10 +166,10 @@ class ShortTests(BrokerTest):
s.sync()
msgs = [str(i) for i in range(30)]
- b1 = backup1.connect().session()
+ b1 = self.connect_admin(backup1).session()
self.wait(b1, "q");
self.assert_browse_retry(b1, "q", msgs)
- b2 = backup2.connect().session()
+ b2 = self.connect_admin(backup2).session()
self.wait(b2, "q");
self.assert_browse_retry(b2, "q", msgs)
@@ -192,14 +196,26 @@ class ShortTests(BrokerTest):
self.assertEqual(receiver.wait(), 0)
expect = [long(i) for i in range(991, 1001)]
sn = lambda m: m.properties["sn"]
- self.assert_browse_retry(backup1.connect().session(), "q", expect, transform=sn)
- self.assert_browse_retry(backup2.connect().session(), "q", expect, transform=sn)
+ self.assert_browse_retry(self.connect_admin(backup1).session(), "q", expect, transform=sn)
+ self.assert_browse_retry(self.connect_admin(backup2).session(), "q", expect, transform=sn)
except:
print self.browse(primary.connect().session(), "q", transform=sn)
- print self.browse(backup1.connect().session(), "q", transform=sn)
- print self.browse(backup2.connect().session(), "q", transform=sn)
+ print self.browse(self.connect_admin(backup1).session(), "q", transform=sn)
+ print self.browse(self.connect_admin(backup2).session(), "q", transform=sn)
raise
+ def test_exclude(self):
+ """Verify that backup rejects connections"""
+ primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
+ backup = self.ha_broker(name="backup", broker_url=primary.host_port())
+ # Admin is allowed
+ self.connect_admin(backup)
+ # Others are not
+ try:
+ backup.connect()
+ self.fail("Expected connection to backup to fail")
+ except ConnectionError: pass
+
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])