summaryrefslogtreecommitdiff
path: root/python/qpid/messaging/endpoints.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-04-09 10:54:07 +0000
committerRafael H. Schloming <rhs@apache.org>2010-04-09 10:54:07 +0000
commitdf0f59227e5947aa4620e8c319823e96c5796234 (patch)
tree8455f285068e911137d753f7c390781432032373 /python/qpid/messaging/endpoints.py
parent7a0d795940c5c68383b0224740f085e8e6b4e60b (diff)
downloadqpid-python-df0f59227e5947aa4620e8c319823e96c5796234.tar.gz
Changes to connection lifecycle methods and Connection parameters:
- Connection.open -> Connection.establish - Connection.connect() split into Connection.open(), Connection.attach() - Connection.disconnect() -> Connection.detach() - reconnect_hosts -> reconnect_urls - transport now takes tcp, ssl, and tcp+tls git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@932352 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging/endpoints.py')
-rw-r--r--python/qpid/messaging/endpoints.py149
1 files changed, 108 insertions, 41 deletions
diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py
index 7ac3881bac..17807f20d2 100644
--- a/python/qpid/messaging/endpoints.py
+++ b/python/qpid/messaging/endpoints.py
@@ -36,7 +36,7 @@ from qpid.messaging.constants import *
from qpid.messaging.exceptions import *
from qpid.messaging.message import *
from qpid.ops import PRIMITIVE
-from qpid.util import default
+from qpid.util import default, URL
from threading import Thread, RLock
log = getLogger("qpid.messaging")
@@ -51,61 +51,110 @@ class Connection:
"""
@static
- def open(host, port=None, username="guest", password="guest", **options):
+ def establish(url=None, **options):
"""
- Creates an AMQP connection and connects it to the given host and port.
-
- @type host: str
- @param host: the name or ip address of the remote host
- @type port: int
- @param port: the port number of the remote host
- @rtype: Connection
- @return: a connected Connection
+ Constructs a L{Connection} with the supplied parameters and opens
+ it.
"""
- conn = Connection(host, port, username, password, **options)
- conn.connect()
+ conn = Connection(url, **options)
+ conn.open()
return conn
- def __init__(self, host, port=None, username="guest", password="guest", **options):
+ def __init__(self, url=None, **options):
"""
Creates a connection. A newly created connection must be connected
with the Connection.connect() method before it can be used.
+ @type url: str
+ @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ]
@type host: str
- @param host: the name or ip address of the remote host
+ @param host: the name or ip address of the remote host (overriden by url)
@type port: int
- @param port: the port number of the remote host
+ @param port: the port number of the remote host (overriden by url)
+ @type transport: str
+ @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls)
+ @type heartbeat: int
+ @param heartbeat: heartbeat interval in seconds
+
+ @type username: str
+ @param username: the username for authentication (overriden by url)
+ @type password: str
+ @param password: the password for authentication (overriden by url)
+
+ @type sasl_mechanisms: str
+ @param sasl_mechanisms: space separated list of permitted sasl mechanisms
+ @type sasl_service: str
+ @param sasl_service: ???
+ @type sasl_min_ssf: ???
+ @param sasl_min_ssf: ???
+ @type sasl_max_ssf: ???
+ @param sasl_max_ssf: ???
+
+ @type reconnect: bool
+ @param reconnect: enable/disable automatic reconnect
+ @type reconnect_timeout: float
+ @param reconnect_timeout: total time to attempt reconnect
+ @type reconnect_internal_min: float
+ @param reconnect_internal_min: minimum interval between reconnect attempts
+ @type reconnect_internal_max: float
+ @param reconnect_internal_max: maximum interval between reconnect attempts
+ @type reconnect_internal: float
+ @param reconnect_interval: set both min and max reconnect intervals
+ @type reconnect_limit: int
+ @param reconnect_limit: limit the total number of reconnect attempts
+ @type reconnect_urls: list[str]
+ @param reconnect_urls: list of backup hosts specified as urls
+
+ @type address_ttl: float
+ @param address_ttl: time until cached address resolution expires
+
@rtype: Connection
@return: a disconnected Connection
"""
- self.host = host
- self.username = username
- self.password = password
- self.mechanisms = options.get("mechanisms")
+ if url is None:
+ url = options.get("host")
+ if isinstance(url, basestring):
+ url = URL(url)
+ self.host = url.host
+ if url.scheme == url.AMQP:
+ self.transport = "tcp"
+ elif url.scheme == url.AMQPS:
+ self.transport = "ssl"
+ else:
+ self.transport = options.get("transport", "tcp")
+ if self.transport in ("ssl", "tcp+tls"):
+ self.port = default(url.port, options.get("port", AMQPS_PORT))
+ else:
+ self.port = default(url.port, options.get("port", AMQP_PORT))
self.heartbeat = options.get("heartbeat")
+ self.username = default(url.user, options.get("username", "guest"))
+ self.password = default(url.password, options.get("password", "guest"))
+
+ self.sasl_mechanisms = options.get("sasl_mechanisms")
+ self.sasl_service = options.get("sasl_service", "qpidd")
+ self.sasl_min_ssf = options.get("sasl_min_ssf")
+ self.sasl_max_ssf = options.get("sasl_max_ssf")
+
self.reconnect = options.get("reconnect", False)
self.reconnect_timeout = options.get("reconnect_timeout")
- if "reconnect_interval_min" in options:
- self.reconnect_interval_min = options["reconnect_interval_min"]
- else:
- self.reconnect_interval_min = options.get("reconnect_interval", 1)
- if "reconnect_interval_max" in options:
- self.reconnect_interval_max = options["reconnect_interval_max"]
- else:
- self.reconnect_interval_max = options.get("reconnect_interval", 2*60)
+ reconnect_interval = options.get("reconnect_interval")
+ self.reconnect_interval_min = options.get("reconnect_interval_min",
+ default(reconnect_interval, 1))
+ self.reconnect_interval_max = options.get("reconnect_interval_max",
+ default(reconnect_interval, 2*60))
self.reconnect_limit = options.get("reconnect_limit")
- self.reconnect_hosts = options.get("reconnect_hosts", [])
- self.transport = options.get("transport", "plain")
+ self.reconnect_urls = options.get("reconnect_urls", [])
+ self.reconnect_log = options.get("reconnect_log", True)
+
+ self.address_ttl = options.get("address_ttl", 60)
+
self.options = options
- if self.transport == "tls":
- self.port = default(port, AMQPS_PORT)
- else:
- self.port = default(port, AMQP_PORT)
self.id = str(uuid4())
self.session_counter = 0
self.sessions = {}
+ self._open = False
self._connected = False
self._transport_connected = False
self._lock = RLock()
@@ -164,9 +213,26 @@ class Connection:
del self.sessions[ssn.name]
@synchronized
- def connect(self):
+ def open(self):
+ """
+ Opens a connection.
+ """
+ if self._open:
+ raise ConnectionError("already open")
+ self._open = True
+ self.attach()
+
+ @synchronized
+ def opened(self):
+ """
+ Return true if the connection is open, false otherwise.
+ """
+ return self._open
+
+ @synchronized
+ def attach(self):
"""
- Connect to the remote endpoint.
+ Attach to the remote endpoint.
"""
self._connected = True
self._driver.start()
@@ -181,9 +247,9 @@ class Connection:
if not (l.linked or l.error or l.closed)]
@synchronized
- def disconnect(self):
+ def detach(self):
"""
- Disconnect from the remote endpoint.
+ Detach from the remote endpoint.
"""
self._connected = False
self._wakeup()
@@ -192,9 +258,9 @@ class Connection:
self._condition.gc()
@synchronized
- def connected(self):
+ def attached(self):
"""
- Return true if the connection is connected, false otherwise.
+ Return true if the connection is attached, false otherwise.
"""
return self._connected
@@ -207,7 +273,8 @@ class Connection:
for ssn in self.sessions.values():
ssn.close()
finally:
- self.disconnect()
+ self.detach()
+ self._open = False
class Session:
@@ -680,7 +747,7 @@ class Sender:
"""
if not self.session.connection._connected or self.session.closing:
- raise Disconnected()
+ raise Detached()
self._ewait(lambda: self.linked)