summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKen Giusti <kgiusti@apache.org>2015-02-11 15:12:54 +0000
committerKen Giusti <kgiusti@apache.org>2015-02-11 15:12:54 +0000
commit51f24c14d4efffede82db506aff5796e4d048118 (patch)
tree083e5b5aaae5f46850ac5da6f17588ac008393c3
parentcba338185d3c3f9bdc2e0b490df20d07ffade454 (diff)
downloadqpid-python-51f24c14d4efffede82db506aff5796e4d048118.tar.gz
QPID-5799: provide notification callback for received messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658984 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qpid/messaging/driver.py3
-rw-r--r--qpid/python/qpid/messaging/endpoints.py18
-rw-r--r--qpid/python/qpid/tests/messaging/endpoints.py25
3 files changed, 45 insertions, 1 deletions
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py
index e7d564f555..7c30e5d4ba 100644
--- a/qpid/python/qpid/messaging/driver.py
+++ b/qpid/python/qpid/messaging/driver.py
@@ -1368,7 +1368,8 @@ class Engine:
assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
rcv.received += 1
log.debug("RCVD[%s]: %s", ssn.log_id, msg)
- ssn.incoming.append(msg)
+ ssn.message_received(msg)
+
def _decode(self, xfr):
dp = EMPTY_DP
diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py
index 7d353e1cb4..6d58b4ac25 100644
--- a/qpid/python/qpid/messaging/endpoints.py
+++ b/qpid/python/qpid/messaging/endpoints.py
@@ -569,6 +569,7 @@ class Session(Endpoint):
self.closed = False
self._lock = connection._lock
+ self._msg_received = None
def __repr__(self):
return "<Session %s>" % self.name
@@ -600,6 +601,11 @@ class Session(Endpoint):
if self.closed:
raise SessionClosed()
+ def message_received(self, msg):
+ self.incoming.append(msg)
+ if self._msg_received:
+ self._msg_received()
+
@synchronized
def sender(self, target, **options):
"""
@@ -685,6 +691,18 @@ class Session(Endpoint):
return None
@synchronized
+ def set_message_received_handler(self, handler):
+ """Register a callback that will be invoked when a message arrives on the
+ session. Use with caution: since this callback is invoked in the context
+ of the driver thread, it is not safe to call any of the public messaging
+ APIs from within this callback. The intent of the handler is to provide
+ an efficient way to notify the application that a message has arrived.
+ This can be useful for those applications that need to schedule a task
+ to poll for received messages without blocking in the messaging API.
+ """
+ self._msg_received = handler
+
+ @synchronized
def next_receiver(self, timeout=None):
if self._ecwait(lambda: self.incoming, timeout):
return self.incoming[0]._receiver
diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py
index 247d6e9a29..56722374e5 100644
--- a/qpid/python/qpid/tests/messaging/endpoints.py
+++ b/qpid/python/qpid/tests/messaging/endpoints.py
@@ -660,6 +660,31 @@ class SessionTests(Base):
except Detached:
pass
+ def testRxCallback(self):
+ """Verify that the callback is invoked when a message is received.
+ """
+ ADDR = 'test-rx_callback-queue; {create: always, delete: receiver}'
+ class CallbackHandler:
+ def __init__(self):
+ self.handler_called = False
+ def __call__(self):
+ self.handler_called = True
+ cb = CallbackHandler()
+ self.ssn.set_message_received_handler(cb)
+ rcv = self.ssn.receiver(ADDR)
+ rcv.capacity = UNLIMITED
+ snd = self.ssn.sender(ADDR)
+ assert not cb.handler_called
+ snd.send("Ping")
+ deadline = time.time() + self.timeout()
+ while time.time() < deadline:
+ if cb.handler_called:
+ break;
+ assert cb.handler_called
+ snd.close()
+ rcv.close()
+
+
RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}'
class ReceiverTests(Base):