summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rwxr-xr-xpython/commands/qpid-route4
-rw-r--r--python/qmf/console.py22
-rw-r--r--python/qpid/connection.py3
-rw-r--r--python/qpid/delegates.py78
-rw-r--r--python/qpid/framer.py45
5 files changed, 126 insertions, 26 deletions
diff --git a/python/commands/qpid-route b/python/commands/qpid-route
index b515b91267..9965047000 100755
--- a/python/commands/qpid-route
+++ b/python/commands/qpid-route
@@ -93,12 +93,12 @@ class RouteManager:
broker = brokers[0]
link = self.getLink()
if link == None:
- if self.remote.authName == "anonymous":
+ if not self.remote.authName or self.remote.authName == "anonymous":
mech = "ANONYMOUS"
else:
mech = "PLAIN"
res = broker.connect(self.remote.host, self.remote.port, _durable,
- mech, self.remote.authName, self.remote.authPass,
+ mech, self.remote.authName or "", self.remote.authPass or "",
_transport)
if _verbose:
print "Connect method returned:", res.status, res.text
diff --git a/python/qmf/console.py b/python/qmf/console.py
index 8674736982..a75c65addc 100644
--- a/python/qmf/console.py
+++ b/python/qmf/console.py
@@ -38,6 +38,9 @@ from threading import Lock, Condition, Thread
from time import time, strftime, gmtime
from cStringIO import StringIO
+#import qpid.log
+#qpid.log.enable(name="qpid.io.cmd", level=qpid.log.DEBUG)
+
class Console:
""" To access the asynchronous operations, a class must be derived from
Console with overrides of any combination of the available methods. """
@@ -100,9 +103,12 @@ class BrokerURL(URL):
self.port = 5671
else:
self.port = 5672
- self.authName = str(self.user or "guest")
- self.authPass = str(self.password or "guest")
- self.authMech = "PLAIN"
+ self.authName = None
+ self.authPass = None
+ if self.user:
+ self.authName = str(self.user)
+ if self.password:
+ self.authPass = str(self.password)
def name(self):
return self.host + ":" + str(self.port)
@@ -469,10 +475,10 @@ class Session:
def __repr__(self):
return "QMF Console Session Manager (brokers: %d)" % len(self.brokers)
- def addBroker(self, target="localhost", timeout=None):
+ def addBroker(self, target="localhost", timeout=None, mechanisms=None):
""" Connect to a Qpid broker. Returns an object of type Broker. """
url = BrokerURL(target)
- broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass,
+ broker = Broker(self, url.host, url.port, mechanisms, url.authName, url.authPass,
ssl = url.scheme == URL.AMQPS, connTimeout=timeout)
self.brokers.append(broker)
@@ -1557,10 +1563,11 @@ class Broker:
SYNC_TIME = 60
nextSeq = 1
- def __init__(self, session, host, port, authMech, authUser, authPass, ssl=False, connTimeout=None):
+ def __init__(self, session, host, port, authMechs, authUser, authPass, ssl=False, connTimeout=None):
self.session = session
self.host = host
self.port = port
+ self.mechanisms = authMechs
self.ssl = ssl
self.connTimeout = connTimeout
self.authUser = authUser
@@ -1654,7 +1661,8 @@ class Broker:
connSock = ssl(sock)
else:
connSock = sock
- self.conn = Connection(connSock, username=self.authUser, password=self.authPass)
+ self.conn = Connection(connSock, username=self.authUser, password=self.authPass,
+ mechanism = self.mechanisms, host=self.host, service="qpidd")
def aborted():
raise Timeout("Waiting for connection to be established with broker")
oldAborted = self.conn.aborted
diff --git a/python/qpid/connection.py b/python/qpid/connection.py
index 680f8f62e3..18eeb99de8 100644
--- a/python/qpid/connection.py
+++ b/python/qpid/connection.py
@@ -65,6 +65,7 @@ class Connection(Framer):
self.thread.setDaemon(True)
self.channel_max = 65535
+ self.user_id = None
self.op_enc = OpEncoder()
self.seg_enc = SegmentEncoder()
@@ -156,6 +157,8 @@ class Connection(Framer):
while not self.closed:
try:
data = self.sock.recv(64*1024)
+ if self.security_layer_rx and data:
+ status, data = self.security_layer_rx.decode(data)
if not data:
self.detach_all()
break
diff --git a/python/qpid/delegates.py b/python/qpid/delegates.py
index 14111a88df..8ebaff4ba3 100644
--- a/python/qpid/delegates.py
+++ b/python/qpid/delegates.py
@@ -20,11 +20,20 @@
import os, connection, session
from util import notify
from datatypes import RangedSet
-from exceptions import VersionError
+from exceptions import VersionError, Closed
from logging import getLogger
from ops import Control
import sys
+_have_sasl = None
+try:
+ import qpidsasl
+ _have_sasl = True
+except:
+ pass
+finally:
+ pass
+
log = getLogger("qpid.io.ctl")
class Delegate:
@@ -152,13 +161,36 @@ class Client(Delegate):
"qpid.client_pid": os.getpid(),
"qpid.client_ppid": ppid}
- def __init__(self, connection, username="guest", password="guest",
- mechanism="PLAIN", heartbeat=None):
+ def __init__(self, connection, username=None, password=None,
+ mechanism=None, heartbeat=None, **kwargs):
Delegate.__init__(self, connection)
- self.username = username
- self.password = password
- self.mechanism = mechanism
+
+ ##
+ ## self.acceptableMechanisms is the list of SASL mechanisms that the client is willing to
+ ## use. If it's None, then any mechanism is acceptable.
+ ##
+ self.acceptableMechanisms = None
+ if mechanism:
+ self.acceptableMechanisms = mechanism.split(" ")
self.heartbeat = heartbeat
+ self.username = username
+ self.password = password
+
+ if _have_sasl:
+ self.sasl = qpidsasl.Client()
+ if username and len(username) > 0:
+ self.sasl.setAttr("username", str(username))
+ if password and len(password) > 0:
+ self.sasl.setAttr("password", str(password))
+ if "service" in kwargs:
+ self.sasl.setAttr("service", str(kwargs["service"]))
+ if "host" in kwargs:
+ self.sasl.setAttr("host", str(kwargs["host"]))
+ if "min_ssf" in kwargs:
+ self.sasl.setAttr("minssf", kwargs["min_ssf"])
+ if "max_ssf" in kwargs:
+ self.sasl.setAttr("maxssf", kwargs["max_ssf"])
+ self.sasl.init()
def start(self):
# XXX
@@ -171,14 +203,44 @@ class Client(Delegate):
(cli_major, cli_minor, major, minor))
def connection_start(self, ch, start):
- r = "\0%s\0%s" % (self.username, self.password)
- ch.connection_start_ok(client_properties=Client.PROPERTIES, mechanism=self.mechanism, response=r)
+ mech_list = ""
+ for mech in start.mechanisms:
+ if (not self.acceptableMechanisms) or mech in self.acceptableMechanisms:
+ mech_list += str(mech) + " "
+ mech = None
+ initial = None
+ if _have_sasl:
+ status, mech, initial = self.sasl.start(mech_list)
+ if status == False:
+ raise Closed("SASL error: %s" % self.sasl.getError())
+ else:
+ if self.username and self.password and ("PLAIN" in mech_list):
+ mech = "PLAIN"
+ initial = "\0%s\0%s" % (self.username, self.password)
+ else:
+ mech = "ANONYMOUS"
+ if not mech in mech_list:
+ raise Closed("No acceptable SASL authentication mechanism available")
+ ch.connection_start_ok(client_properties=Client.PROPERTIES, mechanism=mech, response=initial)
+
+ def connection_secure(self, ch, secure):
+ resp = None
+ if _have_sasl:
+ status, resp = self.sasl.step(secure.challenge)
+ if status == False:
+ raise Closed("SASL error: %s" % self.sasl.getError())
+ ch.connection_secure_ok(response=resp)
def connection_tune(self, ch, tune):
ch.connection_tune_ok(heartbeat=self.heartbeat)
ch.connection_open()
+ if _have_sasl:
+ self.connection.user_id = self.sasl.getUserId()
+ self.connection.security_layer_tx = self.sasl
def connection_open_ok(self, ch, open_ok):
+ if _have_sasl:
+ self.connection.security_layer_rx = self.sasl
self.connection.opened = True
notify(self.connection.condition)
diff --git a/python/qpid/framer.py b/python/qpid/framer.py
index 4cd0ae6f26..47f57cf649 100644
--- a/python/qpid/framer.py
+++ b/python/qpid/framer.py
@@ -35,19 +35,29 @@ class Framer(Packer):
def __init__(self, sock):
self.sock = sock
self.sock_lock = RLock()
- self._buf = ""
+ self.tx_buf = ""
+ self.rx_buf = ""
+ self.security_layer_tx = None
+ self.security_layer_rx = None
+ self.maxbufsize = 65535
def aborted(self):
return False
def write(self, buf):
- self._buf += buf
+ self.tx_buf += buf
def flush(self):
self.sock_lock.acquire()
try:
- self._write(self._buf)
- self._buf = ""
+ if self.security_layer_tx:
+ status, cipher_buf = self.security_layer_tx.encode(self.tx_buf)
+ if status == False:
+ raise Closed(self.security_layer_tx.getError())
+ self._write(cipher_buf)
+ else:
+ self._write(self.tx_buf)
+ self.tx_buf = ""
frm.debug("FLUSHED")
finally:
self.sock_lock.release()
@@ -64,25 +74,42 @@ class Framer(Packer):
raw.debug("SENT %r", buf[:n])
buf = buf[n:]
+ ##
+ ## Implementation Note:
+ ##
+ ## This function was modified to use the SASL security layer for content
+ ## decryption. As such, the socket read should read in "self.maxbufsize"
+ ## instead of "n" (the requested number of octets). However, since this
+ ## is one of two places in the code where the socket is read, the read
+ ## size had to be left at "n". This is because this function is
+ ## apparently only used to read the first 8 octets from a TCP socket. If
+ ## we read beyond "n" octets, the remaing octets won't be processed and
+ ## the connection handshake will fail.
+ ##
def read(self, n):
- data = ""
- while len(data) < n:
+ while len(self.rx_buf) < n:
try:
- s = self.sock.recv(n - len(data))
+ s = self.sock.recv(n) # NOTE: instead of "n", arg should be "self.maxbufsize"
+ if self.security_layer_rx:
+ status, s = self.security_layer_rx.decode(s)
+ if status == False:
+ raise Closed(self.security_layer_tx.getError())
except socket.timeout:
if self.aborted():
raise Closed()
else:
continue
except socket.error, e:
- if data != "":
+ if self.rx_buf != "":
raise e
else:
raise Closed()
if len(s) == 0:
raise Closed()
- data += s
+ self.rx_buf += s
raw.debug("RECV %r", s)
+ data = self.rx_buf[0:n]
+ self.rx_buf = self.rx_buf[n:]
return data
def read_header(self):