diff options
author | Ken Giusti <kgiusti@apache.org> | 2015-02-11 15:12:54 +0000 |
---|---|---|
committer | Ken Giusti <kgiusti@apache.org> | 2015-02-11 15:12:54 +0000 |
commit | 51f24c14d4efffede82db506aff5796e4d048118 (patch) | |
tree | 083e5b5aaae5f46850ac5da6f17588ac008393c3 | |
parent | cba338185d3c3f9bdc2e0b490df20d07ffade454 (diff) | |
download | qpid-python-51f24c14d4efffede82db506aff5796e4d048118.tar.gz |
QPID-5799: provide notification callback for received messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658984 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/python/qpid/messaging/driver.py | 3 | ||||
-rw-r--r-- | qpid/python/qpid/messaging/endpoints.py | 18 | ||||
-rw-r--r-- | qpid/python/qpid/tests/messaging/endpoints.py | 25 |
3 files changed, 45 insertions, 1 deletions
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py index e7d564f555..7c30e5d4ba 100644 --- a/qpid/python/qpid/messaging/driver.py +++ b/qpid/python/qpid/messaging/driver.py @@ -1368,7 +1368,8 @@ class Engine: assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending) rcv.received += 1 log.debug("RCVD[%s]: %s", ssn.log_id, msg) - ssn.incoming.append(msg) + ssn.message_received(msg) + def _decode(self, xfr): dp = EMPTY_DP diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py index 7d353e1cb4..6d58b4ac25 100644 --- a/qpid/python/qpid/messaging/endpoints.py +++ b/qpid/python/qpid/messaging/endpoints.py @@ -569,6 +569,7 @@ class Session(Endpoint): self.closed = False self._lock = connection._lock + self._msg_received = None def __repr__(self): return "<Session %s>" % self.name @@ -600,6 +601,11 @@ class Session(Endpoint): if self.closed: raise SessionClosed() + def message_received(self, msg): + self.incoming.append(msg) + if self._msg_received: + self._msg_received() + @synchronized def sender(self, target, **options): """ @@ -685,6 +691,18 @@ class Session(Endpoint): return None @synchronized + def set_message_received_handler(self, handler): + """Register a callback that will be invoked when a message arrives on the + session. Use with caution: since this callback is invoked in the context + of the driver thread, it is not safe to call any of the public messaging + APIs from within this callback. The intent of the handler is to provide + an efficient way to notify the application that a message has arrived. + This can be useful for those applications that need to schedule a task + to poll for received messages without blocking in the messaging API. + """ + self._msg_received = handler + + @synchronized def next_receiver(self, timeout=None): if self._ecwait(lambda: self.incoming, timeout): return self.incoming[0]._receiver diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py index 247d6e9a29..56722374e5 100644 --- a/qpid/python/qpid/tests/messaging/endpoints.py +++ b/qpid/python/qpid/tests/messaging/endpoints.py @@ -660,6 +660,31 @@ class SessionTests(Base): except Detached: pass + def testRxCallback(self): + """Verify that the callback is invoked when a message is received. + """ + ADDR = 'test-rx_callback-queue; {create: always, delete: receiver}' + class CallbackHandler: + def __init__(self): + self.handler_called = False + def __call__(self): + self.handler_called = True + cb = CallbackHandler() + self.ssn.set_message_received_handler(cb) + rcv = self.ssn.receiver(ADDR) + rcv.capacity = UNLIMITED + snd = self.ssn.sender(ADDR) + assert not cb.handler_called + snd.send("Ping") + deadline = time.time() + self.timeout() + while time.time() < deadline: + if cb.handler_called: + break; + assert cb.handler_called + snd.close() + rcv.close() + + RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}' class ReceiverTests(Base): |