diff options
Diffstat (limited to 'python')
-rwxr-xr-x | python/commands/qpid-config | 22 | ||||
-rwxr-xr-x | python/commands/qpid-queue-stats | 20 | ||||
-rwxr-xr-x | python/commands/qpid-route | 21 | ||||
-rw-r--r-- | python/qpid/connection.py | 11 | ||||
-rw-r--r-- | python/qpid/delegates.py | 12 | ||||
-rw-r--r-- | python/qpid/managementdata.py | 22 | ||||
-rw-r--r-- | python/qpid/testlib.py | 3 |
7 files changed, 48 insertions, 63 deletions
diff --git a/python/commands/qpid-config b/python/commands/qpid-config index e916ee0f6c..8f2fb7ff1b 100755 --- a/python/commands/qpid-config +++ b/python/commands/qpid-config @@ -26,6 +26,7 @@ import socket import qpid from threading import Condition from qpid.management import managementClient +from qpid.managementdata import Broker from qpid.peer import Closed from qpid.connection import Connection from qpid.datatypes import uuid4 @@ -62,8 +63,8 @@ def Usage (): print "Options:" print " -b [ --bindings ] Show bindings in queue or exchange list" print " -a [ --broker-addr ] Address (localhost) Address of qpidd broker" - print " broker-addr is in the form: hostname | ip-address [:<port>]" - print " ex: localhost, 10.1.1.7:10000, broker-host:10000" + print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]" + print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" print " -s [ --spec-file] Path (" + _defspecpath + ")" print " AMQP specification file" print @@ -79,20 +80,6 @@ def Usage (): print sys.exit (1) -class Broker: - def __init__ (self, text): - colon = text.find (":") - if colon == -1: - host = text - self.port = 5672 - else: - host = text[:colon] - self.port = int (text[colon+1:]) - self.host = socket.gethostbyname (host) - - def name (self): - return self.host + ":" + str (self.port) - class BrokerManager: def __init__ (self): self.dest = None @@ -106,7 +93,8 @@ class BrokerManager: try: self.spec = qpid.spec.load (_specpath) self.sessionId = "%s.%d" % (os.uname()[1], os.getpid()) - self.conn = Connection (connect (self.broker.host, self.broker.port), self.spec) + self.conn = Connection (connect (self.broker.host, self.broker.port), self.spec, + username=self.broker.username, password=self.broker.password) self.conn.start () self.session = self.conn.session(str(uuid4())) self.mclient = managementClient (self.spec) diff --git a/python/commands/qpid-queue-stats b/python/commands/qpid-queue-stats index 813c6e0cd2..8771c7dc5d 100755 --- a/python/commands/qpid-queue-stats +++ b/python/commands/qpid-queue-stats @@ -27,25 +27,12 @@ import socket import qpid from threading import Condition from qpid.management import managementClient +from qpid.managementdata import Broker from qpid.peer import Closed from qpid.connection import Connection from qpid.util import connect from time import sleep -class Broker: - def __init__ (self, text): - colon = text.find (":") - if colon == -1: - host = text - self.port = 5672 - else: - host = text[:colon] - self.port = int (text[colon+1:]) - self.host = socket.gethostbyname (host) - - def name (self): - return self.host + ":" + str (self.port) - class mgmtObject (object): """ Generic object that holds the contents of a management object with its attributes set as object attributes. """ @@ -74,7 +61,8 @@ class BrokerManager: try: self.spec = qpid.spec.load (self.specpath) self.sessionId = "%s.%d" % (os.uname()[1], os.getpid()) - self.conn = Connection (connect (self.broker.host, self.broker.port), self.spec) + self.conn = Connection (connect (self.broker.host, self.broker.port), self.spec, + username=self.broker.username, password=self.broker.password) self.conn.start () self.mclient = managementClient (self.spec, None, self.configCb, self.instCb) self.mchannel = self.mclient.addChannel (self.conn.session(self.sessionId)) @@ -154,7 +142,7 @@ class BrokerManager: ## def main(): p = optparse.OptionParser() - p.add_option('--broker-address','-a', default='localhost' , help='broker-addr is in the form: hostname | ip-address [:<port>] \n ex: localhost, 10.1.1.7:10000, broker-host:10000') + p.add_option('--broker-address','-a', default='localhost' , help='broker-addr is in the form: [username/password@] hostname | ip-address [:<port>] \n ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost') p.add_option('--amqp-spec-file','-s', default='"/usr/share/amqp/amqp.0-10.xml', help='the path to the amqp spec file') p.add_option('--filter','-f' ,default=None ,help='a list of comma separated queue names (regex are accepted) to show') diff --git a/python/commands/qpid-route b/python/commands/qpid-route index b08293fa00..baa45a320f 100755 --- a/python/commands/qpid-route +++ b/python/commands/qpid-route @@ -25,6 +25,7 @@ import socket import qpid import os from qpid.management import managementClient +from qpid.managementdata import Broker from qpid.peer import Closed from qpid.connection import Connection from qpid.util import connect @@ -41,8 +42,8 @@ def Usage (): print " -v [ --verbose ] Verbose output" print " -q [ --quiet ] Quiet output, don't print duplicate warnings" print - print " dest-broker and src-broker are in the form: hostname | ip-address [:<port>]" - print " ex: localhost, 10.1.1.7:10000, broker-host:10000" + print " dest-broker and src-broker are in the form: [username/password@] hostname | ip-address [:<port>]" + print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" print #print " If loading the route configuration from a file, the input file has one line per route" #print " in the form:" @@ -55,20 +56,6 @@ _specpath = "/usr/share/amqp/amqp.0-10.xml" _verbose = False _quiet = False -class Broker: - def __init__ (self, text): - colon = text.find (":") - if colon == -1: - host = text - self.port = 5672 - else: - host = text[:colon] - self.port = int (text[colon+1:]) - self.host = socket.gethostbyname (host) - - def name (self): - return self.host + ":" + str (self.port) - class RouteManager: def __init__ (self, destBroker): self.dest = Broker (destBroker) @@ -81,7 +68,7 @@ class RouteManager: try: self.spec = qpid.spec.load (_specpath) self.sessionId = "%s.%d" % (os.uname()[1], os.getpid()) - self.conn = Connection (connect (broker.host, broker.port), self.spec) + self.conn = Connection (connect (broker.host, broker.port), self.spec, username=broker.username, password=broker.password) self.conn.start () self.mclient = managementClient (self.spec) self.mch = self.mclient.addChannel (self.conn.session(self.sessionId)) diff --git a/python/qpid/connection.py b/python/qpid/connection.py index 39f882e9c3..4ed430249b 100644 --- a/python/qpid/connection.py +++ b/python/qpid/connection.py @@ -37,6 +37,8 @@ class ChannelsBusy(Exception): pass class SessionBusy(Exception): pass +class ConnectionFailed(Exception): pass + def client(*args): return delegates.Client(*args) @@ -45,7 +47,7 @@ def server(*args): class Connection(Assembler): - def __init__(self, sock, spec=None, delegate=client): + def __init__(self, sock, spec=None, delegate=client, **args): Assembler.__init__(self, sock) if spec == None: spec = load(default()) @@ -58,13 +60,14 @@ class Connection(Assembler): self.condition = Condition() self.opened = False + self.failed = False self.thread = Thread(target=self.run) self.thread.setDaemon(True) self.channel_max = 65535 - self.delegate = delegate(self) + self.delegate = delegate(self, args) def attach(self, name, ch, delegate, force=False): self.lock.acquire() @@ -127,8 +130,10 @@ class Connection(Assembler): def start(self, timeout=None): self.delegate.start() self.thread.start() - if not wait(self.condition, lambda: self.opened, timeout): + if not wait(self.condition, lambda: self.opened or self.failed, timeout): raise Timeout() + if (self.failed): + raise ConnectionFailed() def run(self): # XXX: we don't really have a good way to exit this loop without diff --git a/python/qpid/delegates.py b/python/qpid/delegates.py index f31d9a0f09..cdff132219 100644 --- a/python/qpid/delegates.py +++ b/python/qpid/delegates.py @@ -52,6 +52,9 @@ class Delegate: def connection_close(self, ch, close): ch.connection_close_ok() self.connection.sock.close() + if not self.connection.opened: + self.connection.failed = True + notify(self.connection.condition) def connection_close_ok(self, ch, close_ok): self.connection.opened = False @@ -124,12 +127,19 @@ class Client(Delegate): "version": "development", "platform": os.name} + def __init__(self, connection, args={}): + Delegate.__init__(self, connection) + self.username = args.get('username', 'guest') + self.password = args.get('password', 'guest') + self.mechanism = args.get('mechanism', 'PLAIN') + def start(self): self.connection.write_header(self.spec.major, self.spec.minor) self.connection.read_header() def connection_start(self, ch, start): - ch.connection_start_ok(client_properties=Client.PROPERTIES, mechanism="ANONYMOUS") + r = "\0%s\0%s" % (self.username, self.password) + ch.connection_start_ok(client_properties=Client.PROPERTIES, mechanism=self.mechanism, response=r) def connection_tune(self, ch, tune): ch.connection_tune_ok() diff --git a/python/qpid/managementdata.py b/python/qpid/managementdata.py index c908483354..a0197ba7d2 100644 --- a/python/qpid/managementdata.py +++ b/python/qpid/managementdata.py @@ -20,6 +20,7 @@ # import qpid +import re import socket import struct import os @@ -32,14 +33,18 @@ from qpid.util import connect class Broker: def __init__ (self, text): - colon = text.find (":") - if colon == -1: - host = text - self.port = 5672 - else: - host = text[:colon] - self.port = int (text[colon+1:]) + rex = re.compile(r""" + # [ <user> [ / <password> ] @] <host> [ :<port> ] + ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X) + match = rex.match(text) + if not match: raise ValueError("'%s' is not a valid broker url" % (text)) + user, password, host, port = match.groups() + self.host = socket.gethostbyname (host) + if port: self.port = int(port) + else: self.port = 5672 + self.username = user or "guest" + self.password = password or "guest" def name (self): return self.host + ":" + str (self.port) @@ -174,7 +179,8 @@ class ManagementData: self.sessionId = "%s.%d" % (os.uname()[1], os.getpid()) self.broker = Broker (host) - self.conn = Connection (connect (self.broker.host, self.broker.port), self.spec) + self.conn = Connection (connect (self.broker.host, self.broker.port), self.spec, + username=self.broker.username, password=self.broker.password) self.conn.start () self.mclient = managementClient (self.spec, self.ctrlHandler, self.configHandler, diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py index adda1a650f..b5aa59f586 100644 --- a/python/qpid/testlib.py +++ b/python/qpid/testlib.py @@ -353,7 +353,8 @@ class TestBase010(unittest.TestCase): def setUp(self): spec = testrunner.spec - self.conn = Connection(connect(testrunner.host, testrunner.port), spec) + self.conn = Connection(connect(testrunner.host, testrunner.port), spec, + username=testrunner.user, password=testrunner.password) self.conn.start(timeout=10) self.session = self.conn.session("test-session", timeout=10) |