summaryrefslogtreecommitdiff
path: root/python/qpid/messaging.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-11-14 01:12:54 +0000
committerRafael H. Schloming <rhs@apache.org>2009-11-14 01:12:54 +0000
commitcaac82682127f212d2d714154d09fc51244cd4ae (patch)
tree6412fc891a94ce7554c9e8fc836983a1ef39b24f /python/qpid/messaging.py
parent208fb8cefbfdb4a193a319e6d9fc3ba7c81bcd65 (diff)
downloadqpid-python-caac82682127f212d2d714154d09fc51244cd4ae.tar.gz
removed start/stop
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@836085 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r--python/qpid/messaging.py114
1 files changed, 31 insertions, 83 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py
index ed0bd14f9c..ec1c054e14 100644
--- a/python/qpid/messaging.py
+++ b/python/qpid/messaging.py
@@ -97,7 +97,7 @@ class Connection:
mechanism="PLAIN", heartbeat=None, **options):
"""
Creates a connection. A newly created connection must be connected
- with the Connection.connect() method before it can be started.
+ with the Connection.connect() method before it can be used.
@type host: str
@param host: the name or ip address of the remote host
@@ -113,7 +113,6 @@ class Connection:
self.mechanism = mechanism
self.heartbeat = heartbeat
- self.started = False
self.id = str(uuid4())
self.session_counter = 0
self.sessions = {}
@@ -166,7 +165,7 @@ class Connection:
if self.sessions.has_key(name):
return self.sessions[name]
else:
- ssn = Session(self, name, self.started, transactional=transactional)
+ ssn = Session(self, name, transactional)
self.sessions[name] = ssn
self._wakeup()
return ssn
@@ -201,24 +200,6 @@ class Connection:
return self._connected
@synchronized
- def start(self):
- """
- Start incoming message delivery for all sessions.
- """
- self.started = True
- for ssn in self.sessions.values():
- ssn.start()
-
- @synchronized
- def stop(self):
- """
- Stop incoming message deliveries for all sessions.
- """
- for ssn in self.sessions.values():
- ssn.stop()
- self.started = False
-
- @synchronized
def close(self):
"""
Close the connection and all sessions.
@@ -269,10 +250,9 @@ class Session:
messages, and manage various Senders and Receivers.
"""
- def __init__(self, connection, name, started, transactional):
+ def __init__(self, connection, name, transactional):
self.connection = connection
self.name = name
- self.started = started
self.transactional = transactional
@@ -346,8 +326,7 @@ class Session:
@rtype: Receiver
@return: a new Receiver for the specified source
"""
- receiver = Receiver(self, len(self.receivers), source, options,
- self.started)
+ receiver = Receiver(self, len(self.receivers), source, options)
self.receivers.append(receiver)
self._wakeup()
return receiver
@@ -453,24 +432,6 @@ class Session:
assert self.aborted
@synchronized
- def start(self):
- """
- Start incoming message delivery for the session.
- """
- self.started = True
- for rcv in self.receivers:
- rcv.start()
-
- @synchronized
- def stop(self):
- """
- Stop incoming message delivery for the session.
- """
- for rcv in self.receivers:
- rcv.stop()
- self.started = False
-
- @synchronized
def close(self):
"""
Close the session.
@@ -611,22 +572,20 @@ class Empty(ReceiveError):
"""
pass
-class Receiver:
+class Receiver(object):
"""
Receives incoming messages from a remote source. Messages may be
fetched with L{fetch}.
"""
- def __init__(self, session, index, source, options, started):
+ def __init__(self, session, index, source, options):
self.session = session
self.index = index
self.destination = str(self.index)
self.source = source
self.options = options
- self.started = started
- self.capacity = options.get("capacity", UNLIMITED)
self.granted = Serial(0)
self.draining = False
self.impending = Serial(0)
@@ -638,6 +597,26 @@ class Receiver:
self.closing = False
self.closed = False
self._lock = self.session._lock
+ self._capacity = 0
+ self._set_capacity(options.get("capacity", 0), False)
+
+ @synchronized
+ def _set_capacity(self, c, wakeup=True):
+ if c is UNLIMITED:
+ self._capacity = c.value
+ else:
+ self._capacity = c
+ self._grant()
+ if wakeup:
+ self._wakeup()
+
+ def _get_capacity(self):
+ if self._capacity == UNLIMITED.value:
+ return UNLIMITED
+ else:
+ return self._capacity
+
+ capacity = property(_get_capacity, _set_capacity)
def _wakeup(self):
self.session._wakeup()
@@ -663,14 +642,6 @@ class Receiver:
"""
return self.received - self.returned
- def _capacity(self):
- if not self.started:
- return 0
- elif self.capacity is UNLIMITED:
- return self.capacity.value
- else:
- return self.capacity
-
def _pred(self, msg):
return msg._receiver == self
@@ -687,7 +658,7 @@ class Receiver:
self._ewait(lambda: self.linked)
- if self._capacity() == 0:
+ if self._capacity == 0:
self.granted = self.returned + 1
self._wakeup()
self._ewait(lambda: self.impending >= self.granted)
@@ -701,39 +672,16 @@ class Receiver:
msg = self.session._get(self._pred, timeout=0)
if msg is None:
raise Empty()
- elif self._capacity() not in (0, UNLIMITED.value):
+ elif self._capacity not in (0, UNLIMITED.value):
self.granted += 1
self._wakeup()
return msg
def _grant(self):
- if self.started:
- if self.capacity is UNLIMITED:
- self.granted = UNLIMITED
- else:
- self.granted = self.received + self._capacity()
+ if self._capacity == UNLIMITED.value:
+ self.granted = UNLIMITED
else:
- self.granted = self.received
-
-
- @synchronized
- def start(self):
- """
- Start incoming message delivery for this receiver.
- """
- self.started = True
- self._grant()
- self._wakeup()
-
- @synchronized
- def stop(self):
- """
- Stop incoming message delivery for this receiver.
- """
- self.started = False
- self._grant()
- self._wakeup()
- self._ewait(lambda: self.impending == self.received)
+ self.granted = self.received + self._capacity
@synchronized
def close(self):