summaryrefslogtreecommitdiff
path: root/amqp/connection.py
diff options
context:
space:
mode:
Diffstat (limited to 'amqp/connection.py')
-rw-r--r--amqp/connection.py29
1 files changed, 4 insertions, 25 deletions
diff --git a/amqp/connection.py b/amqp/connection.py
index 065d16b..62598ed 100644
--- a/amqp/connection.py
+++ b/amqp/connection.py
@@ -259,6 +259,7 @@ class Connection(AbstractChannel):
self.read_timeout, self.write_timeout,
socket_settings=self.socket_settings,
)
+ self.transport.connect()
self.on_inbound_frame = self.frame_handler(
self, self.on_inbound_method)
self._frame_writer = self.frame_writer(self, self.transport)
@@ -388,31 +389,9 @@ class Connection(AbstractChannel):
return self.blocking_read(timeout)
def blocking_read(self, timeout=None):
- read_frame = self.transport.read_frame
- if timeout is None:
- return self.on_inbound_frame(read_frame())
-
- # XXX use select
- sock = self.sock
- prev = sock.gettimeout()
- if prev != timeout:
- sock.settimeout(timeout)
- try:
- try:
- frame = read_frame()
- except SSLError as exc:
- # http://bugs.python.org/issue10272
- if 'timed out' in str(exc):
- raise socket.timeout()
- # Non-blocking SSL sockets can throw SSLError
- if 'The operation did not complete' in str(exc):
- raise socket.timeout()
- raise
- else:
- self.on_inbound_frame(frame)
- finally:
- if prev != timeout:
- sock.settimeout(prev)
+ with self.transport.having_timeout(timeout):
+ frame = self.transport.read_frame()
+ return self.on_inbound_frame(frame)
def on_inbound_method(self, channel_id, method_sig, payload, content):
return self.channels[channel_id].dispatch_method(