summaryrefslogtreecommitdiff
path: root/neutron/tests/unit/extensions/test_data_plane_status.py
blob: a6c99f1b7796f26ed996e4454331cab095feb841 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# Copyright (c) 2017 NEC Corporation.  All rights reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from webob import exc as web_exc

from neutron_lib.api.definitions import data_plane_status as dps_lib
from neutron_lib.api.definitions import port as port_def
from neutron_lib import constants
from neutron_lib.db import api as db_api
from neutron_lib.db import resource_extend
from neutron_lib.tests.unit import fake_notifier

from neutron.db import data_plane_status_db as dps_db
from neutron.db import db_base_plugin_v2
from neutron.extensions import data_plane_status as dps_ext
from neutron.tests.unit.db import test_db_base_plugin_v2


class DataPlaneStatusTestExtensionManager(object):
    """Extension manager stub used by the data plane status tests.

    It contributes no resources, actions or request extensions of its
    own; the only thing it exposes is the set of extended resources
    reported by the data plane status extension descriptor.
    """

    def get_resources(self):
        """Return the extra API resources; always a new empty list."""
        return list()

    def get_actions(self):
        """Return the extra API actions; always a new empty list."""
        return list()

    def get_request_extensions(self):
        """Return the request extensions; always a new empty list."""
        return list()

    def get_extended_resources(self, version):
        """Return what ``Data_plane_status`` reports for ``version``.

        :param version: passed through unchanged to
            ``dps_ext.Data_plane_status.get_extended_resources``.
        :returns: whatever that call returns.
        """
        extension = dps_ext.Data_plane_status
        return extension.get_extended_resources(version)


@resource_extend.has_resource_extenders
class DataPlaneStatusExtensionTestPlugin(db_base_plugin_v2.NeutronDbPluginV2,
                                         dps_db.DataPlaneStatusMixin):
    """Test plugin: the base DB plugin plus ``DataPlaneStatusMixin``.

    It advertises only the data plane status extension alias.
    """

    supported_extension_aliases = [dps_lib.ALIAS]

    @staticmethod
    @resource_extend.extends([port_def.COLLECTION_NAME])
    def _extend_port_data_plane_status(port_res, port_db):
        """Delegate port extension to ``DataPlaneStatusMixin``.

        Registered through ``resource_extend.extends`` for the port
        collection.

        :param port_res: forwarded unchanged to the mixin implementation.
        :param port_db: forwarded unchanged to the mixin implementation.
        :returns: the mixin implementation's return value.
        """
        mixin_extend = dps_db.DataPlaneStatusMixin._extend_port_data_plane_status
        return mixin_extend(port_res, port_db)

    def update_port(self, context, id, port):
        """Update a port, then handle ``data_plane_status`` if supplied.

        Both steps run inside a single ``db_api.CONTEXT_WRITER`` block.

        :param context: request context, also used to open the writer.
        :param id: identifier of the port to update.
        :param port: request body; its ``'port'`` entry holds the
            attributes being changed.
        :returns: the port returned by the parent ``update_port``.
        """
        with db_api.CONTEXT_WRITER.using(context):
            parent = super(DataPlaneStatusExtensionTestPlugin, self)
            updated_port = parent.update_port(context, id, port)
            requested = port['port']
            if dps_lib.DATA_PLANE_STATUS in requested:
                self._process_update_port_data_plane_status(
                    context, requested, updated_port)
        return updated_port


class DataPlaneStatusExtensionTestCase(
        test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
    """API-level tests for the ``data_plane_status`` port attribute.

    Every test runs against ``DataPlaneStatusExtensionTestPlugin`` with
    the extension loaded via ``DataPlaneStatusTestExtensionManager``.
    """

    def setUp(self):
        """Start the base test case with the test plugin and manager."""
        plugin = ('neutron.tests.unit.extensions.test_data_plane_status.'
                  'DataPlaneStatusExtensionTestPlugin')
        ext_mgr = DataPlaneStatusTestExtensionManager()
        super(DataPlaneStatusExtensionTestCase, self).setUp(
            plugin=plugin, ext_mgr=ext_mgr)

    def test_update_port_data_plane_status(self):
        """Updating data_plane_status returns 200 and the new value."""
        with self.port() as port:
            data = {'port': {dps_lib.DATA_PLANE_STATUS: constants.ACTIVE}}
            req = self.new_update_request(port_def.COLLECTION_NAME,
                                          data,
                                          port['port']['id'])
            res = req.get_response(self.api)
            # Assert the status before touching the body, so an error
            # response fails here rather than as a KeyError on ['port'].
            self.assertEqual(web_exc.HTTPOk.code, res.status_code)
            p = self.deserialize(self.fmt, res)['port']
            self.assertEqual(constants.ACTIVE, p[dps_lib.DATA_PLANE_STATUS])

    def test_port_create_data_plane_status_default_none(self):
        """A freshly created port shows data_plane_status as None."""
        with self.port(name='port1') as port:
            req = self.new_show_request(
                port_def.COLLECTION_NAME, port['port']['id'])
            res = self.deserialize(self.fmt, req.get_response(self.api))
            self.assertIsNone(res['port'][dps_lib.DATA_PLANE_STATUS])

    def test_port_create_invalid_attr_data_plane_status(self):
        """Supplying data_plane_status at port creation yields a 400."""
        kwargs = {dps_lib.DATA_PLANE_STATUS: constants.ACTIVE}
        with self.network() as network:
            with self.subnet(network=network):
                res = self._create_port(self.fmt, network['network']['id'],
                                        arg_list=(dps_lib.DATA_PLANE_STATUS,),
                                        **kwargs)
                self.assertEqual(web_exc.HTTPBadRequest.code,
                                 res.status_code)

    def test_port_update_preserves_data_plane_status(self):
        """An unrelated update leaves a previously set status in place."""
        with self.port(name='port1') as port:
            # First update only sets the status; its response is not needed.
            self._update(port_def.COLLECTION_NAME, port['port']['id'],
                         {'port': {dps_lib.DATA_PLANE_STATUS:
                                   constants.ACTIVE}})
            res = self._update(port_def.COLLECTION_NAME, port['port']['id'],
                               {'port': {'name': 'port2'}})
            self.assertEqual('port2', res['port']['name'])
            self.assertEqual(constants.ACTIVE,
                             res['port'][dps_lib.DATA_PLANE_STATUS])

    def test_port_update_with_invalid_data_plane_status(self):
        """An unrecognised data_plane_status value yields a 400."""
        with self.port(name='port1') as port:
            self._update(port_def.COLLECTION_NAME, port['port']['id'],
                         {'port': {dps_lib.DATA_PLANE_STATUS: "abc"}},
                         web_exc.HTTPBadRequest.code)

    def test_port_update_event_on_data_plane_status(self):
        """Updating data_plane_status emits port.update.start and .end."""
        expect_notify = {'port.update.start', 'port.update.end'}
        # Register the reset up front so recorded notifications are
        # cleared even when the assertion below fails.
        self.addCleanup(fake_notifier.reset)
        with self.port(name='port1') as port:
            self._update(port_def.COLLECTION_NAME, port['port']['id'],
                         {'port': {dps_lib.DATA_PLANE_STATUS:
                                   constants.ACTIVE}})
            notify = {n['event_type'] for n in fake_notifier.NOTIFICATIONS}
            # Other event types may be recorded too; only require that
            # both expected ones are present. A failure lists the
            # missing events.
            self.assertEqual(set(), expect_notify - notify)