# path: root/nova/tests/fixtures/notifications.py
# blob: c46b3a919d147d9fefe07b716c67c06ea86a5bfe (plain)

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import collections
import functools
import threading

import eventlet
import fixtures
from oslo_log import log as logging
import oslo_messaging
from oslo_serialization import jsonutils
from oslo_utils import excutils
from oslo_utils import timeutils

from nova import rpc

# Module-level logger; FakeNotifier._notify uses it to report payloads that
# fail to serialize.
LOG = logging.getLogger(__name__)


class _Sub(object):
    """Allow a subscriber to efficiently wait for an event to occur, and
    retrieve events which have occurred.

    A condition variable guards the list of received notifications, so a
    waiter is woken up as soon as a new notification is recorded instead of
    having to poll for it.
    """

    def __init__(self):
        self._cond = threading.Condition()
        self._notifications = []

    def received(self, notification):
        """Record a notification and wake up everybody blocked in wait_n().

        :param notification: the notification to store; it is appended to
            the list as-is
        """
        with self._cond:
            self._notifications.append(notification)
            # notifyAll() is only a deprecated alias of notify_all() and
            # emits a DeprecationWarning since Python 3.10.
            self._cond.notify_all()

    def wait_n(self, n, event, timeout):
        """Wait until at least n notifications have been received, and return
        them.

        :param n: the minimum number of notifications to wait for
        :param event: the event type; only used to build the failure message
        :param timeout: the duration handed to timeutils.StopWatch that
            bounds the wait
        :returns: a copy of the list of every notification received so far,
            which may hold more than n items
        :raises AssertionError: if fewer than n notifications have been
            received when the timeout expires
        """

        with timeutils.StopWatch(timeout) as timer:
            with self._cond:
                while len(self._notifications) < n:
                    if timer.expired():
                        # Report what this subscription did receive, an
                        # always-empty list would hide partial deliveries.
                        # FIXME: transform this to get access to all the
                        # versioned notifications, not only to the ones of
                        # this event type.
                        notifications = list(self._notifications)
                        raise AssertionError(
                            "Notification %(event)s hasn't been "
                            "received. Received:\n%(notifications)s" % {
                                'event': event,
                                'notifications': notifications,
                            })
                    # Sleep until received() notifies us or the remaining
                    # time is used up, then re-check the loop condition.
                    self._cond.wait(timer.leftover())

                # Return a copy of the notifications list
                return list(self._notifications)


# Immutable record of one legacy notification, in the shape in which
# FakeNotifier._notify stores it.
FakeMessage = collections.namedtuple(
    'FakeMessage', 'publisher_id priority event_type payload context')


class FakeNotifier(object):
    """In-memory notifier that records what it is asked to emit.

    Each emitted notification is stored as a FakeMessage in
    ``self.notifications``. A notifier returned by prepare() shares that list
    with the notifier it was prepared from.
    """

    def __init__(
        self, transport, publisher_id, serializer=None, parent=None,
        test_case_id=None
    ):
        self.transport = transport
        self.publisher_id = publisher_id
        self.test_case_id = test_case_id

        if not serializer:
            serializer = oslo_messaging.serializer.NoOpSerializer()
        self._serializer = serializer

        # A prepared notifier appends to the very same list as its parent.
        self.notifications = parent.notifications if parent else []

        # Provide debug(), info(), warn(), error() and critical(); each of
        # them is _notify() with the upper-cased priority already bound.
        for level in ('debug', 'info', 'warn', 'error', 'critical'):
            emit = functools.partial(self._notify, level.upper())
            setattr(self, level, emit)

    def prepare(self, publisher_id=None):
        """Return a new notifier of the same class that shares our state.

        :param publisher_id: publisher id of the new notifier; our own
            publisher id is used when this is None
        """
        child_publisher_id = (
            self.publisher_id if publisher_id is None else publisher_id)

        return self.__class__(
            self.transport,
            child_publisher_id,
            serializer=self._serializer,
            parent=self,
            test_case_id=self.test_case_id,
        )

    def _notify(self, priority, ctxt, event_type, payload):
        """Serialize the payload and the context, then store the message."""
        try:
            payload = self._serializer.serialize_entity(ctxt, payload)
        except Exception:
            # Log the payload that could not be serialized, then let the
            # original exception propagate.
            with excutils.save_and_reraise_exception():
                LOG.error('Error serializing payload: %s', payload)

        # NOTE(sileht): simulate the kombu serializer, this raises an
        # exception if something has not been serialized correctly
        jsonutils.to_primitive(payload)
        # NOTE(melwitt): Try to serialize the context, as the rpc would.
        #                An exception will be raised if something is wrong
        #                with the context.
        self._serializer.serialize_context(ctxt)

        self.notifications.append(
            FakeMessage(
                publisher_id=self.publisher_id,
                priority=priority,
                event_type=event_type,
                payload=payload,
                context=ctxt,
            )
        )

    def is_enabled(self):
        """The fake notifier always reports itself as enabled."""
        return True

    def reset(self):
        """Drop every recorded notification, emptying the list in place."""
        self.notifications.clear()


