diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-02-16 03:48:44 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-02-16 03:48:44 +0000 |
commit | 1f0c6b6661d511d5858e3755718750a5e6fc70f8 (patch) | |
tree | d9d80581712be99836609c04e9e23398d5a6b8cd | |
parent | 039b5540faff26ec317b111a5c322e935ae22a45 (diff) | |
download | qpid-python-1f0c6b6661d511d5858e3755718750a5e6fc70f8.tar.gz |
changed sender/receiver to be synchronous by default when invoked on a connected session
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@910388 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-x | qpid/python/examples/api/drain | 5 | ||||
-rwxr-xr-x | qpid/python/examples/api/server | 38 | ||||
-rwxr-xr-x | qpid/python/examples/api/spout | 12 | ||||
-rw-r--r-- | qpid/python/qpid/driver.py | 4 | ||||
-rw-r--r-- | qpid/python/qpid/messaging.py | 29 | ||||
-rw-r--r-- | qpid/python/qpid/tests/messaging.py | 61 |
6 files changed, 85 insertions, 64 deletions
diff --git a/qpid/python/examples/api/drain b/qpid/python/examples/api/drain index f2d7a50058..c244cbc09c 100755 --- a/qpid/python/examples/api/drain +++ b/qpid/python/examples/api/drain @@ -93,9 +93,8 @@ try: ssn.acknowledge() except Empty: break - except ReceiveError, e: - print e - break +except ReceiveError, e: + print e except KeyboardInterrupt: pass diff --git a/qpid/python/examples/api/server b/qpid/python/examples/api/server index a9cd8579e3..d7cd53de4b 100755 --- a/qpid/python/examples/api/server +++ b/qpid/python/examples/api/server @@ -51,15 +51,12 @@ else: parser.error("address is required") # XXX: should make URL default the port for us -conn = Connection.open(url.host, url.port or AMQP_PORT, - username=url.user, - password=url.password, - reconnect=opts.reconnect, - reconnect_delay=opts.reconnect_delay, - reconnect_limit=opts.reconnect_limit) -ssn = conn.session() -rcv = ssn.receiver(addr) - +conn = Connection(url.host, url.port or AMQP_PORT, + username=url.user, + password=url.password, + reconnect=opts.reconnect, + reconnect_delay=opts.reconnect_delay, + reconnect_limit=opts.reconnect_limit) def dispatch(msg): msg_type = msg.properties.get("type") if msg_type == "shell": @@ -77,21 +74,26 @@ def dispatch(msg): result = Message("unrecognized message type: %s" % msg_type) return result -while True: - try: +try: + conn.connect() + ssn = conn.session() + rcv = ssn.receiver(addr) + + while True: msg = rcv.fetch() response = dispatch(msg) - snd = ssn.sender(msg.reply_to) + snd = None try: + snd = ssn.sender(msg.reply_to) snd.send(response) except SendError, e: print e - snd.close() + if snd is not None: + snd.close() ssn.acknowledge() - except Empty: - break - except ReceiveError, e: - print e - break +except ReceiveError, e: + print e +except KeyboardInterrupt: + pass conn.close() diff --git a/qpid/python/examples/api/spout b/qpid/python/examples/api/spout index 97cb540c21..5479b66211 100755 --- a/qpid/python/examples/api/spout +++ b/qpid/python/examples/api/spout @@ -113,13 +113,11 @@ try: name, val = nameval(p) msg.properties[name] = val - try: - snd.send(msg) - count += 1 - print msg - except SendError, e: - print e - break + snd.send(msg) + count += 1 + print msg +except SendError, e: + print e except KeyboardInterrupt: pass diff --git a/qpid/python/qpid/driver.py b/qpid/python/qpid/driver.py index 03ba85aa39..2eef9db06c 100644 --- a/qpid/python/qpid/driver.py +++ b/qpid/python/qpid/driver.py @@ -560,6 +560,8 @@ class Driver: self.delete(sst, _snd.name, do_unlink) else: do_unlink() + elif not snd.linked and snd.closing and not snd.closed: + snd.closed = True def link_in(self, rcv): sst = self._attachments.get(rcv.session) @@ -633,6 +635,8 @@ class Driver: else: sst.write_cmd(MessageCancel(_rcv.destination), do_unlink) _rcv.canceled = True + elif not rcv.linked and rcv.closing and not rcv.closed: + rcv.closed = True POLICIES = Values("always", "sender", "receiver", "never") diff --git a/qpid/python/qpid/messaging.py b/qpid/python/qpid/messaging.py index 9108832eec..6eb8fa0e9d 100644 --- a/qpid/python/qpid/messaging.py +++ b/qpid/python/qpid/messaging.py @@ -181,7 +181,14 @@ class Connection: """ self._connected = True self._wakeup() - self._ewait(lambda: self._driver._connected, exc=ConnectError) + self._ewait(lambda: self._driver._connected and not self._unlinked(), + exc=ConnectError) + + def _unlinked(self): + return [l + for ssn in self.sessions.values() + for l in ssn.senders + ssn.receivers + if not (l.linked or l.error or l.closed)] @synchronized def disconnect(self): @@ -484,11 +491,13 @@ class Session: sender = Sender(self, self.next_sender_id, target, options) self.next_sender_id += 1 self.senders.append(sender) - self._wakeup() - # XXX: because of the lack of waiting here we can end up getting - # into the driver loop with messages sent for senders that haven't - # been linked yet, something similar can probably happen for - # receivers + if not self.closed and self.connection._connected: + self._wakeup() + try: + sender._ewait(lambda: sender.linked) + except SendError, e: + sender.close() + raise e return sender @synchronized @@ -505,7 +514,13 @@ class Session: receiver = Receiver(self, self.next_receiver_id, source, options) self.next_receiver_id += 1 self.receivers.append(receiver) - self._wakeup() + if not self.closed and self.connection._connected: + self._wakeup() + try: + receiver._ewait(lambda: receiver.linked) + except ReceiveError, e: + receiver.close() + raise e return receiver @synchronized diff --git a/qpid/python/qpid/tests/messaging.py b/qpid/python/qpid/tests/messaging.py index 1fe9ad5f8c..ce1302a4d6 100644 --- a/qpid/python/qpid/tests/messaging.py +++ b/qpid/python/qpid/tests/messaging.py @@ -214,6 +214,15 @@ class SessionTests(Base): self.ssn.acknowledge(msg) snd2 = self.ssn.receiver('test-rcv-queue; {delete: always}') + def testDisconnectedReceiver(self): + self.conn.disconnect() + rcv = self.ssn.receiver("test-dis-rcv-queue; {create: always, delete: always}") + m = self.content("testDisconnectedReceiver") + self.conn.connect() + snd = self.ssn.sender("test-dis-rcv-queue") + snd.send(m) + self.drain(rcv, expected=[m]) + def testNextReceiver(self): ADDR = 'test-next-rcv-queue; {create: always, delete: always}' rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED) @@ -581,16 +590,14 @@ class AddressTests(Base): return self.conn.session() def badOption(self, options, error): - snd = self.ssn.sender("test-bad-options-snd; %s" % options) try: - snd.send("ping") + self.ssn.sender("test-bad-options-snd; %s" % options) assert False except SendError, e: assert "error in options: %s" % error == str(e), e - rcv = self.ssn.receiver("test-bad-options-rcv; %s" % options) try: - rcv.fetch(timeout=0) + self.ssn.receiver("test-bad-options-rcv; %s" % options) assert False except ReceiveError, e: assert "error in options: %s" % error == str(e), e @@ -673,9 +680,8 @@ class AddressTests(Base): snd = self.ssn.sender("test-delete; {delete: always}") snd.send("ping") snd.close() - snd = self.ssn.sender("test-delete") try: - snd.send("ping") + self.ssn.sender("test-delete") except SendError, e: assert "no such queue" in str(e) @@ -689,7 +695,8 @@ class AddressTests(Base): try: self.ssn.receiver("test-delete") - except SendError, e: + assert False + except ReceiveError, e: assert "no such queue" in str(e) def testDeleteSpecial(self): @@ -789,59 +796,55 @@ class AddressErrorTests(Base): def setup_session(self): return self.conn.session() - def sendErrorTest(self, addr, exc, check=lambda e: True): - snd = self.ssn.sender(addr, durable=self.durable()) + def senderErrorTest(self, addr, exc, check=lambda e: True): try: - snd.send("hello") - assert False, "send succeeded" + self.ssn.sender(addr, durable=self.durable()) + assert False, "sender creation succeeded" except exc, e: assert check(e), "unexpected error: %s" % compat.format_exc(e) - snd.close() - def fetchErrorTest(self, addr, exc, check=lambda e: True): - rcv = self.ssn.receiver(addr) + def receiverErrorTest(self, addr, exc, check=lambda e: True): try: - rcv.fetch(timeout=0) - assert False, "fetch succeeded" + self.ssn.receiver(addr) + assert False, "receiver creation succeeded" except exc, e: assert check(e), "unexpected error: %s" % compat.format_exc(e) - rcv.close() def testNoneTarget(self): # XXX: should have specific exception for this - self.sendErrorTest(None, SendError) + self.senderErrorTest(None, SendError) def testNoneSource(self): # XXX: should have specific exception for this - self.fetchErrorTest(None, ReceiveError) + self.receiverErrorTest(None, ReceiveError) def testNoTarget(self): # XXX: should have specific exception for this - self.sendErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e)) + self.senderErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e)) def testNoSource(self): # XXX: should have specific exception for this - self.fetchErrorTest(NOSUCH_Q, ReceiveError, lambda e: NOSUCH_Q in str(e)) + self.receiverErrorTest(NOSUCH_Q, ReceiveError, lambda e: NOSUCH_Q in str(e)) def testUnparseableTarget(self): # XXX: should have specific exception for this - self.sendErrorTest(UNPARSEABLE_ADDR, SendError, - lambda e: "expecting COLON" in str(e)) + self.senderErrorTest(UNPARSEABLE_ADDR, SendError, + lambda e: "expecting COLON" in str(e)) def testUnparseableSource(self): # XXX: should have specific exception for this - self.fetchErrorTest(UNPARSEABLE_ADDR, ReceiveError, - lambda e: "expecting COLON" in str(e)) + self.receiverErrorTest(UNPARSEABLE_ADDR, ReceiveError, + lambda e: "expecting COLON" in str(e)) def testUnlexableTarget(self): # XXX: should have specific exception for this - self.sendErrorTest(UNLEXABLE_ADDR, SendError, - lambda e: "unrecognized characters" in str(e)) + self.senderErrorTest(UNLEXABLE_ADDR, SendError, + lambda e: "unrecognized characters" in str(e)) def testUnlexableSource(self): # XXX: should have specific exception for this - self.fetchErrorTest(UNLEXABLE_ADDR, ReceiveError, - lambda e: "unrecognized characters" in str(e)) + self.receiverErrorTest(UNLEXABLE_ADDR, ReceiveError, + lambda e: "unrecognized characters" in str(e)) SENDER_Q = 'test-sender-q; {create: always, delete: always}' |