summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGevorg Davoian <gdavoian@mirantis.com>2016-07-04 20:09:30 +0300
committerGevorg Davoian <gdavoian@mirantis.com>2016-07-08 12:50:58 +0000
commit66ded1f91426e80e48cb2e0aa84c50beabe5153e (patch)
treeb17e0193f6111d1cf4aefa3d7b8fcc08832d3e2c
parentac484f6b26c6509549edc1150673915b48482ac2 (diff)
downloadoslo-messaging-66ded1f91426e80e48cb2e0aa84c50beabe5153e.tar.gz
[zmq] Use json/msgpack instead of pickle
Change-Id: Ia4a08b6f2d932ad0642d64f55bcdadef814e4350 Closes-Bug: #1582207 Closes-Bug: #1584763 Depends-On: I90df59d61af2b40b516a5151c67c184fcc91e366
-rw-r--r--oslo_messaging/_drivers/impl_zmq.py7
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py13
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_response.py18
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py19
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py10
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py32
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py4
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py2
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_names.py10
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_socket.py38
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_pub_sub.py21
11 files changed, 112 insertions, 62 deletions
diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py
index 3829fa5..5636d01 100644
--- a/oslo_messaging/_drivers/impl_zmq.py
+++ b/oslo_messaging/_drivers/impl_zmq.py
@@ -100,7 +100,12 @@ zmq_opts = [
cfg.IntOpt('rpc_zmq_bind_port_retries',
default=100,
help='Number of retries to find free port number before '
- 'fail with ZMQBindError.')
+ 'fail with ZMQBindError.'),
+
+ cfg.StrOpt('rpc_zmq_serialization', default='json',
+ choices=('json', 'msgpack'),
+ help='Default serialization mechanism for '
+ 'serializing/deserializing outgoing/incoming messages')
]
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py
index e8be2a3..63c683f 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py
@@ -19,6 +19,7 @@ import threading
import futurist
import six
+from oslo_messaging._drivers.zmq_driver.client import zmq_response
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
@@ -118,9 +119,11 @@ class ReplyReceiverProxy(ReplyReceiver):
reply_id = socket.recv()
assert reply_id is not None, "Reply ID expected!"
message_type = int(socket.recv())
- assert message_type == zmq_names.REPLY_TYPE, "Reply is expected!"
+ assert message_type == zmq_names.REPLY_TYPE, "Reply expected!"
message_id = socket.recv()
- reply = socket.recv_pyobj()
+ raw_reply = socket.recv_loaded()
+ assert isinstance(raw_reply, dict), "Dict expected!"
+ reply = zmq_response.Response(**raw_reply)
LOG.debug("Received reply for %s", message_id)
return reply_id, message_type, message_id, reply
@@ -130,9 +133,11 @@ class ReplyReceiverDirect(ReplyReceiver):
def recv_response(self, socket):
empty = socket.recv()
assert empty == b'', "Empty expected!"
- reply = socket.recv_pyobj()
+ raw_reply = socket.recv_loaded()
+ assert isinstance(raw_reply, dict), "Dict expected!"
+ reply = zmq_response.Response(**raw_reply)
LOG.debug("Received reply for %s", reply.message_id)
- return reply.reply_id, reply.type_, reply.message_id, reply
+ return reply.reply_id, reply.msg_type, reply.message_id, reply
class AckAndReplyReceiver(ReceiverBase):
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py
index b6a7b75..35c38a8 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py
@@ -17,23 +17,18 @@ from oslo_messaging._drivers.zmq_driver import zmq_names
class Response(object):
- def __init__(self, id=None, type=None, message_id=None,
+ def __init__(self, msg_type=None, message_id=None,
reply_id=None, reply_body=None, failure=None):
- self._id = id
- self._type = type
+ self._msg_type = msg_type
self._message_id = message_id
self._reply_id = reply_id
self._reply_body = reply_body
self._failure = failure
@property
- def id_(self):
- return self._id
-
- @property
- def type_(self):
- return self._type
+ def msg_type(self):
+ return self._msg_type
@property
def message_id(self):
@@ -52,11 +47,10 @@ class Response(object):
return self._failure
def to_dict(self):
- return {zmq_names.FIELD_ID: self._id,
- zmq_names.FIELD_TYPE: self._type,
+ return {zmq_names.FIELD_MSG_TYPE: self._msg_type,
zmq_names.FIELD_MSG_ID: self._message_id,
zmq_names.FIELD_REPLY_ID: self._reply_id,
- zmq_names.FIELD_REPLY: self._reply_body,
+ zmq_names.FIELD_REPLY_BODY: self._reply_body,
zmq_names.FIELD_FAILURE: self._failure}
def __str__(self):
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py
index f7cde0d..2fb8191 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py
@@ -44,8 +44,8 @@ class RequestSenderProxy(SenderBase):
socket.send(six.b(str(request.msg_type)), zmq.SNDMORE)
socket.send(six.b(request.routing_key), zmq.SNDMORE)
socket.send(six.b(request.message_id), zmq.SNDMORE)
- socket.send_pyobj(request.context, zmq.SNDMORE)
- socket.send_pyobj(request.message)
+ socket.send_dumped(request.context, zmq.SNDMORE)
+ socket.send_dumped(request.message)
LOG.debug("->[proxy:%(addr)s] Sending %(msg_type)s message "
"%(msg_id)s to target %(target)s",
@@ -60,20 +60,23 @@ class ReplySenderProxy(SenderBase):
def send(self, socket, reply):
LOG.debug("Replying to %s", reply.message_id)
- assert reply.type_ == zmq_names.REPLY_TYPE, "Reply expected!"
+ assert reply.msg_type == zmq_names.REPLY_TYPE, "Reply expected!"
socket.send(b'', zmq.SNDMORE)
- socket.send(six.b(str(reply.type_)), zmq.SNDMORE)
+ socket.send(six.b(str(reply.msg_type)), zmq.SNDMORE)
socket.send(reply.reply_id, zmq.SNDMORE)
socket.send(reply.message_id, zmq.SNDMORE)
- socket.send_pyobj(reply)
+ socket.send_dumped(reply.to_dict())
class RequestSenderDirect(SenderBase):
def send(self, socket, request):
socket.send(b'', zmq.SNDMORE)
- socket.send_pyobj(request)
+ socket.send(six.b(str(request.msg_type)), zmq.SNDMORE)
+ socket.send_string(request.message_id, zmq.SNDMORE)
+ socket.send_dumped(request.context, zmq.SNDMORE)
+ socket.send_dumped(request.message)
LOG.debug("Sending %(msg_type)s message %(msg_id)s to "
"target %(target)s",
@@ -87,8 +90,8 @@ class ReplySenderDirect(SenderBase):
def send(self, socket, reply):
LOG.debug("Replying to %s", reply.message_id)
- assert reply.type_ == zmq_names.REPLY_TYPE, "Reply expected!"
+ assert reply.msg_type == zmq_names.REPLY_TYPE, "Reply expected!"
socket.send(reply.reply_id, zmq.SNDMORE)
socket.send(b'', zmq.SNDMORE)
- socket.send_pyobj(reply)
+ socket.send_dumped(reply.to_dict())
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
index cfde3ad..3715b84 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
@@ -60,10 +60,12 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
reply_id = socket.recv()
message_type = int(socket.recv())
message_id = socket.recv()
- context = socket.recv_pyobj()
- message = socket.recv_pyobj()
- LOG.debug("[%(host)s] Received message %(id)s",
- {"host": self.host, "id": message_id})
+ context = socket.recv_loaded()
+ message = socket.recv_loaded()
+ LOG.debug("[%(host)s] Received %(msg_type)s message %(msg_id)s",
+ {"host": self.host,
+ "msg_type": zmq_names.message_type_str(message_type),
+ "msg_id": message_id})
if message_type == zmq_names.CALL_TYPE:
return zmq_incoming_message.ZmqIncomingMessage(
context, message, reply_id, message_id, socket, self.sender
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py
index 64cbcfd..99b65ed 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py
@@ -15,7 +15,7 @@
import logging
from oslo_messaging._drivers.zmq_driver.client import zmq_senders
-from oslo_messaging._drivers.zmq_driver.server.consumers\
+from oslo_messaging._drivers.zmq_driver.server.consumers \
import zmq_consumer_base
from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
from oslo_messaging._drivers.zmq_driver import zmq_async
@@ -38,29 +38,31 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
reply_id = socket.recv()
empty = socket.recv()
assert empty == b'', 'Bad format: empty delimiter expected'
- request = socket.recv_pyobj()
- return request, reply_id
+ msg_type = int(socket.recv())
+ message_id = socket.recv_string()
+ context = socket.recv_loaded()
+ message = socket.recv_loaded()
+ return reply_id, msg_type, message_id, context, message
def receive_message(self, socket):
try:
- request, reply_id = self._receive_request(socket)
- LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s",
+ reply_id, msg_type, message_id, context, message = \
+ self._receive_request(socket)
+ LOG.debug("[%(host)s] Received %(msg_type)s message %(msg_id)s",
{"host": self.host,
- "type": request.msg_type,
- "id": request.message_id,
- "target": request.target})
+ "msg_type": zmq_names.message_type_str(msg_type),
+ "msg_id": message_id})
- if request.msg_type == zmq_names.CALL_TYPE:
+ if msg_type == zmq_names.CALL_TYPE:
return zmq_incoming_message.ZmqIncomingMessage(
- request.context, request.message, reply_id,
- request.message_id, socket, self.sender
+ context, message, reply_id, message_id, socket, self.sender
)
- elif request.msg_type in zmq_names.NON_BLOCKING_TYPES:
- return zmq_incoming_message.ZmqIncomingMessage(request.context,
- request.message)
+ elif msg_type in zmq_names.NON_BLOCKING_TYPES:
+ return zmq_incoming_message.ZmqIncomingMessage(context,
+ message)
else:
LOG.error(_LE("Unknown message type: %s"),
- zmq_names.message_type_str(request.msg_type))
+ zmq_names.message_type_str(msg_type))
except (zmq.ZMQError, AssertionError) as e:
LOG.error(_LE("Receiving message failed: %s"), str(e))
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py
index a6e32aa..6fd13b7 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py
@@ -63,8 +63,8 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
def _receive_request(socket):
topic_filter = socket.recv()
message_id = socket.recv()
- context = socket.recv_pyobj()
- message = socket.recv_pyobj()
+ context = socket.recv_loaded()
+ message = socket.recv_loaded()
LOG.debug("Received %(topic_filter)s topic message %(id)s",
{'id': message_id, 'topic_filter': topic_filter})
return context, message
diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py
index 2c76227..0ebfef5 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py
@@ -50,7 +50,7 @@ class ZmqIncomingMessage(base.RpcIncomingMessage):
if self.sender is not None:
if failure is not None:
failure = rpc_common.serialize_remote_exception(failure)
- reply = zmq_response.Response(type=zmq_names.REPLY_TYPE,
+ reply = zmq_response.Response(msg_type=zmq_names.REPLY_TYPE,
message_id=self.message_id,
reply_id=self.reply_id,
reply_body=reply,
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_names.py b/oslo_messaging/_drivers/zmq_driver/zmq_names.py
index 8b63e0e..f61003c 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_names.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_names.py
@@ -17,15 +17,11 @@ from oslo_messaging._drivers.zmq_driver import zmq_async
zmq = zmq_async.import_zmq()
-FIELD_TYPE = 'type'
-FIELD_FAILURE = 'failure'
-FIELD_REPLY = 'reply'
-FIELD_ID = 'id'
-FIELD_MSG_ID = 'message_id'
FIELD_MSG_TYPE = 'msg_type'
+FIELD_MSG_ID = 'message_id'
FIELD_REPLY_ID = 'reply_id'
-FIELD_TARGET = 'target'
-FIELD_ROUTING_KEY = 'routing_key'
+FIELD_REPLY_BODY = 'reply_body'
+FIELD_FAILURE = 'failure'
IDX_REPLY_TYPE = 1
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
index a97343e..11c567a 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
@@ -23,6 +23,8 @@ from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE, _LI
from oslo_messaging import exceptions
+from oslo_serialization.serializer import json_serializer
+from oslo_serialization.serializer import msgpack_serializer
LOG = logging.getLogger(__name__)
@@ -31,6 +33,11 @@ zmq = zmq_async.import_zmq()
class ZmqSocket(object):
+ SERIALIZERS = {
+ 'json': json_serializer.JSONSerializer(),
+ 'msgpack': msgpack_serializer.MessagePackSerializer()
+ }
+
def __init__(self, conf, context, socket_type, high_watermark=0):
self.conf = conf
self.context = context
@@ -45,6 +52,14 @@ class ZmqSocket(object):
self.handle.identity = six.b(str(uuid.uuid4()))
self.connections = set()
+ def _get_serializer(self, serialization):
+ serializer = self.SERIALIZERS.get(serialization, None)
+ if serializer is None:
+ raise NotImplementedError(
+ "Serialization '{}' is not supported".format(serialization)
+ )
+ return serializer
+
def type_name(self):
return zmq_names.socket_type_str(self.socket_type)
@@ -77,6 +92,13 @@ class ZmqSocket(object):
def send_multipart(self, *args, **kwargs):
self.handle.send_multipart(*args, **kwargs)
+ def send_dumped(self, obj, *args, **kwargs):
+ serialization = kwargs.pop('serialization',
+ self.conf.rpc_zmq_serialization)
+ serializer = self._get_serializer(serialization)
+ s = serializer.dump_as_bytes(obj)
+ self.handle.send(s, *args, **kwargs)
+
def recv(self, *args, **kwargs):
return self.handle.recv(*args, **kwargs)
@@ -92,6 +114,14 @@ class ZmqSocket(object):
def recv_multipart(self, *args, **kwargs):
return self.handle.recv_multipart(*args, **kwargs)
+ def recv_loaded(self, *args, **kwargs):
+ serialization = kwargs.pop('serialization',
+ self.conf.rpc_zmq_serialization)
+ serializer = self._get_serializer(serialization)
+ s = self.handle.recv(*args, **kwargs)
+ obj = serializer.load_from_bytes(s)
+ return obj
+
def close(self, *args, **kwargs):
self.handle.close(*args, **kwargs)
@@ -106,10 +136,10 @@ class ZmqSocket(object):
"address": address})
self.connect(address)
except zmq.ZMQError as e:
- errmsg = _LE("Failed connecting %(stype) to %(address)s: %(e)s")\
- % (stype, address, e)
- LOG.error(_LE("Failed connecting %(stype) to %(address)s: %(e)s"),
- (stype, address, e))
+ errmsg = _LE("Failed connecting %(stype)s to %(address)s: %(e)s") \
+ % {"stype": stype, "address": address, "e": e}
+ LOG.error(_LE("Failed connecting %(stype)s to %(address)s: %(e)s"),
+ {"stype": stype, "address": address, "e": e})
raise rpc_common.RPCException(errmsg)
def connect_to_host(self, host):
diff --git a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
index 0287ccf..50e9d1b 100644
--- a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
+++ b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
@@ -12,9 +12,13 @@
# License for the specific language governing permissions and limitations
# under the License.
-import pickle
+import json
+import msgpack
import time
+import six
+import testscenarios
+
import oslo_messaging
from oslo_messaging._drivers.zmq_driver.client.publishers \
import zmq_pub_publisher
@@ -23,6 +27,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging.tests.drivers.zmq import zmq_common
+load_tests = testscenarios.load_tests_apply_scenarios
zmq = zmq_async.import_zmq()
@@ -31,10 +36,18 @@ class TestPubSub(zmq_common.ZmqBaseTestCase):
LISTENERS_COUNT = 3
+ scenarios = [
+ ('json', {'serialization': 'json',
+ 'dumps': lambda obj: six.b(json.dumps(obj))}),
+ ('msgpack', {'serialization': 'msgpack',
+ 'dumps': msgpack.dumps})
+ ]
+
def setUp(self):
super(TestPubSub, self).setUp()
- kwargs = {'use_pub_sub': True}
+ kwargs = {'use_pub_sub': True,
+ 'rpc_zmq_serialization': self.serialization}
self.config(**kwargs)
self.publisher = zmq_pub_publisher.PubPublisherProxy(
@@ -58,8 +71,8 @@ class TestPubSub(zmq_common.ZmqBaseTestCase):
zmq_address.target_to_subscribe_filter(target),
b"message",
b"0000-0000",
- pickle.dumps(context),
- pickle.dumps(message)])
+ self.dumps(context),
+ self.dumps(message)])
def _check_listener(self, listener):
listener._received.wait(timeout=5)