class FakeVersionedNotifier(FakeNotifier):
    """FakeNotifier flavour that records versioned notifications.

    Notifications are stored as plain dicts in
    ``self.versioned_notifications`` and are also handed to a per event type
    _Sub in ``self.subscriptions`` so that tests can wait for them. A
    notifier created via prepare() shares both containers with the notifier
    it was prepared from.
    """

    def __init__(
        self, transport, publisher_id, serializer=None, parent=None,
        test_case_id=None
    ):
        super().__init__(
            transport, publisher_id, serializer, test_case_id=test_case_id)
        if parent:
            self.versioned_notifications = parent.versioned_notifications
            self.subscriptions = parent.subscriptions
        else:
            self.versioned_notifications = []
            # A _Sub is created on first access of an event type.
            self.subscriptions = collections.defaultdict(_Sub)

    @staticmethod
    def _get_sender_test_case_id():
        """Return the test case id of the greenlet emitting the notification.

        :returns: the first non-empty ``test_case_id`` found on the current
            greenlet or on one of its ancestors, or None if none of them
            carries one
        """
        current = eventlet.getcurrent()
        # NOTE(gibi) not all eventlet spawn is under our control, so there can
        # be senders without test_case_id set, find the first ancestor that
        # was spawned from nova.utils.spawn[_n] and therefore has the id set.
        while current is not None:
            test_case_id = getattr(current, 'test_case_id', None)
            if test_case_id:
                return test_case_id
            current = current.parent
        # The whole ancestor chain was walked without finding an id. Report
        # that instead of failing with an AttributeError on None.parent.
        return None

    def _notify(self, priority, ctxt, event_type, payload):
        """Record a versioned notification and wake up its subscribers.

        :raises RuntimeError: if the sender's test case id differs from the
            test case id this notifier was created with
        """
        sender_test_case_id = self._get_sender_test_case_id()
        # NOTE(gibi): this is here to prevent late notifications from already
        # finished test cases to break the currently running test case. See
        # more in https://bugs.launchpad.net/nova/+bug/1946339
        if sender_test_case_id != self.test_case_id:
            raise RuntimeError(
                'FakeVersionedNotifier received %s notification emitted by %s '
                'test case which is different from the currently running test '
                'case %s. This notification is ignored. The sender test case '
                'probably leaked a running eventlet that emitted '
                'notifications after the test case finished. Now this eventlet'
                ' is terminated by raising this exception.' %
                (event_type, sender_test_case_id, self.test_case_id))

        payload = self._serializer.serialize_entity(ctxt, payload)
        notification = {
            'publisher_id': self.publisher_id,
            'priority': priority,
            'event_type': event_type,
            'payload': payload,
        }
        self.versioned_notifications.append(notification)
        self.subscriptions[event_type].received(notification)

    def reset(self):
        """Forget the recorded versioned notifications and subscriptions.

        Both containers are emptied in place.
        """
        self.versioned_notifications.clear()
        self.subscriptions.clear()

    def wait_for_versioned_notifications(
        self, event_type, n_events=1, timeout=10.0,
    ):
        """Wait for at least n_events notifications of the given event type.

        :param event_type: the event type to wait for
        :param n_events: the minimum number of notifications to wait for
        :param timeout: the timeout passed on to _Sub.wait_n
        :returns: whatever _Sub.wait_n returns for the event type
        """
        return self.subscriptions[event_type].wait_n(
            n_events, event_type, timeout)


class NotificationFixture(fixtures.Fixture):
    """Replace nova's legacy and versioned notifiers with in-memory fakes.

    The fakes are stubbed into ``nova.rpc`` through the test case passed to
    the constructor, and are reset when the fixture is cleaned up.
    """

    def __init__(self, test):
        """
        :param test: the test case using the fixture; its stub_out() and id()
            methods are used during setUp()
        """
        self.test = test

    def setUp(self):
        super().setUp()
        self.addCleanup(self.reset)

        legacy = rpc.LEGACY_NOTIFIER
        versioned = rpc.NOTIFIER

        # The real notifiers may be unset, see the guard before stubbing
        # below, so do not dereference them unconditionally while building
        # the fakes.
        self.fake_notifier = FakeNotifier(
            legacy.transport if legacy else None,
            legacy.publisher_id if legacy else None,
            serializer=getattr(legacy, '_serializer', None))
        self.fake_versioned_notifier = FakeVersionedNotifier(
            versioned.transport if versioned else None,
            versioned.publisher_id if versioned else None,
            serializer=getattr(versioned, '_serializer', None),
            test_case_id=self.test.id()
        )
        # Only replace the notifiers if both of them are set.
        if legacy and versioned:
            self.test.stub_out('nova.rpc.LEGACY_NOTIFIER', self.fake_notifier)
            self.test.stub_out(
                'nova.rpc.NOTIFIER', self.fake_versioned_notifier)

    def reset(self):
        """Reset both the legacy and the versioned fake notifier."""
        self.fake_notifier.reset()
        self.fake_versioned_notifier.reset()

    def wait_for_versioned_notifications(
        self, event_type, n_events=1, timeout=10.0,
    ):
        """Delegate to the method of the same name on the versioned fake
        notifier and return its result.
        """
        return self.fake_versioned_notifier.wait_for_versioned_notifications(
            event_type, n_events, timeout,
        )

    @property
    def versioned_notifications(self):
        """The list of versioned notifications recorded by the fake."""
        return self.fake_versioned_notifier.versioned_notifications

    @property
    def notifications(self):
        """The list of legacy notifications recorded by the fake."""
        return self.fake_notifier.notifications