diff options
Diffstat (limited to 'python')
-rwxr-xr-x | python/commands/qpid-route | 4 | ||||
-rw-r--r-- | python/qmf/console.py | 22 | ||||
-rw-r--r-- | python/qpid/connection.py | 3 | ||||
-rw-r--r-- | python/qpid/delegates.py | 78 | ||||
-rw-r--r-- | python/qpid/framer.py | 45 |
5 files changed, 126 insertions, 26 deletions
diff --git a/python/commands/qpid-route b/python/commands/qpid-route index b515b91267..9965047000 100755 --- a/python/commands/qpid-route +++ b/python/commands/qpid-route @@ -93,12 +93,12 @@ class RouteManager: broker = brokers[0] link = self.getLink() if link == None: - if self.remote.authName == "anonymous": + if not self.remote.authName or self.remote.authName == "anonymous": mech = "ANONYMOUS" else: mech = "PLAIN" res = broker.connect(self.remote.host, self.remote.port, _durable, - mech, self.remote.authName, self.remote.authPass, + mech, self.remote.authName or "", self.remote.authPass or "", _transport) if _verbose: print "Connect method returned:", res.status, res.text diff --git a/python/qmf/console.py b/python/qmf/console.py index 8674736982..a75c65addc 100644 --- a/python/qmf/console.py +++ b/python/qmf/console.py @@ -38,6 +38,9 @@ from threading import Lock, Condition, Thread from time import time, strftime, gmtime from cStringIO import StringIO +#import qpid.log +#qpid.log.enable(name="qpid.io.cmd", level=qpid.log.DEBUG) + class Console: """ To access the asynchronous operations, a class must be derived from Console with overrides of any combination of the available methods. """ @@ -100,9 +103,12 @@ class BrokerURL(URL): self.port = 5671 else: self.port = 5672 - self.authName = str(self.user or "guest") - self.authPass = str(self.password or "guest") - self.authMech = "PLAIN" + self.authName = None + self.authPass = None + if self.user: + self.authName = str(self.user) + if self.password: + self.authPass = str(self.password) def name(self): return self.host + ":" + str(self.port) @@ -469,10 +475,10 @@ class Session: def __repr__(self): return "QMF Console Session Manager (brokers: %d)" % len(self.brokers) - def addBroker(self, target="localhost", timeout=None): + def addBroker(self, target="localhost", timeout=None, mechanisms=None): """ Connect to a Qpid broker. Returns an object of type Broker. """ url = BrokerURL(target) - broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass, + broker = Broker(self, url.host, url.port, mechanisms, url.authName, url.authPass, ssl = url.scheme == URL.AMQPS, connTimeout=timeout) self.brokers.append(broker) @@ -1557,10 +1563,11 @@ class Broker: SYNC_TIME = 60 nextSeq = 1 - def __init__(self, session, host, port, authMech, authUser, authPass, ssl=False, connTimeout=None): + def __init__(self, session, host, port, authMechs, authUser, authPass, ssl=False, connTimeout=None): self.session = session self.host = host self.port = port + self.mechanisms = authMechs self.ssl = ssl self.connTimeout = connTimeout self.authUser = authUser @@ -1654,7 +1661,8 @@ class Broker: connSock = ssl(sock) else: connSock = sock - self.conn = Connection(connSock, username=self.authUser, password=self.authPass) + self.conn = Connection(connSock, username=self.authUser, password=self.authPass, + mechanism = self.mechanisms, host=self.host, service="qpidd") def aborted(): raise Timeout("Waiting for connection to be established with broker") oldAborted = self.conn.aborted diff --git a/python/qpid/connection.py b/python/qpid/connection.py index 680f8f62e3..18eeb99de8 100644 --- a/python/qpid/connection.py +++ b/python/qpid/connection.py @@ -65,6 +65,7 @@ class Connection(Framer): self.thread.setDaemon(True) self.channel_max = 65535 + self.user_id = None self.op_enc = OpEncoder() self.seg_enc = SegmentEncoder() @@ -156,6 +157,8 @@ class Connection(Framer): while not self.closed: try: data = self.sock.recv(64*1024) + if self.security_layer_rx and data: + status, data = self.security_layer_rx.decode(data) if not data: self.detach_all() break diff --git a/python/qpid/delegates.py b/python/qpid/delegates.py index 14111a88df..8ebaff4ba3 100644 --- a/python/qpid/delegates.py +++ b/python/qpid/delegates.py @@ -20,11 +20,20 @@ import os, connection, session from util import notify from datatypes import RangedSet -from exceptions import VersionError +from exceptions import VersionError, Closed from logging import getLogger from ops import Control import sys +_have_sasl = None +try: + import qpidsasl + _have_sasl = True +except: + pass +finally: + pass + log = getLogger("qpid.io.ctl") class Delegate: @@ -152,13 +161,36 @@ class Client(Delegate): "qpid.client_pid": os.getpid(), "qpid.client_ppid": ppid} - def __init__(self, connection, username="guest", password="guest", - mechanism="PLAIN", heartbeat=None): + def __init__(self, connection, username=None, password=None, + mechanism=None, heartbeat=None, **kwargs): Delegate.__init__(self, connection) - self.username = username - self.password = password - self.mechanism = mechanism + + ## + ## self.acceptableMechanisms is the list of SASL mechanisms that the client is willing to + ## use. If it's None, then any mechanism is acceptable. + ## + self.acceptableMechanisms = None + if mechanism: + self.acceptableMechanisms = mechanism.split(" ") self.heartbeat = heartbeat + self.username = username + self.password = password + + if _have_sasl: + self.sasl = qpidsasl.Client() + if username and len(username) > 0: + self.sasl.setAttr("username", str(username)) + if password and len(password) > 0: + self.sasl.setAttr("password", str(password)) + if "service" in kwargs: + self.sasl.setAttr("service", str(kwargs["service"])) + if "host" in kwargs: + self.sasl.setAttr("host", str(kwargs["host"])) + if "min_ssf" in kwargs: + self.sasl.setAttr("minssf", kwargs["min_ssf"]) + if "max_ssf" in kwargs: + self.sasl.setAttr("maxssf", kwargs["max_ssf"]) + self.sasl.init() def start(self): # XXX @@ -171,14 +203,44 @@ class Client(Delegate): (cli_major, cli_minor, major, minor)) def connection_start(self, ch, start): - r = "\0%s\0%s" % (self.username, self.password) - ch.connection_start_ok(client_properties=Client.PROPERTIES, mechanism=self.mechanism, response=r) + mech_list = "" + for mech in start.mechanisms: + if (not self.acceptableMechanisms) or mech in self.acceptableMechanisms: + mech_list += str(mech) + " " + mech = None + initial = None + if _have_sasl: + status, mech, initial = self.sasl.start(mech_list) + if status == False: + raise Closed("SASL error: %s" % self.sasl.getError()) + else: + if self.username and self.password and ("PLAIN" in mech_list): + mech = "PLAIN" + initial = "\0%s\0%s" % (self.username, self.password) + else: + mech = "ANONYMOUS" + if not mech in mech_list: + raise Closed("No acceptable SASL authentication mechanism available") + ch.connection_start_ok(client_properties=Client.PROPERTIES, mechanism=mech, response=initial) + + def connection_secure(self, ch, secure): + resp = None + if _have_sasl: + status, resp = self.sasl.step(secure.challenge) + if status == False: + raise Closed("SASL error: %s" % self.sasl.getError()) + ch.connection_secure_ok(response=resp) def connection_tune(self, ch, tune): ch.connection_tune_ok(heartbeat=self.heartbeat) ch.connection_open() + if _have_sasl: + self.connection.user_id = self.sasl.getUserId() + self.connection.security_layer_tx = self.sasl def connection_open_ok(self, ch, open_ok): + if _have_sasl: + self.connection.security_layer_rx = self.sasl self.connection.opened = True notify(self.connection.condition) diff --git a/python/qpid/framer.py b/python/qpid/framer.py index 4cd0ae6f26..47f57cf649 100644 --- a/python/qpid/framer.py +++ b/python/qpid/framer.py @@ -35,19 +35,29 @@ class Framer(Packer): def __init__(self, sock): self.sock = sock self.sock_lock = RLock() - self._buf = "" + self.tx_buf = "" + self.rx_buf = "" + self.security_layer_tx = None + self.security_layer_rx = None + self.maxbufsize = 65535 def aborted(self): return False def write(self, buf): - self._buf += buf + self.tx_buf += buf def flush(self): self.sock_lock.acquire() try: - self._write(self._buf) - self._buf = "" + if self.security_layer_tx: + status, cipher_buf = self.security_layer_tx.encode(self.tx_buf) + if status == False: + raise Closed(self.security_layer_tx.getError()) + self._write(cipher_buf) + else: + self._write(self.tx_buf) + self.tx_buf = "" frm.debug("FLUSHED") finally: self.sock_lock.release() @@ -64,25 +74,42 @@ class Framer(Packer): raw.debug("SENT %r", buf[:n]) buf = buf[n:] + ## + ## Implementation Note: + ## + ## This function was modified to use the SASL security layer for content + ## decryption. As such, the socket read should read in "self.maxbufsize" + ## instead of "n" (the requested number of octets). However, since this + ## is one of two places in the code where the socket is read, the read + ## size had to be left at "n". This is because this function is + ## apparently only used to read the first 8 octets from a TCP socket. If + ## we read beyond "n" octets, the remaing octets won't be processed and + ## the connection handshake will fail. + ## def read(self, n): - data = "" - while len(data) < n: + while len(self.rx_buf) < n: try: - s = self.sock.recv(n - len(data)) + s = self.sock.recv(n) # NOTE: instead of "n", arg should be "self.maxbufsize" + if self.security_layer_rx: + status, s = self.security_layer_rx.decode(s) + if status == False: + raise Closed(self.security_layer_tx.getError()) except socket.timeout: if self.aborted(): raise Closed() else: continue except socket.error, e: - if data != "": + if self.rx_buf != "": raise e else: raise Closed() if len(s) == 0: raise Closed() - data += s + self.rx_buf += s raw.debug("RECV %r", s) + data = self.rx_buf[0:n] + self.rx_buf = self.rx_buf[n:] return data def read_header(self): |