summaryrefslogtreecommitdiff
path: root/python/qpid/framer.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/framer.py')
-rw-r--r--python/qpid/framer.py45
1 files changed, 36 insertions, 9 deletions
diff --git a/python/qpid/framer.py b/python/qpid/framer.py
index 4cd0ae6f26..47f57cf649 100644
--- a/python/qpid/framer.py
+++ b/python/qpid/framer.py
@@ -35,19 +35,29 @@ class Framer(Packer):
def __init__(self, sock):
self.sock = sock
self.sock_lock = RLock()
- self._buf = ""
+ self.tx_buf = ""
+ self.rx_buf = ""
+ self.security_layer_tx = None
+ self.security_layer_rx = None
+ self.maxbufsize = 65535
def aborted(self):
return False
def write(self, buf):
- self._buf += buf
+ self.tx_buf += buf
def flush(self):
self.sock_lock.acquire()
try:
- self._write(self._buf)
- self._buf = ""
+ if self.security_layer_tx:
+ status, cipher_buf = self.security_layer_tx.encode(self.tx_buf)
+ if status == False:
+ raise Closed(self.security_layer_tx.getError())
+ self._write(cipher_buf)
+ else:
+ self._write(self.tx_buf)
+ self.tx_buf = ""
frm.debug("FLUSHED")
finally:
self.sock_lock.release()
@@ -64,25 +74,42 @@ class Framer(Packer):
raw.debug("SENT %r", buf[:n])
buf = buf[n:]
+ ##
+ ## Implementation Note:
+ ##
+ ## This function was modified to use the SASL security layer for content
+ ## decryption. As such, the socket read should read in "self.maxbufsize"
+ ## instead of "n" (the requested number of octets). However, since this
+ ## is one of two places in the code where the socket is read, the read
+ ## size had to be left at "n". This is because this function is
+ ## apparently only used to read the first 8 octets from a TCP socket. If
+ ## we read beyond "n" octets, the remaing octets won't be processed and
+ ## the connection handshake will fail.
+ ##
def read(self, n):
- data = ""
- while len(data) < n:
+ while len(self.rx_buf) < n:
try:
- s = self.sock.recv(n - len(data))
+ s = self.sock.recv(n) # NOTE: instead of "n", arg should be "self.maxbufsize"
+ if self.security_layer_rx:
+ status, s = self.security_layer_rx.decode(s)
+ if status == False:
+ raise Closed(self.security_layer_tx.getError())
except socket.timeout:
if self.aborted():
raise Closed()
else:
continue
except socket.error, e:
- if data != "":
+ if self.rx_buf != "":
raise e
else:
raise Closed()
if len(s) == 0:
raise Closed()
- data += s
+ self.rx_buf += s
raw.debug("RECV %r", s)
+ data = self.rx_buf[0:n]
+ self.rx_buf = self.rx_buf[n:]
return data
def read_header(self):