diff options
author | Zuul <zuul@review.opendev.org> | 2023-05-11 02:47:16 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2023-05-11 02:47:16 +0000 |
commit | 64c5f5cdaaf78f5cb16856827b762b441a997c62 (patch) | |
tree | 7460c169874f02c1b0f22f9492172393c3c4368f | |
parent | 272a315109862fa4058616e63d6f51b9ec16e822 (diff) | |
parent | be0dc09d52efd5e7236a33be552edb6644371cd0 (diff) | |
download | neutron-64c5f5cdaaf78f5cb16856827b762b441a997c62.tar.gz |
Merge "[S-RBAC] Fix new policies for get QoS rules APIs"
-rw-r--r-- | neutron/conf/policies/base.py | 14 | ||||
-rw-r--r-- | neutron/conf/policies/qos.py | 16 | ||||
-rw-r--r-- | neutron/tests/unit/conf/policies/test_qos.py | 593 |
3 files changed, 356 insertions, 267 deletions
diff --git a/neutron/conf/policies/base.py b/neutron/conf/policies/base.py index 14e419e7d1..395e4fd7f5 100644 --- a/neutron/conf/policies/base.py +++ b/neutron/conf/policies/base.py @@ -43,6 +43,20 @@ RULE_NET_OWNER = 'rule:network_owner' RULE_PARENT_OWNER = 'rule:ext_parent_owner' RULE_SG_OWNER = 'rule:sg_owner' +# In some cases we need to check owner of the parent resource, it's like that +# for example for QoS rules (check owner of QoS policy rule belongs to) or +# Floating IP port forwarding (check owner of FIP which PF is using). It's like +# that becasue those resources (QOS rules, FIP PFs) don't have project_id +# attribute at all and they belongs to the same project as parent resource (QoS +# policy, FIP). +PARENT_OWNER_MEMBER = 'role:member and ' + RULE_PARENT_OWNER +PARENT_OWNER_READER = 'role:reader and ' + RULE_PARENT_OWNER +ADMIN_OR_PARENT_OWNER_MEMBER = ( + '(' + ADMIN + ') or (' + PARENT_OWNER_MEMBER + ')') +ADMIN_OR_PARENT_OWNER_READER = ( + '(' + ADMIN + ') or (' + PARENT_OWNER_READER + ')') + + rules = [ policy.RuleDefault( 'context_is_admin', diff --git a/neutron/conf/policies/qos.py b/neutron/conf/policies/qos.py index c9381bab23..2fc9d0975c 100644 --- a/neutron/conf/policies/qos.py +++ b/neutron/conf/policies/qos.py @@ -126,7 +126,7 @@ rules = [ policy.DocumentedRuleDefault( name='get_policy_bandwidth_limit_rule', - check_str=base.ADMIN_OR_PROJECT_READER, + check_str=base.ADMIN_OR_PARENT_OWNER_READER, scope_types=['project'], description='Get a QoS bandwidth limit rule', operations=[ @@ -202,7 +202,7 @@ rules = [ policy.DocumentedRuleDefault( name='get_policy_packet_rate_limit_rule', - check_str=base.ADMIN_OR_PROJECT_READER, + check_str=base.ADMIN_OR_PARENT_OWNER_READER, scope_types=['project'], description='Get a QoS packet rate limit rule', operations=[ @@ -258,7 +258,7 @@ rules = [ policy.DocumentedRuleDefault( name='get_policy_dscp_marking_rule', - check_str=base.ADMIN_OR_PROJECT_READER, + check_str=base.ADMIN_OR_PARENT_OWNER_READER, scope_types=['project'], description='Get a QoS DSCP marking rule', operations=[ @@ -334,7 +334,7 @@ rules = [ policy.DocumentedRuleDefault( name='get_policy_minimum_bandwidth_rule', - check_str=base.ADMIN_OR_PROJECT_READER, + check_str=base.ADMIN_OR_PARENT_OWNER_READER, scope_types=['project'], description='Get a QoS minimum bandwidth rule', operations=[ @@ -409,7 +409,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='get_policy_minimum_packet_rate_rule', - check_str=base.ADMIN_OR_PROJECT_READER, + check_str=base.ADMIN_OR_PARENT_OWNER_READER, scope_types=['project'], description='Get a QoS minimum packet rate rule', operations=[ @@ -464,7 +464,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='get_alias_bandwidth_limit_rule', - check_str=base.ADMIN_OR_PROJECT_READER, + check_str=base.ADMIN_OR_PARENT_OWNER_READER, scope_types=['project'], description='Get a QoS bandwidth limit rule through alias', operations=[ @@ -515,7 +515,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='get_alias_dscp_marking_rule', - check_str=base.ADMIN_OR_PROJECT_READER, + check_str=base.ADMIN_OR_PARENT_OWNER_READER, scope_types=['project'], description='Get a QoS DSCP marking rule through alias', operations=[ @@ -566,7 +566,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='get_alias_minimum_bandwidth_rule', - check_str=base.ADMIN_OR_PROJECT_READER, + check_str=base.ADMIN_OR_PARENT_OWNER_READER, scope_types=['project'], description='Get a QoS minimum bandwidth rule through alias', operations=[ diff --git a/neutron/tests/unit/conf/policies/test_qos.py b/neutron/tests/unit/conf/policies/test_qos.py index 8b468484da..2b4d7aea03 100644 --- a/neutron/tests/unit/conf/policies/test_qos.py +++ b/neutron/tests/unit/conf/policies/test_qos.py @@ -230,18 +230,20 @@ class QosRulesAPITestCase(base.PolicyBaseTestCase): super(QosRulesAPITestCase, self).setUp() self.qos_policy = { 'id': uuidutils.generate_uuid(), + 'tenant_id': self.project_id, 'project_id': self.project_id} + self.alt_qos_policy = { + 'id': uuidutils.generate_uuid(), + 'tenant_id': self.alt_project_id, + 'project_id': self.alt_project_id} self.target = { - 'project_id': self.project_id, 'policy_id': self.qos_policy['id'], 'ext_parent_policy_id': self.qos_policy['id']} self.alt_target = { - 'project_id': self.alt_project_id, - 'policy_id': self.qos_policy['id'], - 'ext_parent_policy_id': self.qos_policy['id']} + 'policy_id': self.alt_qos_policy['id'], + 'ext_parent_policy_id': self.alt_qos_policy['id']} self.plugin_mock = mock.Mock() - self.plugin_mock.get_qos_policy.return_value = self.qos_policy mock.patch( 'neutron_lib.plugins.directory.get_plugin', return_value=self.plugin_mock).start() @@ -254,28 +256,33 @@ class SystemAdminQosBandwidthLimitRuleTests(QosRulesAPITestCase): self.context = self.system_admin_ctx def test_get_policy_bandwidth_limit_rule(self): - self.assertRaises( - base_policy.InvalidScope, - policy.enforce, - self.context, 'get_policy_bandwidth_limit_rule', - self.target) - self.assertRaises( - base_policy.InvalidScope, - policy.enforce, - self.context, 'get_policy_bandwidth_limit_rule', - self.alt_target) - - # And the same for aliases - self.assertRaises( - base_policy.InvalidScope, - policy.enforce, - self.context, 'get_alias_bandwidth_limit_rule', - self.target) - self.assertRaises( - base_policy.InvalidScope, - policy.enforce, - self.context, 'get_alias_bandwidth_limit_rule', - self.alt_target) + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.qos_policy): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, + self.context, 'get_policy_bandwidth_limit_rule', + self.target) + # And the same for aliases + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, + self.context, 'get_alias_bandwidth_limit_rule', + self.target) + + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.alt_qos_policy): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, + self.context, 'get_policy_bandwidth_limit_rule', + self.alt_target) + # And the same for aliases + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, + self.context, 'get_alias_bandwidth_limit_rule', + self.alt_target) def test_create_policy_bandwidth_limit_rule(self): self.assertRaises( @@ -361,24 +368,29 @@ class AdminQosBandwidthLimitRuleTests(QosRulesAPITestCase): self.context = self.project_admin_ctx def test_get_policy_bandwidth_limit_rule(self): - self.assertTrue( - policy.enforce(self.context, - 'get_policy_bandwidth_limit_rule', - self.target)) - self.assertTrue( - policy.enforce(self.context, - 'get_policy_bandwidth_limit_rule', - self.alt_target)) - - # And the same for aliases - self.assertTrue( - policy.enforce(self.context, - 'get_alias_bandwidth_limit_rule', - self.target)) - self.assertTrue( - policy.enforce(self.context, - 'get_alias_bandwidth_limit_rule', - self.alt_target)) + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.qos_policy): + self.assertTrue( + policy.enforce(self.context, + 'get_policy_bandwidth_limit_rule', + self.target)) + # And the same for aliases + self.assertTrue( + policy.enforce(self.context, + 'get_alias_bandwidth_limit_rule', + self.target)) + + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.alt_qos_policy): + self.assertTrue( + policy.enforce(self.context, + 'get_policy_bandwidth_limit_rule', + self.alt_target)) + # And the same for aliases + self.assertTrue( + policy.enforce(self.context, + 'get_alias_bandwidth_limit_rule', + self.alt_target)) def test_create_policy_bandwidth_limit_rule(self): self.assertTrue( @@ -439,26 +451,32 @@ class ProjectMemberQosBandwidthLimitRuleTests( self.context = self.project_member_ctx def test_get_policy_bandwidth_limit_rule(self): - self.assertTrue( - policy.enforce(self.context, - 'get_policy_bandwidth_limit_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_policy_bandwidth_limit_rule', - self.alt_target) - - # And the same for aliases - self.assertTrue( - policy.enforce(self.context, - 'get_alias_bandwidth_limit_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_alias_bandwidth_limit_rule', - self.alt_target) + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.qos_policy): + self.assertTrue( + policy.enforce(self.context, + 'get_policy_bandwidth_limit_rule', + self.target)) + # And the same for aliases + self.assertTrue( + policy.enforce(self.context, + 'get_alias_bandwidth_limit_rule', + self.target)) + + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.alt_qos_policy): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'get_policy_bandwidth_limit_rule', + self.alt_target) + + # And the same for aliases + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'get_alias_bandwidth_limit_rule', + self.alt_target) def test_create_policy_bandwidth_limit_rule(self): self.assertRaises( @@ -591,14 +609,19 @@ class AdminQosPacketRateLimitRuleTests(QosRulesAPITestCase): self.context = self.project_admin_ctx def test_get_policy_packet_rate_limit_rule(self): - self.assertTrue( - policy.enforce(self.context, - 'get_policy_packet_rate_limit_rule', - self.target)) - self.assertTrue( - policy.enforce(self.context, - 'get_policy_packet_rate_limit_rule', - self.alt_target)) + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.qos_policy): + self.assertTrue( + policy.enforce(self.context, + 'get_policy_packet_rate_limit_rule', + self.target)) + + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.alt_qos_policy): + self.assertTrue( + policy.enforce(self.context, + 'get_policy_packet_rate_limit_rule', + self.alt_target)) def test_create_policy_packet_rate_limit_rule(self): self.assertTrue( @@ -639,15 +662,20 @@ class ProjectMemberQosPacketRateLimitRuleTests( self.context = self.project_member_ctx def test_get_policy_packet_rate_limit_rule(self): - self.assertTrue( - policy.enforce(self.context, - 'get_policy_packet_rate_limit_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_policy_packet_rate_limit_rule', - self.alt_target) + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.qos_policy): + self.assertTrue( + policy.enforce(self.context, + 'get_policy_packet_rate_limit_rule', + self.target)) + + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.alt_qos_policy): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'get_policy_packet_rate_limit_rule', + self.alt_target) def test_create_policy_packet_rate_limit_rule(self): self.assertRaises( @@ -701,28 +729,35 @@ class SystemAdminQosDSCPMarkingRuleTests(QosRulesAPITestCase): self.context = self.system_admin_ctx def test_get_policy_dscp_marking_rule(self): - self.assertRaises( - base_policy.InvalidScope, - policy.enforce, - self.context, 'get_policy_dscp_marking_rule', - self.target) - self.assertRaises( - base_policy.InvalidScope, - policy.enforce, - self.context, 'get_policy_dscp_marking_rule', - self.alt_target) - - # And the same for aliases - self.assertRaises( - base_policy.InvalidScope, - policy.enforce, - self.context, 'get_alias_dscp_marking_rule', - self.target) - self.assertRaises( - base_policy.InvalidScope, - policy.enforce, - self.context, 'get_alias_dscp_marking_rule', - self.alt_target) + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.qos_policy): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, + self.context, 'get_policy_dscp_marking_rule', + self.target) + + # And the same for aliases + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, + self.context, 'get_alias_dscp_marking_rule', + self.target) + + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.alt_qos_policy): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, + self.context, 'get_policy_dscp_marking_rule', + self.alt_target) + + # And the same for aliases + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, + self.context, 'get_alias_dscp_marking_rule', + self.alt_target) def test_create_policy_dscp_marking_rule(self): self.assertRaises( @@ -806,24 +841,29 @@ class AdminQosDSCPMarkingRuleTests(QosRulesAPITestCase): self.context = self.project_admin_ctx def test_get_policy_dscp_marking_rule(self): - self.assertTrue( - policy.enforce(self.context, - 'get_policy_dscp_marking_rule', - self.target)) - self.assertTrue( - policy.enforce(self.context, - 'get_policy_dscp_marking_rule', - self.alt_target)) - - # And the same for aliases - self.assertTrue( - policy.enforce(self.context, - 'get_alias_dscp_marking_rule', - self.target)) - self.assertTrue( - policy.enforce(self.context, - 'get_alias_dscp_marking_rule', - self.alt_target)) + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.qos_policy): + self.assertTrue( + policy.enforce(self.context, + 'get_policy_dscp_marking_rule', + self.target)) + # And the same for aliases + self.assertTrue( + policy.enforce(self.context, + 'get_alias_dscp_marking_rule', + self.target)) + + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.alt_qos_policy): + self.assertTrue( + policy.enforce(self.context, + 'get_policy_dscp_marking_rule', + self.alt_target)) + # And the same for aliases + self.assertTrue( + policy.enforce(self.context, + 'get_alias_dscp_marking_rule', + self.alt_target)) def test_create_policy_dscp_marking_rule(self): self.assertTrue( @@ -884,26 +924,31 @@ class ProjectMemberQosDSCPMarkingRuleTests( self.context = self.project_member_ctx def test_get_policy_dscp_marking_rule(self): - self.assertTrue( - policy.enforce(self.context, - 'get_policy_dscp_marking_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_policy_dscp_marking_rule', - self.alt_target) - - # And the same for aliases - self.assertTrue( - policy.enforce(self.context, - 'get_alias_dscp_marking_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_alias_dscp_marking_rule', - self.alt_target) + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.qos_policy): + self.assertTrue( + policy.enforce(self.context, + 'get_policy_dscp_marking_rule', + self.target)) + # And the same for aliases + self.assertTrue( + policy.enforce(self.context, + 'get_alias_dscp_marking_rule', + self.target)) + + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.alt_qos_policy): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'get_policy_dscp_marking_rule', + self.alt_target) + # And the same for aliases + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'get_alias_dscp_marking_rule', + self.alt_target) def test_create_policy_dscp_marking_rule(self): self.assertRaises( @@ -981,28 +1026,33 @@ class SystemAdminQosMinimumBandwidthRuleTests(QosRulesAPITestCase): self.context = self.system_admin_ctx def test_get_policy_minimum_bandwidth_rule(self): - self.assertRaises( - base_policy.InvalidScope, - policy.enforce, - self.context, 'get_policy_minimum_bandwidth_rule', - self.target) - self.assertRaises( - base_policy.InvalidScope, - policy.enforce, - self.context, 'get_policy_minimum_bandwidth_rule', - self.alt_target) + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.qos_policy): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, + self.context, 'get_policy_minimum_bandwidth_rule', + self.target) + # And the same for aliases + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, + self.context, 'get_alias_minimum_bandwidth_rule', + self.target) - # And the same for aliases - self.assertRaises( - base_policy.InvalidScope, - policy.enforce, - self.context, 'get_alias_minimum_bandwidth_rule', - self.target) - self.assertRaises( - base_policy.InvalidScope, - policy.enforce, - self.context, 'get_alias_minimum_bandwidth_rule', - self.alt_target) + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.alt_qos_policy): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, + self.context, 'get_policy_minimum_bandwidth_rule', + self.alt_target) + # And the same for aliases + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, + self.context, 'get_alias_minimum_bandwidth_rule', + self.alt_target) def test_create_policy_minimum_bandwidth_rule(self): self.assertRaises( @@ -1088,24 +1138,29 @@ class AdminQosMinimumBandwidthRuleTests(QosRulesAPITestCase): self.context = self.project_admin_ctx def test_get_policy_minimum_bandwidth_rule(self): - self.assertTrue( - policy.enforce( - self.context, 'get_policy_minimum_bandwidth_rule', - self.target)) - self.assertTrue( - policy.enforce( - self.context, 'get_policy_minimum_bandwidth_rule', - self.alt_target)) - - # And the same for aliases - self.assertTrue( - policy.enforce( - self.context, 'get_alias_minimum_bandwidth_rule', - self.target)) - self.assertTrue( - policy.enforce( - self.context, 'get_alias_minimum_bandwidth_rule', - self.alt_target)) + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.qos_policy): + self.assertTrue( + policy.enforce( + self.context, 'get_policy_minimum_bandwidth_rule', + self.target)) + # And the same for aliases + self.assertTrue( + policy.enforce( + self.context, 'get_alias_minimum_bandwidth_rule', + self.target)) + + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.alt_qos_policy): + self.assertTrue( + policy.enforce( + self.context, 'get_policy_minimum_bandwidth_rule', + self.alt_target)) + # And the same for aliases + self.assertTrue( + policy.enforce( + self.context, 'get_alias_minimum_bandwidth_rule', + self.alt_target)) def test_create_policy_minimum_bandwidth_rule(self): self.assertTrue( @@ -1166,26 +1221,31 @@ class ProjectMemberQosMinimumBandwidthRuleTests( self.context = self.project_member_ctx def test_get_policy_minimum_bandwidth_rule(self): - self.assertTrue( - policy.enforce( + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.qos_policy): + self.assertTrue( + policy.enforce( + self.context, 'get_policy_minimum_bandwidth_rule', + self.target)) + # And the same for aliases + self.assertTrue( + policy.enforce( + self.context, 'get_alias_minimum_bandwidth_rule', + self.target)) + + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.alt_qos_policy): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'get_policy_minimum_bandwidth_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_policy_minimum_bandwidth_rule', - self.alt_target) - - # And the same for aliases - self.assertTrue( - policy.enforce( + self.alt_target) + # And the same for aliases + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'get_alias_minimum_bandwidth_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_alias_minimum_bandwidth_rule', - self.alt_target) + self.alt_target) def test_create_policy_minimum_bandwidth_rule(self): self.assertRaises( @@ -1263,28 +1323,33 @@ class SystemAdminQosMinimumPacketRateRuleTests(QosRulesAPITestCase): self.context = self.system_admin_ctx def test_get_policy_minimum_packet_rate_rule(self): - self.assertRaises( - base_policy.InvalidScope, - policy.enforce, - self.context, 'get_policy_minimum_packet_rate_rule', - self.target) - self.assertRaises( - base_policy.InvalidScope, - policy.enforce, - self.context, 'get_policy_minimum_packet_rate_rule', - self.alt_target) - - # And the same for aliases - self.assertRaises( - base_policy.InvalidScope, - policy.enforce, - self.context, 'get_alias_minimum_packet_rate_rule', - self.target) - self.assertRaises( - base_policy.InvalidScope, - policy.enforce, - self.context, 'get_alias_minimum_packet_rate_rule', - self.alt_target) + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.qos_policy): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, + self.context, 'get_policy_minimum_packet_rate_rule', + self.target) + # And the same for aliases + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, + self.context, 'get_alias_minimum_packet_rate_rule', + self.target) + + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.alt_qos_policy): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, + self.context, 'get_policy_minimum_packet_rate_rule', + self.alt_target) + # And the same for aliases + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, + self.context, 'get_alias_minimum_packet_rate_rule', + self.alt_target) def test_create_policy_minimum_packet_rate_rule(self): self.assertRaises( @@ -1370,24 +1435,29 @@ class AdminQosMinimumPacketRateRuleTests(QosRulesAPITestCase): self.context = self.project_admin_ctx def test_get_policy_minimum_packet_rate_rule(self): - self.assertTrue( - policy.enforce(self.context, - 'get_policy_minimum_packet_rate_rule', - self.target)) - self.assertTrue( - policy.enforce(self.context, - 'get_policy_minimum_packet_rate_rule', - self.alt_target)) - - # And the same for aliases - self.assertTrue( - policy.enforce(self.context, - 'get_alias_minimum_packet_rate_rule', - self.target)) - self.assertTrue( - policy.enforce(self.context, - 'get_alias_minimum_packet_rate_rule', - self.alt_target)) + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.qos_policy): + self.assertTrue( + policy.enforce(self.context, + 'get_policy_minimum_packet_rate_rule', + self.target)) + # And the same for aliases + self.assertTrue( + policy.enforce(self.context, + 'get_alias_minimum_packet_rate_rule', + self.target)) + + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.alt_qos_policy): + self.assertTrue( + policy.enforce(self.context, + 'get_policy_minimum_packet_rate_rule', + self.alt_target)) + # And the same for aliases + self.assertTrue( + policy.enforce(self.context, + 'get_alias_minimum_packet_rate_rule', + self.alt_target)) def test_create_policy_minimum_packet_rate_rule(self): self.assertTrue( @@ -1438,26 +1508,31 @@ class ProjectMemberQosMinimumPacketRateRuleTests( self.context = self.project_member_ctx def test_get_policy_minimum_packet_rate_rule(self): - self.assertTrue( - policy.enforce(self.context, - 'get_policy_minimum_packet_rate_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_policy_minimum_packet_rate_rule', - self.alt_target) - - # And the same for aliases - self.assertTrue( - policy.enforce(self.context, - 'get_alias_minimum_packet_rate_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_alias_minimum_packet_rate_rule', - self.alt_target) + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.qos_policy): + self.assertTrue( + policy.enforce(self.context, + 'get_policy_minimum_packet_rate_rule', + self.target)) + # And the same for aliases + self.assertTrue( + policy.enforce(self.context, + 'get_alias_minimum_packet_rate_rule', + self.target)) + + with mock.patch.object(self.plugin_mock, "get_policy", + return_value=self.alt_qos_policy): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'get_policy_minimum_packet_rate_rule', + self.alt_target) + # And the same for aliases + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'get_alias_minimum_packet_rate_rule', + self.alt_target) def test_create_policy_minimum_packet_rate_rule(self): self.assertRaises( |