diff options
author | Rafael H. Schloming <rhs@apache.org> | 2009-11-14 01:12:54 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2009-11-14 01:12:54 +0000 |
commit | caac82682127f212d2d714154d09fc51244cd4ae (patch) | |
tree | 6412fc891a94ce7554c9e8fc836983a1ef39b24f /python/qpid/messaging.py | |
parent | 208fb8cefbfdb4a193a319e6d9fc3ba7c81bcd65 (diff) | |
download | qpid-python-caac82682127f212d2d714154d09fc51244cd4ae.tar.gz |
removed start/stop
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@836085 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r-- | python/qpid/messaging.py | 114 |
1 files changed, 31 insertions, 83 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index ed0bd14f9c..ec1c054e14 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -97,7 +97,7 @@ class Connection: mechanism="PLAIN", heartbeat=None, **options): """ Creates a connection. A newly created connection must be connected - with the Connection.connect() method before it can be started. + with the Connection.connect() method before it can be used. @type host: str @param host: the name or ip address of the remote host @@ -113,7 +113,6 @@ class Connection: self.mechanism = mechanism self.heartbeat = heartbeat - self.started = False self.id = str(uuid4()) self.session_counter = 0 self.sessions = {} @@ -166,7 +165,7 @@ class Connection: if self.sessions.has_key(name): return self.sessions[name] else: - ssn = Session(self, name, self.started, transactional=transactional) + ssn = Session(self, name, transactional) self.sessions[name] = ssn self._wakeup() return ssn @@ -201,24 +200,6 @@ class Connection: return self._connected @synchronized - def start(self): - """ - Start incoming message delivery for all sessions. - """ - self.started = True - for ssn in self.sessions.values(): - ssn.start() - - @synchronized - def stop(self): - """ - Stop incoming message deliveries for all sessions. - """ - for ssn in self.sessions.values(): - ssn.stop() - self.started = False - - @synchronized def close(self): """ Close the connection and all sessions. @@ -269,10 +250,9 @@ class Session: messages, and manage various Senders and Receivers. """ - def __init__(self, connection, name, started, transactional): + def __init__(self, connection, name, transactional): self.connection = connection self.name = name - self.started = started self.transactional = transactional @@ -346,8 +326,7 @@ class Session: @rtype: Receiver @return: a new Receiver for the specified source """ - receiver = Receiver(self, len(self.receivers), source, options, - self.started) + receiver = Receiver(self, len(self.receivers), source, options) self.receivers.append(receiver) self._wakeup() return receiver @@ -453,24 +432,6 @@ class Session: assert self.aborted @synchronized - def start(self): - """ - Start incoming message delivery for the session. - """ - self.started = True - for rcv in self.receivers: - rcv.start() - - @synchronized - def stop(self): - """ - Stop incoming message delivery for the session. - """ - for rcv in self.receivers: - rcv.stop() - self.started = False - - @synchronized def close(self): """ Close the session. @@ -611,22 +572,20 @@ class Empty(ReceiveError): """ pass -class Receiver: +class Receiver(object): """ Receives incoming messages from a remote source. Messages may be fetched with L{fetch}. """ - def __init__(self, session, index, source, options, started): + def __init__(self, session, index, source, options): self.session = session self.index = index self.destination = str(self.index) self.source = source self.options = options - self.started = started - self.capacity = options.get("capacity", UNLIMITED) self.granted = Serial(0) self.draining = False self.impending = Serial(0) @@ -638,6 +597,26 @@ class Receiver: self.closing = False self.closed = False self._lock = self.session._lock + self._capacity = 0 + self._set_capacity(options.get("capacity", 0), False) + + @synchronized + def _set_capacity(self, c, wakeup=True): + if c is UNLIMITED: + self._capacity = c.value + else: + self._capacity = c + self._grant() + if wakeup: + self._wakeup() + + def _get_capacity(self): + if self._capacity == UNLIMITED.value: + return UNLIMITED + else: + return self._capacity + + capacity = property(_get_capacity, _set_capacity) def _wakeup(self): self.session._wakeup() @@ -663,14 +642,6 @@ class Receiver: """ return self.received - self.returned - def _capacity(self): - if not self.started: - return 0 - elif self.capacity is UNLIMITED: - return self.capacity.value - else: - return self.capacity - def _pred(self, msg): return msg._receiver == self @@ -687,7 +658,7 @@ class Receiver: self._ewait(lambda: self.linked) - if self._capacity() == 0: + if self._capacity == 0: self.granted = self.returned + 1 self._wakeup() self._ewait(lambda: self.impending >= self.granted) @@ -701,39 +672,16 @@ class Receiver: msg = self.session._get(self._pred, timeout=0) if msg is None: raise Empty() - elif self._capacity() not in (0, UNLIMITED.value): + elif self._capacity not in (0, UNLIMITED.value): self.granted += 1 self._wakeup() return msg def _grant(self): - if self.started: - if self.capacity is UNLIMITED: - self.granted = UNLIMITED - else: - self.granted = self.received + self._capacity() + if self._capacity == UNLIMITED.value: + self.granted = UNLIMITED else: - self.granted = self.received - - - @synchronized - def start(self): - """ - Start incoming message delivery for this receiver. - """ - self.started = True - self._grant() - self._wakeup() - - @synchronized - def stop(self): - """ - Stop incoming message delivery for this receiver. - """ - self.started = False - self._grant() - self._wakeup() - self._ewait(lambda: self.impending == self.received) + self.granted = self.received + self._capacity @synchronized def close(self): |