diff options
author | Victor Sergeyev <vsergeyev@mirantis.com> | 2015-07-09 16:04:01 +0300 |
---|---|---|
committer | Victor Sergeyev <vsergeyev@mirantis.com> | 2015-07-10 15:50:47 +0300 |
commit | 48f2a87a273a43ca9006f9bdf3315bda338bd118 (patch) | |
tree | 2ec3f655a390f2b153a2e33ed067422fcae47651 /oslo_messaging/_drivers/zmq_driver/rpc | |
parent | bcdc0e88ecdb285dc3b21ce6ed858ddc1ac2c628 (diff) | |
download | oslo-messaging-48f2a87a273a43ca9006f9bdf3315bda338bd118.tar.gz |
Fix work with timeout in CallRequest.receive_reply()
Refactored CallRequest.receive_reply() method to raise MessagingTimeout
exception, when timeout is reached. Removed unused _to_milliseconds() method
Functional test CallTestCase.test_timeout() passes now
Change-Id: Idc3224646c3626a56606d019ff7ff155d3e3201a
Diffstat (limited to 'oslo_messaging/_drivers/zmq_driver/rpc')
-rw-r--r-- | oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py | 14 | ||||
-rw-r--r-- | oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py | 6 |
2 files changed, 12 insertions, 8 deletions
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py index 4602753..d66e5d4 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py @@ -14,6 +14,7 @@ import logging +import oslo_messaging from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request from oslo_messaging._drivers.zmq_driver import zmq_async @@ -31,10 +32,10 @@ class CallRequest(Request): def __init__(self, conf, target, context, message, timeout=None, retry=None, allowed_remote_exmods=None): self.allowed_remote_exmods = allowed_remote_exmods or [] + try: self.zmq_context = zmq.Context() socket = self.zmq_context.socket(zmq.REQ) - super(CallRequest, self).__init__(conf, target, context, message, socket, zmq_serializer.CALL_TYPE, @@ -50,8 +51,15 @@ class CallRequest(Request): def receive_reply(self): # NOTE(ozamiatin): Check for retry here (no retries now) - self.socket.setsockopt(zmq.RCVTIMEO, self.timeout) - reply = self.socket.recv_json() + poller = zmq_async.get_reply_poller() + poller.register(self.socket, + recv_method=lambda socket: socket.recv_json()) + + reply, socket = poller.poll(timeout=self.timeout) + if reply is None: + raise oslo_messaging.MessagingTimeout( + "Timeout %s seconds was reached" % self.timeout) + if reply['failure']: raise rpc_common.deserialize_remote_exception( reply['failure'], self.allowed_remote_exmods) diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py index 9575bdc..badb040 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py @@ -47,16 +47,12 @@ class Request(object): self.target = target self.context = context self.message = message - self.timeout = self._to_milliseconds(conf, timeout) + self.timeout = timeout or conf.rpc_response_timeout self.retry = retry self.reply = None self.socket = socket self.topic = zmq_topic.Topic.from_target(conf, target) - @staticmethod - def _to_milliseconds(conf, timeout): - return timeout * 1000 if timeout else conf.rpc_response_timeout * 1000 - @property def is_replied(self): return self.reply is not None |