summaryrefslogtreecommitdiff
path: root/heat/common/messaging.py
blob: fefbadcddd64c2789843abbb33c8c80431a004f6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# -*- coding: utf-8 -*-
# Copyright 2013 eNovance <licensing@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import eventlet
from oslo_config import cfg
import oslo_messaging
from oslo_messaging.rpc import dispatcher
from oslo_serialization import jsonutils
from osprofiler import profiler

from heat.common import context


TRANSPORT = None
NOTIFICATIONS_TRANSPORT = None
NOTIFIER = None


class RequestContextSerializer(oslo_messaging.Serializer):
    def __init__(self, base):
        self._base = base

    def serialize_entity(self, ctxt, entity):
        if not self._base:
            return entity
        return self._base.serialize_entity(ctxt, entity)

    def deserialize_entity(self, ctxt, entity):
        if not self._base:
            return entity
        return self._base.deserialize_entity(ctxt, entity)

    @staticmethod
    def serialize_context(ctxt):
        _context = ctxt.to_dict()
        prof = profiler.get()
        if prof:
            trace_info = {
                "hmac_key": prof.hmac_key,
                "base_id": prof.get_base_id(),
                "parent_id": prof.get_id()
            }
            _context.update({"trace_info": trace_info})
        return _context

    @staticmethod
    def deserialize_context(ctxt):
        trace_info = ctxt.pop("trace_info", None)
        if trace_info:
            profiler.init(**trace_info)
        return context.RequestContext.from_dict(ctxt)


class JsonPayloadSerializer(oslo_messaging.NoOpSerializer):
    @classmethod
    def serialize_entity(cls, context, entity):
        return jsonutils.to_primitive(entity, convert_instances=True)


def get_specific_transport(url, optional, exmods, is_for_notifications=False):
    try:
        if is_for_notifications:
            return oslo_messaging.get_notification_transport(
                cfg.CONF, url, allowed_remote_exmods=exmods)
        else:
            return oslo_messaging.get_rpc_transport(
                cfg.CONF, url, allowed_remote_exmods=exmods)
    except oslo_messaging.InvalidTransportURL as e:
        if not optional or e.url:
            # NOTE(sileht): oslo_messaging is configured but unloadable
            # so reraise the exception
            raise
        else:
            return None


def setup_transports(url, optional):
    global TRANSPORT, NOTIFICATIONS_TRANSPORT
    oslo_messaging.set_transport_defaults('heat')
    exmods = ['heat.common.exception']
    TRANSPORT = get_specific_transport(url, optional, exmods)
    NOTIFICATIONS_TRANSPORT = get_specific_transport(url, optional, exmods,
                                                     is_for_notifications=True)


def setup(url=None, optional=False):
    """Initialise the oslo_messaging layer."""
    global NOTIFIER

    if url and url.startswith("fake://"):
        # NOTE(sileht): oslo_messaging fake driver uses time.sleep
        # for task switch, so we need to monkey_patch it
        eventlet.monkey_patch(time=True)
    if not TRANSPORT or not NOTIFICATIONS_TRANSPORT:
        setup_transports(url, optional)
        # In the fake driver, make the dict of exchanges local to each exchange
        # manager, instead of using the shared class attribute. Doing otherwise
        # breaks the unit tests.
        if url and url.startswith("fake://"):
            TRANSPORT._driver._exchange_manager._exchanges = {}

    if not NOTIFIER and NOTIFICATIONS_TRANSPORT:
        serializer = RequestContextSerializer(JsonPayloadSerializer())
        NOTIFIER = oslo_messaging.Notifier(NOTIFICATIONS_TRANSPORT,
                                           serializer=serializer)


def cleanup():
    """Cleanup the oslo_messaging layer."""
    global TRANSPORT, NOTIFICATIONS_TRANSPORT, NOTIFIER
    if TRANSPORT:
        TRANSPORT.cleanup()
        NOTIFICATIONS_TRANSPORT.cleanup()
        TRANSPORT = NOTIFICATIONS_TRANSPORT = NOTIFIER = None


def get_rpc_server(target, endpoint):
    """Return a configured oslo_messaging rpc server."""
    serializer = RequestContextSerializer(JsonPayloadSerializer())
    access_policy = dispatcher.DefaultRPCAccessPolicy
    return oslo_messaging.get_rpc_server(TRANSPORT, target, [endpoint],
                                         executor='eventlet',
                                         serializer=serializer,
                                         access_policy=access_policy)


def get_rpc_client(**kwargs):
    """Return a configured oslo_messaging RPCClient."""
    target = oslo_messaging.Target(**kwargs)
    serializer = RequestContextSerializer(JsonPayloadSerializer())
    return oslo_messaging.RPCClient(TRANSPORT, target,
                                    serializer=serializer)


def get_notifier(publisher_id):
    """Return a configured oslo_messaging notifier."""
    return NOTIFIER.prepare(publisher_id=publisher_id)