summaryrefslogtreecommitdiff
path: root/trunk/qpid/python/qpid/framer.py
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/python/qpid/framer.py')
-rw-r--r--trunk/qpid/python/qpid/framer.py124
1 files changed, 0 insertions, 124 deletions
diff --git a/trunk/qpid/python/qpid/framer.py b/trunk/qpid/python/qpid/framer.py
deleted file mode 100644
index 47f57cf649..0000000000
--- a/trunk/qpid/python/qpid/framer.py
+++ /dev/null
@@ -1,124 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import struct, socket
-from exceptions import Closed
-from packer import Packer
-from threading import RLock
-from logging import getLogger
-
-raw = getLogger("qpid.io.raw")
-frm = getLogger("qpid.io.frm")
-
-class FramingError(Exception): pass
-
-class Framer(Packer):
-
- HEADER="!4s4B"
-
- def __init__(self, sock):
- self.sock = sock
- self.sock_lock = RLock()
- self.tx_buf = ""
- self.rx_buf = ""
- self.security_layer_tx = None
- self.security_layer_rx = None
- self.maxbufsize = 65535
-
- def aborted(self):
- return False
-
- def write(self, buf):
- self.tx_buf += buf
-
- def flush(self):
- self.sock_lock.acquire()
- try:
- if self.security_layer_tx:
- status, cipher_buf = self.security_layer_tx.encode(self.tx_buf)
- if status == False:
- raise Closed(self.security_layer_tx.getError())
- self._write(cipher_buf)
- else:
- self._write(self.tx_buf)
- self.tx_buf = ""
- frm.debug("FLUSHED")
- finally:
- self.sock_lock.release()
-
- def _write(self, buf):
- while buf:
- try:
- n = self.sock.send(buf)
- except socket.timeout:
- if self.aborted():
- raise Closed()
- else:
- continue
- raw.debug("SENT %r", buf[:n])
- buf = buf[n:]
-
- ##
- ## Implementation Note:
- ##
- ## This function was modified to use the SASL security layer for content
- ## decryption. As such, the socket read should read in "self.maxbufsize"
- ## instead of "n" (the requested number of octets). However, since this
- ## is one of two places in the code where the socket is read, the read
- ## size had to be left at "n". This is because this function is
- ## apparently only used to read the first 8 octets from a TCP socket. If
- ## we read beyond "n" octets, the remaing octets won't be processed and
- ## the connection handshake will fail.
- ##
- def read(self, n):
- while len(self.rx_buf) < n:
- try:
- s = self.sock.recv(n) # NOTE: instead of "n", arg should be "self.maxbufsize"
- if self.security_layer_rx:
- status, s = self.security_layer_rx.decode(s)
- if status == False:
- raise Closed(self.security_layer_tx.getError())
- except socket.timeout:
- if self.aborted():
- raise Closed()
- else:
- continue
- except socket.error, e:
- if self.rx_buf != "":
- raise e
- else:
- raise Closed()
- if len(s) == 0:
- raise Closed()
- self.rx_buf += s
- raw.debug("RECV %r", s)
- data = self.rx_buf[0:n]
- self.rx_buf = self.rx_buf[n:]
- return data
-
- def read_header(self):
- return self.unpack(Framer.HEADER)
-
- def write_header(self, major, minor):
- self.sock_lock.acquire()
- try:
- self.pack(Framer.HEADER, "AMQP", 1, 1, major, minor)
- self.flush()
- finally:
- self.sock_lock.release()