summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-02-19 12:00:46 +0000
committerTed Ross <tross@apache.org>2009-02-19 12:00:46 +0000
commite61b0224853e3864d2d219f8dd4c55a63a860881 (patch)
treef4119d7112ae45e3ccfaaf4350e9ebf0c692df34 /python
parent9d0a62d9b64388709ffe1593e305dee8f26a8b84 (diff)
downloadqpid-python-e61b0224853e3864d2d219f8dd4c55a63a860881.tar.gz
QPID-1669 - Added client connection management to qpid-cluster.
Also added better session-id discrimination in the qmf.console library. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@745832 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rwxr-xr-xpython/commands/qpid-cluster175
-rw-r--r--python/qmf/console.py4
2 files changed, 160 insertions, 19 deletions
diff --git a/python/commands/qpid-cluster b/python/commands/qpid-cluster
index f2028d944b..2928ba65ea 100755
--- a/python/commands/qpid-cluster
+++ b/python/commands/qpid-cluster
@@ -23,12 +23,17 @@ import os
import getopt
import sys
import locale
+import socket
+import re
from qmf.console import Session
_host = "localhost"
_stopId = None
_stopAll = False
_force = False
+_numeric = False
+_showConn = False
+_delConn = None
def Usage ():
print "Usage: qpid-cluster [OPTIONS] [broker-addr]"
@@ -37,12 +42,40 @@ def Usage ():
print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
print
print "Options:"
- print " -s [--stop] ID Stop one member of the cluster by its ID"
- print " -k [--all-stop] Shut down the whole cluster"
- print " -f [--force] Suppress the 'are-you-sure?' prompt"
+ print " -C [--all-connections] View client connections to all cluster members"
+ print " -c [--connections] ID View client connections to specified member"
+ print " -d [--del-connection] HOST:PORT"
+ print " Disconnect a client connection"
+ print " -s [--stop] ID Stop one member of the cluster by its ID"
+ print " -k [--all-stop] Shut down the whole cluster"
+ print " -f [--force] Suppress the 'are-you-sure?' prompt"
+ print " -n [--numeric] Don't resolve names"
print
sys.exit (1)
+
+class IpAddr:
+ def __init__(self, text):
+ if text.find(":") != -1:
+ tokens = text.split(":")
+ text = tokens[0]
+ self.port = int(tokens[1])
+ else:
+ self.port = 5672
+ self.dottedQuad = socket.gethostbyname(text)
+ nums = self.dottedQuad.split(".")
+ self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])
+
+ def bestAddr(self, addrPortList):
+ bestDiff = 0xFFFFFFFF
+ bestAddr = None
+ for addrPort in addrPortList:
+ diff = IpAddr(addrPort[0]).addr ^ self.addr
+ if diff < bestDiff:
+ bestDiff = diff
+ bestAddr = addrPort
+ return bestAddr
+
class BrokerManager:
def __init__(self):
self.brokerName = None
@@ -62,7 +95,7 @@ class BrokerManager:
if self.broker:
self.qmf.delBroker(self.broker)
- def overview(self):
+ def _getClusters(self):
packages = self.qmf.getPackages()
if "org.apache.qpid.cluster" not in packages:
print "Clustering is not installed on the broker."
@@ -73,8 +106,35 @@ class BrokerManager:
print "Clustering is installed but not enabled on the broker."
sys.exit(0)
+ return clusters
+
+ def _getHostList(self, urlList):
+ hosts = []
+ hostAddr = IpAddr(_host)
+ for url in urlList:
+ if url.find("amqp:") != 0:
+ raise Exception("Invalid URL 1")
+ url = url[5:]
+ addrs = str(url).split(",")
+ addrList = []
+ for addr in addrs:
+ tokens = addr.split(":")
+ if len(tokens) != 3:
+ raise Exception("Invalid URL 2")
+ addrList.append((tokens[1], tokens[2]))
+
+ # Find the address in the list that is most likely to be in the same subnet as the address
+ # with which we made the original QMF connection. This increases the probability that we will
+ # be able to reach the cluster member.
+
+ best = hostAddr.bestAddr(addrList)
+ bestUrl = best[0] + ":" + best[1]
+ hosts.append(bestUrl)
+ return hosts
+
+ def overview(self):
+ clusters = self._getClusters()
cluster = clusters[0]
- myUrl = cluster.publishedURL
memberList = cluster.members.split(";")
idList = cluster.memberIDs.split(";")
@@ -86,11 +146,7 @@ class BrokerManager:
print " : ID=%s URL=%s" % (idList[idx], memberList[idx])
def stopMember(self, id):
- clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
- if len(clusters) == 0:
- print "Clustering is installed but not enabled on the broker."
- sys.exit(0)
-
+ clusters = self._getClusters()
cluster = clusters[0]
idList = cluster.memberIDs.split(";")
if id not in idList:
@@ -113,11 +169,7 @@ class BrokerManager:
cluster.stopClusterNode(id)
def stopAll(self):
- clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
- if len(clusters) == 0:
- print "Clustering is installed but not enabled on the broker."
- sys.exit(0)
-
+ clusters = self._getClusters()
if not _force:
prompt = "Warning: This command will shut down the entire cluster."
prompt += " Are you sure? [N]: "
@@ -130,15 +182,72 @@ class BrokerManager:
cluster = clusters[0]
cluster.stopFullCluster()
+ def showConnections(self):
+ clusters = self._getClusters()
+ cluster = clusters[0]
+ memberList = cluster.members.split(";")
+ idList = cluster.memberIDs.split(";")
+ displayList = []
+ hostList = self._getHostList(memberList)
+ self.qmf.delBroker(self.broker)
+ self.broker = None
+ self.brokers = []
+ pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
+
+ idx = 0
+ for host in hostList:
+ if _showConn == "all" or _showConn == idList[idx] or _delConn:
+ self.brokers.append(self.qmf.addBroker(host))
+ displayList.append(idList[idx])
+ idx += 1
+
+ idx = 0
+ found = False
+ for broker in self.brokers:
+ if not _delConn:
+ print "Clients on Member: ID=%s:" % displayList[idx]
+ connList = self.qmf.getObjects(_class="connection", _package="org.apache.qpid.broker", _broker=broker)
+ for conn in connList:
+ if pattern.match(conn.address):
+ if _numeric or _delConn:
+ a = conn.address
+ else:
+ tokens = conn.address.split(":")
+ try:
+ hostList = socket.gethostbyaddr(tokens[0])
+ host = hostList[0]
+ except:
+ host = tokens[0]
+ a = host + ":" + tokens[1]
+ if _delConn:
+ tokens = _delConn.split(":")
+ ip = socket.gethostbyname(tokens[0])
+ toDelete = ip + ":" + tokens[1]
+ if a == toDelete:
+ print "Closing connection from client: %s" % a
+ conn.close()
+ found = True
+ else:
+ print " %s" % a
+ idx += 1
+ if not _delConn:
+ print
+ if _delConn and not found:
+ print "Client connection '%s' not found" % _delConn
+
+ for broker in self.brokers:
+ self.qmf.delBroker(broker)
+
+
##
## Main Program
##
try:
- longOpts = ("stop=", "all-stop", "force")
- (optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "s:kf", longOpts)
+ longOpts = ("stop=", "all-stop", "force", "connections=", "all-connections" "del-connection=", "numeric")
+ (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "s:kfCc:d:n", longOpts)
except:
- Usage ()
+ Usage()
try:
encoding = locale.getpreferredencoding()
@@ -146,13 +255,41 @@ try:
except:
cargs = encArgs
+count = 0
for opt in optlist:
if opt[0] == "-s" or opt[0] == "--stop":
_stopId = opt[1]
+ if len(_stopId.split(":")) != 2:
+ print "Member ID must be of form: <host or ip>:<number>"
+ sys.exit(1)
+ count += 1
if opt[0] == "-k" or opt[0] == "--all-stop":
_stopAll = True
+ count += 1
if opt[0] == "-f" or opt[0] == "--force":
_force = True
+ if opt[0] == "-n" or opt[0] == "--numeric":
+ _numeric = True
+ if opt[0] == "-C" or opt[0] == "--all-connections":
+ _showConn = "all"
+ count += 1
+ if opt[0] == "-c" or opt[0] == "--connections":
+ _showConn = opt[1]
+ if len(_showConn.split(":")) != 2:
+ print "Member ID must be of form: <host or ip>:<number>"
+ sys.exit(1)
+ count += 1
+ if opt[0] == "-d" or opt[0] == "--del-connection":
+ _delConn = opt[1]
+ if len(_delConn.split(":")) != 2:
+ print "Connection must be of form: <host or ip>:<port>"
+ sys.exit(1)
+ count += 1
+
+if count > 1:
+ print "Only one command option may be supplied"
+ print
+ Usage()
nargs = len(cargs)
bm = BrokerManager()
@@ -166,6 +303,8 @@ try:
bm.stopMember(_stopId)
elif _stopAll:
bm.stopAll()
+ elif _showConn or _delConn:
+ bm.showConnections()
else:
bm.overview()
except KeyboardInterrupt:
diff --git a/python/qmf/console.py b/python/qmf/console.py
index 867d707f31..abc0929955 100644
--- a/python/qmf/console.py
+++ b/python/qmf/console.py
@@ -1230,6 +1230,7 @@ class ManagedConnection(Thread):
class Broker:
""" This object represents a connection (or potential connection) to a QMF broker. """
SYNC_TIME = 60
+ nextSeq = 1
def __init__(self, session, host, port, authMech, authUser, authPass, ssl=False):
self.session = session
@@ -1242,7 +1243,8 @@ class Broker:
self.error = None
self.brokerId = None
self.connected = False
- self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid())
+ self.amqpSessionId = "%s.%d.%d" % (os.uname()[1], os.getpid(), Broker.nextSeq)
+ Broker.nextSeq += 1
if self.session.manageConnections:
self.thread = ManagedConnection(self)
self.thread.start()