summaryrefslogtreecommitdiff
path: root/python/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid')
-rw-r--r--python/qpid/connection.py3
-rw-r--r--python/qpid/delegates.py78
-rw-r--r--python/qpid/framer.py45
3 files changed, 109 insertions, 17 deletions
diff --git a/python/qpid/connection.py b/python/qpid/connection.py
index 680f8f62e3..18eeb99de8 100644
--- a/python/qpid/connection.py
+++ b/python/qpid/connection.py
@@ -65,6 +65,7 @@ class Connection(Framer):
self.thread.setDaemon(True)
self.channel_max = 65535
+ self.user_id = None
self.op_enc = OpEncoder()
self.seg_enc = SegmentEncoder()
@@ -156,6 +157,8 @@ class Connection(Framer):
while not self.closed:
try:
data = self.sock.recv(64*1024)
+ if self.security_layer_rx and data:
+ status, data = self.security_layer_rx.decode(data)
if not data:
self.detach_all()
break
diff --git a/python/qpid/delegates.py b/python/qpid/delegates.py
index 14111a88df..8ebaff4ba3 100644
--- a/python/qpid/delegates.py
+++ b/python/qpid/delegates.py
@@ -20,11 +20,20 @@
import os, connection, session
from util import notify
from datatypes import RangedSet
-from exceptions import VersionError
+from exceptions import VersionError, Closed
from logging import getLogger
from ops import Control
import sys
+_have_sasl = None
+try:
+ import qpidsasl
+ _have_sasl = True
+except:
+ pass
+finally:
+ pass
+
log = getLogger("qpid.io.ctl")
class Delegate:
@@ -152,13 +161,36 @@ class Client(Delegate):
"qpid.client_pid": os.getpid(),
"qpid.client_ppid": ppid}
- def __init__(self, connection, username="guest", password="guest",
- mechanism="PLAIN", heartbeat=None):
+ def __init__(self, connection, username=None, password=None,
+ mechanism=None, heartbeat=None, **kwargs):
Delegate.__init__(self, connection)
- self.username = username
- self.password = password
- self.mechanism = mechanism
+
+ ##
+ ## self.acceptableMechanisms is the list of SASL mechanisms that the client is willing to
+ ## use. If it's None, then any mechanism is acceptable.
+ ##
+ self.acceptableMechanisms = None
+ if mechanism:
+ self.acceptableMechanisms = mechanism.split(" ")
self.heartbeat = heartbeat
+ self.username = username
+ self.password = password
+
+ if _have_sasl:
+ self.sasl = qpidsasl.Client()
+ if username and len(username) > 0:
+ self.sasl.setAttr("username", str(username))
+ if password and len(password) > 0:
+ self.sasl.setAttr("password", str(password))
+ if "service" in kwargs:
+ self.sasl.setAttr("service", str(kwargs["service"]))
+ if "host" in kwargs:
+ self.sasl.setAttr("host", str(kwargs["host"]))
+ if "min_ssf" in kwargs:
+ self.sasl.setAttr("minssf", kwargs["min_ssf"])
+ if "max_ssf" in kwargs:
+ self.sasl.setAttr("maxssf", kwargs["max_ssf"])
+ self.sasl.init()
def start(self):
# XXX
@@ -171,14 +203,44 @@ class Client(Delegate):
(cli_major, cli_minor, major, minor))
def connection_start(self, ch, start):
- r = "\0%s\0%s" % (self.username, self.password)
- ch.connection_start_ok(client_properties=Client.PROPERTIES, mechanism=self.mechanism, response=r)
+ mech_list = ""
+ for mech in start.mechanisms:
+ if (not self.acceptableMechanisms) or mech in self.acceptableMechanisms:
+ mech_list += str(mech) + " "
+ mech = None
+ initial = None
+ if _have_sasl:
+ status, mech, initial = self.sasl.start(mech_list)
+ if status == False:
+ raise Closed("SASL error: %s" % self.sasl.getError())
+ else:
+ if self.username and self.password and ("PLAIN" in mech_list):
+ mech = "PLAIN"
+ initial = "\0%s\0%s" % (self.username, self.password)
+ else:
+ mech = "ANONYMOUS"
+ if not mech in mech_list:
+ raise Closed("No acceptable SASL authentication mechanism available")
+ ch.connection_start_ok(client_properties=Client.PROPERTIES, mechanism=mech, response=initial)
+
+ def connection_secure(self, ch, secure):
+ resp = None
+ if _have_sasl:
+ status, resp = self.sasl.step(secure.challenge)
+ if status == False:
+ raise Closed("SASL error: %s" % self.sasl.getError())
+ ch.connection_secure_ok(response=resp)
def connection_tune(self, ch, tune):
ch.connection_tune_ok(heartbeat=self.heartbeat)
ch.connection_open()
+ if _have_sasl:
+ self.connection.user_id = self.sasl.getUserId()
+ self.connection.security_layer_tx = self.sasl
def connection_open_ok(self, ch, open_ok):
+ if _have_sasl:
+ self.connection.security_layer_rx = self.sasl
self.connection.opened = True
notify(self.connection.condition)
diff --git a/python/qpid/framer.py b/python/qpid/framer.py
index 4cd0ae6f26..47f57cf649 100644
--- a/python/qpid/framer.py
+++ b/python/qpid/framer.py
@@ -35,19 +35,29 @@ class Framer(Packer):
def __init__(self, sock):
self.sock = sock
self.sock_lock = RLock()
- self._buf = ""
+ self.tx_buf = ""
+ self.rx_buf = ""
+ self.security_layer_tx = None
+ self.security_layer_rx = None
+ self.maxbufsize = 65535
def aborted(self):
return False
def write(self, buf):
- self._buf += buf
+ self.tx_buf += buf
def flush(self):
self.sock_lock.acquire()
try:
- self._write(self._buf)
- self._buf = ""
+ if self.security_layer_tx:
+ status, cipher_buf = self.security_layer_tx.encode(self.tx_buf)
+ if status == False:
+ raise Closed(self.security_layer_tx.getError())
+ self._write(cipher_buf)
+ else:
+ self._write(self.tx_buf)
+ self.tx_buf = ""
frm.debug("FLUSHED")
finally:
self.sock_lock.release()
@@ -64,25 +74,42 @@ class Framer(Packer):
raw.debug("SENT %r", buf[:n])
buf = buf[n:]
+ ##
+ ## Implementation Note:
+ ##
+ ## This function was modified to use the SASL security layer for content
+ ## decryption. As such, the socket read should read in "self.maxbufsize"
+ ## instead of "n" (the requested number of octets). However, since this
+ ## is one of two places in the code where the socket is read, the read
+ ## size had to be left at "n". This is because this function is
+ ## apparently only used to read the first 8 octets from a TCP socket. If
+ ## we read beyond "n" octets, the remaing octets won't be processed and
+ ## the connection handshake will fail.
+ ##
def read(self, n):
- data = ""
- while len(data) < n:
+ while len(self.rx_buf) < n:
try:
- s = self.sock.recv(n - len(data))
+ s = self.sock.recv(n) # NOTE: instead of "n", arg should be "self.maxbufsize"
+ if self.security_layer_rx:
+ status, s = self.security_layer_rx.decode(s)
+ if status == False:
+ raise Closed(self.security_layer_tx.getError())
except socket.timeout:
if self.aborted():
raise Closed()
else:
continue
except socket.error, e:
- if data != "":
+ if self.rx_buf != "":
raise e
else:
raise Closed()
if len(s) == 0:
raise Closed()
- data += s
+ self.rx_buf += s
raw.debug("RECV %r", s)
+ data = self.rx_buf[0:n]
+ self.rx_buf = self.rx_buf[n:]
return data
def read_header(self):