diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-04-09 10:54:07 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-04-09 10:54:07 +0000 |
commit | df0f59227e5947aa4620e8c319823e96c5796234 (patch) | |
tree | 8455f285068e911137d753f7c390781432032373 /python/qpid/messaging/endpoints.py | |
parent | 7a0d795940c5c68383b0224740f085e8e6b4e60b (diff) | |
download | qpid-python-df0f59227e5947aa4620e8c319823e96c5796234.tar.gz |
Changes to connection lifecycle methods and Connection parameters:
- Connection.open -> Connection.establish
- Connection.connect() split into Connection.open(), Connection.attach()
- Connection.disconnect() -> Connection.detach()
- reconnect_hosts -> reconnect_urls
- transport now takes tcp, ssl, and tcp+tls
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@932352 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging/endpoints.py')
-rw-r--r-- | python/qpid/messaging/endpoints.py | 149 |
1 files changed, 108 insertions, 41 deletions
diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py index 7ac3881bac..17807f20d2 100644 --- a/python/qpid/messaging/endpoints.py +++ b/python/qpid/messaging/endpoints.py @@ -36,7 +36,7 @@ from qpid.messaging.constants import * from qpid.messaging.exceptions import * from qpid.messaging.message import * from qpid.ops import PRIMITIVE -from qpid.util import default +from qpid.util import default, URL from threading import Thread, RLock log = getLogger("qpid.messaging") @@ -51,61 +51,110 @@ class Connection: """ @static - def open(host, port=None, username="guest", password="guest", **options): + def establish(url=None, **options): """ - Creates an AMQP connection and connects it to the given host and port. - - @type host: str - @param host: the name or ip address of the remote host - @type port: int - @param port: the port number of the remote host - @rtype: Connection - @return: a connected Connection + Constructs a L{Connection} with the supplied parameters and opens + it. """ - conn = Connection(host, port, username, password, **options) - conn.connect() + conn = Connection(url, **options) + conn.open() return conn - def __init__(self, host, port=None, username="guest", password="guest", **options): + def __init__(self, url=None, **options): """ Creates a connection. A newly created connection must be connected with the Connection.connect() method before it can be used. + @type url: str + @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ] @type host: str - @param host: the name or ip address of the remote host + @param host: the name or ip address of the remote host (overriden by url) @type port: int - @param port: the port number of the remote host + @param port: the port number of the remote host (overriden by url) + @type transport: str + @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls) + @type heartbeat: int + @param heartbeat: heartbeat interval in seconds + + @type username: str + @param username: the username for authentication (overriden by url) + @type password: str + @param password: the password for authentication (overriden by url) + + @type sasl_mechanisms: str + @param sasl_mechanisms: space separated list of permitted sasl mechanisms + @type sasl_service: str + @param sasl_service: ??? + @type sasl_min_ssf: ??? + @param sasl_min_ssf: ??? + @type sasl_max_ssf: ??? + @param sasl_max_ssf: ??? + + @type reconnect: bool + @param reconnect: enable/disable automatic reconnect + @type reconnect_timeout: float + @param reconnect_timeout: total time to attempt reconnect + @type reconnect_internal_min: float + @param reconnect_internal_min: minimum interval between reconnect attempts + @type reconnect_internal_max: float + @param reconnect_internal_max: maximum interval between reconnect attempts + @type reconnect_internal: float + @param reconnect_interval: set both min and max reconnect intervals + @type reconnect_limit: int + @param reconnect_limit: limit the total number of reconnect attempts + @type reconnect_urls: list[str] + @param reconnect_urls: list of backup hosts specified as urls + + @type address_ttl: float + @param address_ttl: time until cached address resolution expires + @rtype: Connection @return: a disconnected Connection """ - self.host = host - self.username = username - self.password = password - self.mechanisms = options.get("mechanisms") + if url is None: + url = options.get("host") + if isinstance(url, basestring): + url = URL(url) + self.host = url.host + if url.scheme == url.AMQP: + self.transport = "tcp" + elif url.scheme == url.AMQPS: + self.transport = "ssl" + else: + self.transport = options.get("transport", "tcp") + if self.transport in ("ssl", "tcp+tls"): + self.port = default(url.port, options.get("port", AMQPS_PORT)) + else: + self.port = default(url.port, options.get("port", AMQP_PORT)) self.heartbeat = options.get("heartbeat") + self.username = default(url.user, options.get("username", "guest")) + self.password = default(url.password, options.get("password", "guest")) + + self.sasl_mechanisms = options.get("sasl_mechanisms") + self.sasl_service = options.get("sasl_service", "qpidd") + self.sasl_min_ssf = options.get("sasl_min_ssf") + self.sasl_max_ssf = options.get("sasl_max_ssf") + self.reconnect = options.get("reconnect", False) self.reconnect_timeout = options.get("reconnect_timeout") - if "reconnect_interval_min" in options: - self.reconnect_interval_min = options["reconnect_interval_min"] - else: - self.reconnect_interval_min = options.get("reconnect_interval", 1) - if "reconnect_interval_max" in options: - self.reconnect_interval_max = options["reconnect_interval_max"] - else: - self.reconnect_interval_max = options.get("reconnect_interval", 2*60) + reconnect_interval = options.get("reconnect_interval") + self.reconnect_interval_min = options.get("reconnect_interval_min", + default(reconnect_interval, 1)) + self.reconnect_interval_max = options.get("reconnect_interval_max", + default(reconnect_interval, 2*60)) self.reconnect_limit = options.get("reconnect_limit") - self.reconnect_hosts = options.get("reconnect_hosts", []) - self.transport = options.get("transport", "plain") + self.reconnect_urls = options.get("reconnect_urls", []) + self.reconnect_log = options.get("reconnect_log", True) + + self.address_ttl = options.get("address_ttl", 60) + self.options = options - if self.transport == "tls": - self.port = default(port, AMQPS_PORT) - else: - self.port = default(port, AMQP_PORT) self.id = str(uuid4()) self.session_counter = 0 self.sessions = {} + self._open = False self._connected = False self._transport_connected = False self._lock = RLock() @@ -164,9 +213,26 @@ class Connection: del self.sessions[ssn.name] @synchronized - def connect(self): + def open(self): + """ + Opens a connection. + """ + if self._open: + raise ConnectionError("already open") + self._open = True + self.attach() + + @synchronized + def opened(self): + """ + Return true if the connection is open, false otherwise. + """ + return self._open + + @synchronized + def attach(self): """ - Connect to the remote endpoint. + Attach to the remote endpoint. """ self._connected = True self._driver.start() @@ -181,9 +247,9 @@ class Connection: if not (l.linked or l.error or l.closed)] @synchronized - def disconnect(self): + def detach(self): """ - Disconnect from the remote endpoint. + Detach from the remote endpoint. """ self._connected = False self._wakeup() @@ -192,9 +258,9 @@ class Connection: self._condition.gc() @synchronized - def connected(self): + def attached(self): """ - Return true if the connection is connected, false otherwise. + Return true if the connection is attached, false otherwise. """ return self._connected @@ -207,7 +273,8 @@ class Connection: for ssn in self.sessions.values(): ssn.close() finally: - self.disconnect() + self.detach() + self._open = False class Session: @@ -680,7 +747,7 @@ class Sender: """ if not self.session.connection._connected or self.session.closing: - raise Disconnected() + raise Detached() self._ewait(lambda: self.linked) |