summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Sergeyev <vsergeyev@mirantis.com>2015-07-09 16:04:01 +0300
committerVictor Sergeyev <vsergeyev@mirantis.com>2015-07-10 15:50:47 +0300
commit48f2a87a273a43ca9006f9bdf3315bda338bd118 (patch)
tree2ec3f655a390f2b153a2e33ed067422fcae47651
parentbcdc0e88ecdb285dc3b21ce6ed858ddc1ac2c628 (diff)
downloadoslo-messaging-48f2a87a273a43ca9006f9bdf3315bda338bd118.tar.gz
Fix work with timeout in CallRequest.receive_reply()
Refactored CallRequest.receive_reply() method to raise MessagingTimeout exception, when timeout is reached. Removed unused _to_milliseconds() method Functional test CallTestCase.test_timeout() passes now Change-Id: Idc3224646c3626a56606d019ff7ff155d3e3201a
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py14
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py6
-rw-r--r--oslo_messaging/tests/test_utils.py2
-rw-r--r--tox.ini4
4 files changed, 16 insertions, 10 deletions
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py
index 4602753..d66e5d4 100644
--- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py
@@ -14,6 +14,7 @@
import logging
+import oslo_messaging
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request
from oslo_messaging._drivers.zmq_driver import zmq_async
@@ -31,10 +32,10 @@ class CallRequest(Request):
def __init__(self, conf, target, context, message, timeout=None,
retry=None, allowed_remote_exmods=None):
self.allowed_remote_exmods = allowed_remote_exmods or []
+
try:
self.zmq_context = zmq.Context()
socket = self.zmq_context.socket(zmq.REQ)
-
super(CallRequest, self).__init__(conf, target, context,
message, socket,
zmq_serializer.CALL_TYPE,
@@ -50,8 +51,15 @@ class CallRequest(Request):
def receive_reply(self):
# NOTE(ozamiatin): Check for retry here (no retries now)
- self.socket.setsockopt(zmq.RCVTIMEO, self.timeout)
- reply = self.socket.recv_json()
+ poller = zmq_async.get_reply_poller()
+ poller.register(self.socket,
+ recv_method=lambda socket: socket.recv_json())
+
+ reply, socket = poller.poll(timeout=self.timeout)
+ if reply is None:
+ raise oslo_messaging.MessagingTimeout(
+ "Timeout %s seconds was reached" % self.timeout)
+
if reply['failure']:
raise rpc_common.deserialize_remote_exception(
reply['failure'], self.allowed_remote_exmods)
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py
index 9575bdc..badb040 100644
--- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py
@@ -47,16 +47,12 @@ class Request(object):
self.target = target
self.context = context
self.message = message
- self.timeout = self._to_milliseconds(conf, timeout)
+ self.timeout = timeout or conf.rpc_response_timeout
self.retry = retry
self.reply = None
self.socket = socket
self.topic = zmq_topic.Topic.from_target(conf, target)
- @staticmethod
- def _to_milliseconds(conf, timeout):
- return timeout * 1000 if timeout else conf.rpc_response_timeout * 1000
-
@property
def is_replied(self):
return self.reply is not None
diff --git a/oslo_messaging/tests/test_utils.py b/oslo_messaging/tests/test_utils.py
index 22178d8..d8a2912 100644
--- a/oslo_messaging/tests/test_utils.py
+++ b/oslo_messaging/tests/test_utils.py
@@ -84,4 +84,4 @@ class TimerTestCase(test_utils.BaseTestCase):
callback = mock.Mock()
remaining = t.check_return(callback)
self.assertEqual(0, remaining)
- callback.assert_called_once
+ self.assertEqual(1, callback.call_count)
diff --git a/tox.ini b/tox.ini
index 7d0d665..d91508b 100644
--- a/tox.ini
+++ b/tox.ini
@@ -41,7 +41,9 @@ setenv = TRANSPORT_URL=amqp://stackqpid:secretqpid@127.0.0.1:65123//
commands = {toxinidir}/setup-test-env-qpid.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
[testenv:py27-func-zeromq]
-commands = {toxinidir}/setup-test-env-zmq.sh python -m testtools.run oslo_messaging.tests.functional.test_functional.CallTestCase.test_exception
+commands = {toxinidir}/setup-test-env-zmq.sh python -m testtools.run \
+ oslo_messaging.tests.functional.test_functional.CallTestCase.test_exception \
+ oslo_messaging.tests.functional.test_functional.CallTestCase.test_timeout
# commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
[flake8]