summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-25 18:48:46 +0000
committerAlan Conway <aconway@apache.org>2012-01-25 18:48:46 +0000
commit1986c2fa3d11de7856458d5eae34fdc3f80a42b0 (patch)
treee025df507357546bf6465e22e9ea287881ff0ee6
parentfe4d3dd164c2685cb7483c65e2e915f93d13e7e6 (diff)
downloadqpid-python-1986c2fa3d11de7856458d5eae34fdc3f80a42b0.tar.gz
QPID-3603: Use client-properties to signal admin connections.
qpid-ha-admin sets a client-property to indicate an admin connection. Added support for setting client-property in python clients. Added support for getting client-property in a broker. ConnectionExcluder checks admin property. Removed old user-based checks. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1235870 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/ha.mk1
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h5
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionObserver.h2
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp8
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp44
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionExcluder.h33
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp41
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h3
-rw-r--r--qpid/cpp/src/qpid/ha/HaPlugin.cpp1
-rw-r--r--qpid/cpp/src/qpid/ha/Settings.h3
-rw-r--r--qpid/cpp/src/qpid/ha/management-schema.xml4
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py46
-rw-r--r--qpid/python/qpid/messaging/driver.py3
-rwxr-xr-xqpid/tools/src/py/qpid-ha-status5
15 files changed, 141 insertions, 60 deletions
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
index 6874449cc5..8a2cee30c7 100644
--- a/qpid/cpp/src/ha.mk
+++ b/qpid/cpp/src/ha.mk
@@ -35,6 +35,7 @@ ha_la_SOURCES = \
qpid/ha/ReplicatingSubscription.cpp \
qpid/ha/BrokerReplicator.cpp \
qpid/ha/BrokerReplicator.h \
+ qpid/ha/ConnectionExcluder.cpp \
qpid/ha/ConnectionExcluder.h
ha_la_LIBADD = libqpidbroker.la
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index 6186c06a3c..855172bc43 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -165,6 +165,9 @@ class Connection : public sys::ConnectionInputHandler,
// Used by cluster during catch-up, see cluster::OutputInterceptor
void doIoCallbacks();
+ void setClientProperties(const framing::FieldTable& cp) { clientProperties = cp; }
+ const framing::FieldTable& getClientProperties() const { return clientProperties; }
+
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<boost::shared_ptr<Queue> >::iterator queue_iterator;
@@ -186,6 +189,8 @@ class Connection : public sys::ConnectionInputHandler,
ErrorListener* errorListener;
uint64_t objectId;
bool shadow;
+ framing::FieldTable clientProperties;
+
/**
* Chained ConnectionOutputHandler that allows outgoing frames to be
* tracked (for updating mgmt stats).
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index 6048a46f79..f1d43c5cdb 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -158,6 +158,8 @@ void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body)
throw;
}
const framing::FieldTable& clientProperties = body.getClientProperties();
+ connection.setClientProperties(clientProperties);
+
connection.setFederationLink(clientProperties.get(QPID_FED_LINK));
if (clientProperties.isSet(QPID_FED_TAG)) {
connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG));
diff --git a/qpid/cpp/src/qpid/broker/ConnectionObserver.h b/qpid/cpp/src/qpid/broker/ConnectionObserver.h
index 12aa8549fd..eea2981185 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionObserver.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionObserver.h
@@ -22,6 +22,8 @@
*
*/
+#include <string>
+
namespace qpid {
namespace broker {
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 6b877314e0..bbf3f4ae73 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -118,12 +118,10 @@ const string S_NONE="none";
const string S_WIRING="wiring";
const string S_ALL="all";
-ReplicateLevel replicateLevel(const string& str) {
- string value(str.size(), '\0');
- transform(str.begin(), str.end(), value.begin(), &tolower);
+ReplicateLevel replicateLevel(const string& level) {
ReplicateLevel rl = RL_NONE;
- if (value == S_WIRING) rl = RL_WIRING;
- else if (value == S_ALL) rl = RL_ALL;
+ if (level == S_WIRING) rl = RL_WIRING;
+ else if (level == S_ALL) rl = RL_ALL;
return rl;
}
diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
new file mode 100644
index 0000000000..396217e0ff
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConnectionExcluder.h"
+#include "qpid/broker/Connection.h"
+#include <boost/function.hpp>
+#include <sstream>
+
+namespace qpid {
+namespace ha {
+
+ConnectionExcluder::ConnectionExcluder(PrimaryTest isPrimary_) : isPrimary(isPrimary_) {}
+
+void ConnectionExcluder::opened(broker::Connection& connection) {
+ if (!isPrimary() && !connection.isLink()
+ && !connection.getClientProperties().isSet(ADMIN_TAG))
+ throw Exception(
+ QPID_MSG("HA: Backup broker rejected connection " << connection.getMgmtId()));
+ else
+ QPID_LOG(debug, "HA: Backup broker accepted connection" << connection.getMgmtId());
+}
+
+const std::string ConnectionExcluder::ADMIN_TAG="qpid.ha-admin";
+
+}} // namespace qpid::ha
+
diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
index 2ea75efffd..e6c299884e 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
+++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
@@ -23,46 +23,35 @@
*/
#include "qpid/broker/ConnectionObserver.h"
-#include "qpid/broker/Connection.h"
#include <boost/function.hpp>
-#include <sstream>
namespace qpid {
+
+namespace broker {
+class Connection;
+}
+
namespace ha {
/**
* Exclude normal connections to a backup broker.
- * Connections as ha-admin user are allowed.
+ * Admin connections are identified by a special flag in client-properties
+ * during connection negotiation.
*/
class ConnectionExcluder : public broker::ConnectionObserver
{
public:
typedef boost::function<bool()> PrimaryTest;
- ConnectionExcluder(string adminUser_, PrimaryTest isPrimary_)
- : adminUser(adminUser_), isPrimary(isPrimary_) {}
+ ConnectionExcluder(PrimaryTest isPrimary_);
- void opened(broker::Connection& connection) {
- if (!isPrimary() && !connection.isLink()
- && !connection.isAuthenticatedUser(adminUser))
- {
- throw Exception(
- QPID_MSG(
- "HA: Backup broker rejected connection "
- << connection.getMgmtId() << " by user " << connection.getUserId()
- << ". Only " << adminUser << " can connect to a backup."));
- }
- else {
- QPID_LOG(debug, "HA: Backup broker accepted connection"
- << connection.getMgmtId() << " by user "
- << connection.getUserId());
- }
- }
+ void opened(broker::Connection& connection);
private:
- string adminUser;
+ static const std::string ADMIN_TAG;
PrimaryTest isPrimary;
};
+
}} // namespace qpid::ha
#endif /*!QPID_HA_CONNECTIONEXCLUDER_H*/
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 21d918bed9..a1350e492a 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -44,24 +44,21 @@ Url url(const std::string& s, const std::string& id) {
throw Exception(Msg() << "Invalid URL for " << id << ": '" << s << "'");
}
}
+
+const std::string PRIMARY="primary";
+const std::string BACKUP="backup";
+
} // namespace
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
: broker(b),
+ settings(s),
clientUrl(url(s.clientUrl, "ha-client-url")),
brokerUrl(url(s.brokerUrl, "ha-broker-url")),
mgmtObject(0)
{
- ManagementAgent* ma = broker.getManagementAgent();
- if (ma) {
- _qmf::Package packageInit(ma);
- mgmtObject = new _qmf::HaBroker(ma, this);
- // FIXME aconway 2011-11-11: Placeholder - initialize cluster role.
- mgmtObject->set_status("solo");
- ma->addObject(mgmtObject);
- }
// FIXME aconway 2011-11-22: temporary hack to identify primary.
- bool primary = (s.brokerUrl == "primary");
+ bool primary = (settings.brokerUrl == PRIMARY);
QPID_LOG(notice, "HA: " << (primary ? "Primary" : "Backup")
<< " initialized: client-url=" << clientUrl
<< " broker-url=" << brokerUrl);
@@ -73,8 +70,16 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
// Register a connection excluder
broker.getConnectionObservers().add(
boost::shared_ptr<broker::ConnectionObserver>(
- new ConnectionExcluder(
- s.adminUser, boost::bind(&HaBroker::isPrimary, this))));
+ new ConnectionExcluder(boost::bind(&HaBroker::isPrimary, this))));
+
+ ManagementAgent* ma = broker.getManagementAgent();
+ if (ma) {
+ _qmf::Package packageInit(ma);
+ mgmtObject = new _qmf::HaBroker(ma, this);
+ // FIXME aconway 2011-11-11: Placeholder - initialize cluster role.
+ mgmtObject->set_status(isPrimary() ? PRIMARY : BACKUP);
+ ma->addObject(mgmtObject);
+ }
}
HaBroker::~HaBroker() {}
@@ -83,7 +88,19 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
switch (methodId) {
case _qmf::HaBroker::METHOD_SETSTATUS: {
std::string status = dynamic_cast<_qmf::ArgsHaBrokerSetStatus&>(args).i_status;
- // FIXME aconway 2011-11-11: placeholder, validate & execute status change.
+ if (status == PRIMARY) {
+ if (!isPrimary()) {
+ backup.reset();
+ QPID_LOG(notice, "HA Primary: promoted from backup");
+ }
+ } else if (status == BACKUP) {
+ if (isPrimary()) {
+ backup.reset(new Backup(broker, settings));
+ QPID_LOG(notice, "HA Backup: demoted from primary.");
+ }
+ } else {
+ QPID_LOG(error, "Attempt to set invalid HA status: " << status);
+ }
mgmtObject->set_status(status);
break;
}
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 18e6156850..aff518aa8d 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -22,6 +22,7 @@
*
*/
+#include "Settings.h"
#include "qpid/Url.h"
#include "qmf/org/apache/qpid/ha/HaBroker.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetStatus.h"
@@ -33,7 +34,6 @@ namespace broker {
class Broker;
}
namespace ha {
-class Settings;
class Backup;
/**
@@ -55,6 +55,7 @@ class HaBroker : public management::Manageable
bool isPrimary() const;
private:
broker::Broker& broker;
+ Settings settings;
Url clientUrl, brokerUrl;
std::auto_ptr<Backup> backup;
qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
index b7ab5ada2d..fc9e48411d 100644
--- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
@@ -37,7 +37,6 @@ struct Options : public qpid::Options {
("ha-username", optValue(settings.username, "USER"), "Username for connections between brokers")
("ha-password", optValue(settings.password, "PASS"), "Password for connections between brokers")
("ha-mechanism", optValue(settings.mechanism, "MECH"), "Authentication mechanism for connections between brokers")
- ("ha-admin-user", optValue(settings.adminUser, "USER"), "User allowed to perform HA administration tasks")
;
}
};
diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h
index 9d8821c571..a2d2e89d82 100644
--- a/qpid/cpp/src/qpid/ha/Settings.h
+++ b/qpid/cpp/src/qpid/ha/Settings.h
@@ -35,12 +35,11 @@ using std::string;
class Settings
{
public:
- Settings() : enabled(false), adminUser("qpid-ha-admin") {}
+ Settings() : enabled(false) {}
bool enabled;
string clientUrl;
string brokerUrl;
string username, password, mechanism;
- string adminUser;
private:
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml
index bb06e77a69..0fd18a10d4 100644
--- a/qpid/cpp/src/qpid/ha/management-schema.xml
+++ b/qpid/cpp/src/qpid/ha/management-schema.xml
@@ -21,9 +21,9 @@
<!-- Monitor and control HA status of a broker. -->
<class name="HaBroker">
- <property name="status" type="sstr" desc="HA status: PRIMARY, BACKUP, SOLO"/>
+ <property name="status" type="sstr" desc="HA status: primary, backup"/>
- <method name="setStatus" desc="Set HA status: PRIMARY, BACKUP, SOLO">
+ <method name="setStatus" desc="Set HA status: primary, backup">
<arg name="status" type="sstr" dir="I"/>
</method>
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index bf044765b5..f3c45ba7a3 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -19,7 +19,7 @@
#
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil
-from qpid.messaging import Message, NotFound, ConnectionError
+from qpid.messaging import Message, NotFound, ConnectionError, Connection
from brokertest import *
from threading import Thread, Lock, Condition
from logging import getLogger
@@ -48,15 +48,18 @@ class ShortTests(BrokerTest):
except NotFound: return False
assert retry(check), "Timed out waiting for %s"%(address)
- def assert_missing(self,session, address):
+ def set_ha_status(self, address, status):
+ os.system("qpid-ha-status %s %s"%(address, status))
+
+ def assert_missing(self, session, address):
try:
session.receiver(address)
self.fail("Should not have been replicated: %s"%(address))
except NotFound: pass
def connect_admin(self, backup, **kwargs):
- """Connect to a backup broker as the admin user"""
- return backup.connect(username="qpid-ha-admin", password="dummy", mechanism="PLAIN", **kwargs)
+ """Connect to a backup broker as an admin connection"""
+ return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs)
def test_replication(self):
"""Test basic replication of wiring and messages before and
@@ -113,6 +116,7 @@ class ShortTests(BrokerTest):
primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
p = primary.connect().session()
+
# Create config, send messages before starting the backup, to test catch-up replication.
setup(p, "1", primary)
backup = self.ha_broker(name="backup", broker_url=primary.host_port())
@@ -120,10 +124,9 @@ class ShortTests(BrokerTest):
setup(p, "2", primary)
# Verify the data on the backup
- b = self.connect_admin(backup, ).session()
+ b = self.connect_admin(backup).session()
verify(b, "1", p)
verify(b, "2", p)
-
# Test a series of messages, enqueue all then dequeue all.
s = p.sender(queue("foo","all"))
self.wait(b, "foo")
@@ -174,6 +177,7 @@ class ShortTests(BrokerTest):
self.assert_browse_retry(b2, "q", msgs)
def test_send_receive(self):
+ """Verify sequence numbers of messages sent by qpid-send"""
primary = self.ha_broker(name="primary", broker_url="primary")
backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port())
backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port())
@@ -204,17 +208,35 @@ class ShortTests(BrokerTest):
print self.browse(self.connect_admin(backup2).session(), "q", transform=sn)
raise
- def test_exclude(self):
- """Verify that backup rejects connections"""
- primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
+ def test_failover(self):
+ """Verify that backups rejects connections and that fail-over works"""
+ primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL, broker_url="primary") # Temp hack to identify primary
backup = self.ha_broker(name="backup", broker_url=primary.host_port())
- # Admin is allowed
- self.connect_admin(backup)
- # Others are not
+ # Check that backup rejects normal connections
try:
backup.connect()
self.fail("Expected connection to backup to fail")
except ConnectionError: pass
+ # Check that admin connections are allowed to backup.
+ self.connect_admin(backup).close()
+
+ # Test discovery: should connect to primary after reject by backup
+ c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
+ s = c.session()
+ s.sender("q;{create:always,%s}"%(self.qpid_replicate())).send("foo", sync=True)
+ # FIXME aconway 2012-01-23: we shouldn't need the wait and retry here,
+ # send(sync=True) shouldn't return till the backup acknowledges.
+ bs = self.connect_admin(backup).session()
+ self.wait(bs, "q")
+ self.assert_browse_retry(bs, "q", ["foo"])
+ bs.connection.close()
+
+ primary.kill()
+ # Promote the backup
+ self.set_ha_status(backup.host_port(), "primary")
+ # FIXME aconway 2012-01-23: should re-use session s below
+ self.assert_browse_retry(c.session(), "q", ["foo"])
+ c.close()
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py
index 608791927f..ff657653e0 100644
--- a/qpid/python/qpid/messaging/driver.py
+++ b/qpid/python/qpid/messaging/driver.py
@@ -533,7 +533,8 @@ class Driver:
log.warn("reconnect succeeded: %s:%s", host, port)
self._next_retry = None
self._attempts = 0
- self._host = 0
+ # FIXME aconway 2012-01-23: is this correct
+# self._host = 0
self._delay = self.connection.reconnect_interval_min
self._retrying = False
self.schedule()
diff --git a/qpid/tools/src/py/qpid-ha-status b/qpid/tools/src/py/qpid-ha-status
index c70e4c9af3..b4234cc051 100755
--- a/qpid/tools/src/py/qpid-ha-status
+++ b/qpid/tools/src/py/qpid-ha-status
@@ -27,7 +27,7 @@ Usage: qpid-ha-status [broker-address] [status]
If status is specified, sets the HA status of the broker. Otherwise prints the current HA status. Status must be one of: primary, backup, solo.
"""
-STATUS_VALUES=["primary", "backup", "solo"]
+STATUS_VALUES=["primary", "backup"]
def is_valid_status(value): return value in STATUS_VALUES
@@ -39,7 +39,8 @@ class HaBroker:
def __init__(self, broker, session):
self.session = session
try:
- self.qmf_broker = self.session.addBroker(broker)
+ self.qmf_broker = self.session.addBroker(
+ broker, client_properties={"qpid.ha-admin":1})
except Exception, e:
raise Exception("Can't connect to %s: %s"%(broker,e))
ha_brokers=self.session.getObjects(_class="habroker", _package="org.apache.qpid.ha")