1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from abc import abstractmethod
import logging
import uuid
import six
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_topic
from oslo_messaging._i18n import _LE
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# zmq module object as returned by zmq_async.import_zmq(); which concrete
# implementation that is gets decided inside zmq_async (not visible here).
zmq = zmq_async.import_zmq()
@six.add_metaclass(abc.ABCMeta)
class Request(object):
    """Abstract base for one RPC request sent over a zmq socket.

    An instance bundles the target, context and message of a call together
    with a freshly generated message id. Calling the instance sends the
    request and returns whatever ``receive_reply`` returns; subclasses must
    implement ``receive_reply``.
    """

    def __init__(self, conf, target, context, message,
                 socket, timeout=None, retry=None):
        """Validate the message and record everything needed to send it.

        :param conf: configuration object; ``rpc_response_timeout`` is read
            from it when no ``timeout`` is given, and it is passed to
            ``zmq_topic.Topic.from_target``.
        :param target: target of the call, used to build the topic.
        :param context: request context, sent as a JSON frame.
        :param message: mapping holding the call; must contain a
            non-``None`` ``'method'`` entry. Sent as a JSON frame.
        :param socket: socket object the frames are written to.
        :param timeout: optional timeout; a falsy value selects
            ``conf.rpc_response_timeout`` instead.
        :param retry: stored unchanged on the instance.
        :raises KeyError: if ``message`` has no ``'method'`` entry or the
            entry is ``None``.
        """
        # A missing 'method' key is handled like an explicit None so that
        # both cases are logged and reported with the same error message
        # (a plain lookup would raise an unlogged KeyError('method')).
        try:
            method = message['method']
        except KeyError:
            method = None

        if method is None:
            errmsg = _LE("No method specified for RPC call")
            LOG.error(errmsg)
            raise KeyError(errmsg)

        self.msg_id = uuid.uuid4().hex
        self.target = target
        self.context = context
        self.message = message
        self.timeout = self._to_milliseconds(conf, timeout)
        self.retry = retry
        # Stays None until a reply has been stored; see is_replied.
        self.reply = None
        self.socket = socket
        self.topic = zmq_topic.Topic.from_target(conf, target)

    @staticmethod
    def _to_milliseconds(conf, timeout):
        """Return the effective timeout multiplied by 1000.

        A falsy ``timeout`` (``None`` or ``0``) falls back to
        ``conf.rpc_response_timeout``. Going by the method name the inputs
        are presumably seconds and the result milliseconds.
        """
        return timeout * 1000 if timeout else conf.rpc_response_timeout * 1000

    @property
    def is_replied(self):
        """True once ``self.reply`` holds something other than ``None``."""
        return self.reply is not None

    @property
    def is_timed_out(self):
        """Always ``False`` in this base class."""
        return False

    def send_request(self):
        """Write the request to the socket as four frames.

        Frame order: topic (as string), message id, context (JSON),
        message (JSON). Every frame except the last is sent with
        ``zmq.SNDMORE``.
        """
        self.socket.send_string(str(self.topic), zmq.SNDMORE)
        self.socket.send_string(self.msg_id, zmq.SNDMORE)
        self.socket.send_json(self.context, zmq.SNDMORE)
        self.socket.send_json(self.message)

    def __call__(self):
        """Send the request and return the result of ``receive_reply``."""
        self.send_request()
        return self.receive_reply()

    @abstractmethod
    def receive_reply(self):
        """Receive reply from server side"""
|