summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-02-21 00:59:29 +0000
committerGerrit Code Review <review@openstack.org>2013-02-21 00:59:29 +0000
commit8b785580af6b1ccc3f2fdd5886df32286073db04 (patch)
tree1bd4674b33cf006167440c503082e1ffbe727b67
parentb3cae87153885f4b30cd2a27accfe1be8355bf06 (diff)
parent6f9cef85353155538fe14e5025e2d0bab5cc63e4 (diff)
downloadoslo-incubator-grizzly-3.tar.gz
Merge "Revert "Implement replay detection.""grizzly-3
-rw-r--r--openstack/common/rpc/common.py81
-rw-r--r--tests/unit/rpc/amqp.py8
-rw-r--r--tests/unit/rpc/test_common.py32
3 files changed, 22 insertions, 99 deletions
diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py
index 14489c84..6c52bd8e 100644
--- a/openstack/common/rpc/common.py
+++ b/openstack/common/rpc/common.py
@@ -17,11 +17,9 @@
# License for the specific language governing permissions and limitations
# under the License.
-import collections
import copy
import sys
import traceback
-import uuid
from oslo.config import cfg
@@ -48,40 +46,33 @@ This version number applies to the message envelope that is used in the
serialization done inside the rpc layer. See serialize_msg() and
deserialize_msg().
-The current message format (version 2.1) is very simple. It is:
+The current message format (version 2.0) is very simple. It is:
{
'oslo.version': <RPC Envelope Version as a String>,
'oslo.message': <Application Message Payload, JSON encoded>
- 'oslo.nonce': <Unique message identifier>
}
Message format version '1.0' is just considered to be the messages we sent
without a message envelope.
-Message format version '2.0' sent oslo.message containing a JSON encoded
-Application Message Payload without Hashed Parameters.
+So, the current message envelope just includes the envelope version. It may
+eventually contain additional information, such as a signature for the message
+payload.
-The message format is intended eventually contain additional information,
-such as a signature for the message payload.
-
-We will JSON encode the application message payload. The message,
+We will JSON encode the application message payload. The message envelope,
which includes the JSON encoded application message body, will be passed down
to the messaging libraries as a dict.
'''
-_RPC_ENVELOPE_VERSION = '2.1'
+_RPC_ENVELOPE_VERSION = '2.0'
_VERSION_KEY = 'oslo.version'
_MESSAGE_KEY = 'oslo.message'
-_NONCE_KEY = 'oslo.nonce'
# TODO(russellb) Turn this on after Grizzly.
_SEND_RPC_ENVELOPE = False
-DUP_MSG_CHECK_SIZE = 512 # Arbitrary - make configurable.
-SEEN_MSGS = collections.deque([], maxlen=DUP_MSG_CHECK_SIZE)
-
class RPCException(Exception):
message = _("An unknown RPC related exception occurred.")
@@ -134,10 +125,6 @@ class Timeout(RPCException):
message = _("Timeout while waiting on RPC response.")
-class DuplicatedMessageError(RPCException):
- message = _("Received replayed message(%(msg_id)s). Ignoring.")
-
-
class InvalidRPCConnectionReuse(RPCException):
message = _("Invalid reuse of an RPC connection.")
@@ -152,10 +139,6 @@ class UnsupportedRpcEnvelopeVersion(RPCException):
"not supported by this endpoint.")
-class InvalidRpcEnvelope(RPCException):
- message = _("RPC envelope was malformed.")
-
-
class Connection(object):
"""A connection, returned by rpc.create_connection().
@@ -455,32 +438,17 @@ def version_is_compatible(imp_version, version):
def serialize_msg(raw_msg, force_envelope=False):
- msg_identifier = uuid.uuid4().hex
-
if not _SEND_RPC_ENVELOPE and not force_envelope:
- if isinstance(raw_msg, dict):
- raw_msg['_nonce'] = msg_identifier
return raw_msg
- """Make an RPC message envelope"""
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
# information about this format.
msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
- _MESSAGE_KEY: jsonutils.dumps(raw_msg),
- _NONCE_KEY: msg_identifier}
+ _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
return msg
-def _raise_if_duplicate(duplicate_key):
- """Check if a message is a duplicate based on key."""
- if not duplicate_key:
- return
- if duplicate_key in SEEN_MSGS:
- raise DuplicatedMessageError(duplicate_key)
- SEEN_MSGS.append(duplicate_key)
-
-
def deserialize_msg(msg):
# NOTE(russellb): Hang on to your hats, this road is about to
# get a little bumpy.
@@ -505,32 +473,21 @@ def deserialize_msg(msg):
# This case covers return values from rpc.call() from before message
# envelopes were used. (messages to call a method were always a dict)
- has_envelope = True
- base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
if not isinstance(msg, dict):
# See #2 above.
return msg
- elif not all(map(lambda key: key in msg, base_envelope_keys)):
- # See #1.b above.
- has_envelope = False
- elif not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
+
+ base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
+ if not all(map(lambda key: key in msg, base_envelope_keys)):
+ # See #1.b above.
+ return msg
+
+ # At this point we think we have the message envelope
+ # format we were expecting. (#1.a above)
+
+ if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
- nonce = None
- raw_msg = None
-
- if has_envelope and '_NONCE_KEY' in msg: # envelope v2.1
- _raise_if_duplicate(msg[_NONCE_KEY])
-
- # Here, we can delay jsonutils.loads until
- # after we have verified the message.
- raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
- elif has_envelope: # envelope v2.0
- raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
- nonce = raw_msg.get('_nonce')
- _raise_if_duplicate(nonce)
- else: # no envelope ("v1.0")
- raw_msg = msg
- nonce = raw_msg.get('_nonce')
- _raise_if_duplicate(nonce)
+
+ raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
return raw_msg
diff --git a/tests/unit/rpc/amqp.py b/tests/unit/rpc/amqp.py
index c7215ead..1e4733c0 100644
--- a/tests/unit/rpc/amqp.py
+++ b/tests/unit/rpc/amqp.py
@@ -100,18 +100,14 @@ class BaseRpcAMQPTestCase(common.BaseRpcTestCase):
}
self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg,
envelope=True)
- for k, v in msg.items():
- self.assertIn(k, self.test_msg)
- self.assertEqual(self.test_msg[k], v)
+ self.assertEqual(self.test_msg, msg)
# Make sure envelopes are still on notifications, even if turned off
# for general messages.
self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', False)
self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg,
envelope=True)
- for k, v in msg.items():
- self.assertIn(k, self.test_msg)
- self.assertEqual(self.test_msg[k], v)
+ self.assertEqual(self.test_msg, msg)
def test_single_reply_queue_on_has_ids(
self, single_reply_queue_for_callee_off=False):
diff --git a/tests/unit/rpc/test_common.py b/tests/unit/rpc/test_common.py
index b4d4cf47..976da84c 100644
--- a/tests/unit/rpc/test_common.py
+++ b/tests/unit/rpc/test_common.py
@@ -250,40 +250,10 @@ class RpcCommonTestCase(test_utils.BaseTestCase):
'oslo.message': jsonutils.dumps(msg)}
serialized = rpc_common.serialize_msg(msg)
- for k, v in s_msg.items():
- self.assertIn(k, serialized)
- self.assertEqual(serialized[k], v)
+ self.assertEqual(s_msg, rpc_common.serialize_msg(msg))
self.assertEqual(msg, rpc_common.deserialize_msg(serialized))
- def test_serialize_msg_v2_1(self):
- self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', True)
- msg = {'foo': 'bar'}
- s_msg = {'oslo.version': rpc_common._RPC_ENVELOPE_VERSION,
- 'oslo.message': jsonutils.dumps(msg),
- 'oslo.nonce': ''}
- serialized = rpc_common.serialize_msg(msg)
-
- for k, v in s_msg.items():
- self.assertIn(k, serialized)
-
- if k == 'oslo.nonce':
- # This key's value is set by serialize_msg
- re_uuid = re.compile(r'[0-9a-f]{32}$', re.I)
- self.assertTrue(re_uuid.match(serialized[k]))
- continue
-
- self.assertEqual(serialized[k], v)
-
- self.assertEqual(msg, rpc_common.deserialize_msg(serialized))
-
- def test_serialize_msg_v2_1(self):
- self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', True)
- msg = {'foo': 'bar'}
- serialized = rpc_common.serialize_msg(msg)
- self.assertIn('oslo.nonce', serialized)
- self.assertEqual(msg, rpc_common.deserialize_msg(serialized))
-
def test_deserialize_msg_no_envelope(self):
self.assertEqual(1, rpc_common.deserialize_msg(1))
self.assertEqual([], rpc_common.deserialize_msg([]))