diff options
author | Victor Sergeyev <vsergeyev@mirantis.com> | 2015-07-09 16:04:01 +0300 |
---|---|---|
committer | Victor Sergeyev <vsergeyev@mirantis.com> | 2015-07-10 15:50:47 +0300 |
commit | 48f2a87a273a43ca9006f9bdf3315bda338bd118 (patch) | |
tree | 2ec3f655a390f2b153a2e33ed067422fcae47651 | |
parent | bcdc0e88ecdb285dc3b21ce6ed858ddc1ac2c628 (diff) | |
download | oslo-messaging-48f2a87a273a43ca9006f9bdf3315bda338bd118.tar.gz |
Fix work with timeout in CallRequest.receive_reply()
Refactored CallRequest.receive_reply() method to raise MessagingTimeout
exception, when timeout is reached. Removed unused _to_milliseconds() method
Functional test CallTestCase.test_timeout() passes now
Change-Id: Idc3224646c3626a56606d019ff7ff155d3e3201a
-rw-r--r-- | oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py | 14 | ||||
-rw-r--r-- | oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py | 6 | ||||
-rw-r--r-- | oslo_messaging/tests/test_utils.py | 2 | ||||
-rw-r--r-- | tox.ini | 4 |
4 files changed, 16 insertions, 10 deletions
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py index 4602753..d66e5d4 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py @@ -14,6 +14,7 @@ import logging +import oslo_messaging from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request from oslo_messaging._drivers.zmq_driver import zmq_async @@ -31,10 +32,10 @@ class CallRequest(Request): def __init__(self, conf, target, context, message, timeout=None, retry=None, allowed_remote_exmods=None): self.allowed_remote_exmods = allowed_remote_exmods or [] + try: self.zmq_context = zmq.Context() socket = self.zmq_context.socket(zmq.REQ) - super(CallRequest, self).__init__(conf, target, context, message, socket, zmq_serializer.CALL_TYPE, @@ -50,8 +51,15 @@ class CallRequest(Request): def receive_reply(self): # NOTE(ozamiatin): Check for retry here (no retries now) - self.socket.setsockopt(zmq.RCVTIMEO, self.timeout) - reply = self.socket.recv_json() + poller = zmq_async.get_reply_poller() + poller.register(self.socket, + recv_method=lambda socket: socket.recv_json()) + + reply, socket = poller.poll(timeout=self.timeout) + if reply is None: + raise oslo_messaging.MessagingTimeout( + "Timeout %s seconds was reached" % self.timeout) + if reply['failure']: raise rpc_common.deserialize_remote_exception( reply['failure'], self.allowed_remote_exmods) diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py index 9575bdc..badb040 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py @@ -47,16 +47,12 @@ class Request(object): self.target = target self.context = context self.message = message - self.timeout = self._to_milliseconds(conf, timeout) + self.timeout = timeout or conf.rpc_response_timeout self.retry = retry self.reply = None self.socket = socket self.topic = zmq_topic.Topic.from_target(conf, target) - @staticmethod - def _to_milliseconds(conf, timeout): - return timeout * 1000 if timeout else conf.rpc_response_timeout * 1000 - @property def is_replied(self): return self.reply is not None diff --git a/oslo_messaging/tests/test_utils.py b/oslo_messaging/tests/test_utils.py index 22178d8..d8a2912 100644 --- a/oslo_messaging/tests/test_utils.py +++ b/oslo_messaging/tests/test_utils.py @@ -84,4 +84,4 @@ class TimerTestCase(test_utils.BaseTestCase): callback = mock.Mock() remaining = t.check_return(callback) self.assertEqual(0, remaining) - callback.assert_called_once + self.assertEqual(1, callback.call_count) @@ -41,7 +41,9 @@ setenv = TRANSPORT_URL=amqp://stackqpid:secretqpid@127.0.0.1:65123// commands = {toxinidir}/setup-test-env-qpid.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' [testenv:py27-func-zeromq] -commands = {toxinidir}/setup-test-env-zmq.sh python -m testtools.run oslo_messaging.tests.functional.test_functional.CallTestCase.test_exception +commands = {toxinidir}/setup-test-env-zmq.sh python -m testtools.run \ + oslo_messaging.tests.functional.test_functional.CallTestCase.test_exception \ + oslo_messaging.tests.functional.test_functional.CallTestCase.test_timeout # commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' [flake8] |