summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMohankumar <nmohankumar1011@gmail.com>2016-12-12 17:16:10 +0530
committerMohankumar <nmohankumar1011@gmail.com>2017-07-13 11:03:50 +0000
commite1c98e1b7ad35c1846f0af18d3e9c72ab11eab3e (patch)
treefd7f0b29558f2d4626b756c7ecf40eefd2bbda3f
parent82d10e2499d2195ea54826c9f5ae9b576532a363 (diff)
downloadpython-neutronclient-e1c98e1b7ad35c1846f0af18d3e9c72ab11eab3e.tar.gz
Add SFC cli for OSC plugin
Implements: blueprint openstackclient-cli-porting Change-Id: Ifeb62bad26ffeb0bb8b548c56d2d6a620a922f78
-rw-r--r--doc/source/cli/osc/v2/networking-sfc.rst36
-rw-r--r--neutronclient/osc/v2/sfc/__init__.py0
-rwxr-xr-xneutronclient/osc/v2/sfc/sfc_flow_classifier.py320
-rwxr-xr-xneutronclient/osc/v2/sfc/sfc_port_chain.py347
-rwxr-xr-xneutronclient/osc/v2/sfc/sfc_port_pair.py215
-rwxr-xr-xneutronclient/osc/v2/sfc/sfc_port_pair_group.py291
-rw-r--r--neutronclient/tests/unit/osc/v2/sfc/__init__.py0
-rwxr-xr-xneutronclient/tests/unit/osc/v2/sfc/fakes.py234
-rwxr-xr-xneutronclient/tests/unit/osc/v2/sfc/test_flow_classifier.py378
-rwxr-xr-xneutronclient/tests/unit/osc/v2/sfc/test_port_chain.py556
-rwxr-xr-xneutronclient/tests/unit/osc/v2/sfc/test_port_pair.py302
-rwxr-xr-xneutronclient/tests/unit/osc/v2/sfc/test_port_pair_group.py424
-rw-r--r--neutronclient/v2_0/client.py103
-rw-r--r--releasenotes/notes/add-sfc-commands.yaml5
-rw-r--r--setup.cfg22
15 files changed, 3233 insertions, 0 deletions
diff --git a/doc/source/cli/osc/v2/networking-sfc.rst b/doc/source/cli/osc/v2/networking-sfc.rst
new file mode 100644
index 0000000..09d13d6
--- /dev/null
+++ b/doc/source/cli/osc/v2/networking-sfc.rst
@@ -0,0 +1,36 @@
+==============
+networking sfc
+==============
+
+**Service Function Chaining** is a mechanism for overriding the basic destination based forwarding
+that is typical of IP networks. Service Function Chains consist of an ordered sequence of
+Service Functions (SFs). SFs are virtual machines (or potentially physical devices) that perform a
+network function such as firewall, content cache, packet inspection, or any other function that
+requires processing of packets in a flow from point A to point B even though the SFs are not
+literally between point A and B from a routing table perspective.
+
+Network v2
+
+.. autoprogram-cliff:: openstack.neutronclient.v2
+ :command: sfc flow classifier *
+
+.. autoprogram-cliff:: openstack.neutronclient.v2
+ :command: sfc port chain *
+
+.. autoprogram-cliff:: openstack.neutronclient.v2
+ :command: sfc port pair create
+
+.. autoprogram-cliff:: openstack.neutronclient.v2
+ :command: sfc port pair delete
+
+.. autoprogram-cliff:: openstack.neutronclient.v2
+ :command: sfc port pair list
+
+.. autoprogram-cliff:: openstack.neutronclient.v2
+ :command: sfc port pair set
+
+.. autoprogram-cliff:: openstack.neutronclient.v2
+ :command: sfc port pair show
+
+.. autoprogram-cliff:: openstack.neutronclient.v2
+ :command: sfc port pair group *
diff --git a/neutronclient/osc/v2/sfc/__init__.py b/neutronclient/osc/v2/sfc/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/neutronclient/osc/v2/sfc/__init__.py
diff --git a/neutronclient/osc/v2/sfc/sfc_flow_classifier.py b/neutronclient/osc/v2/sfc/sfc_flow_classifier.py
new file mode 100755
index 0000000..38a3cbb
--- /dev/null
+++ b/neutronclient/osc/v2/sfc/sfc_flow_classifier.py
@@ -0,0 +1,320 @@
+# Copyright (c) 2017 Huawei Technologies India Pvt.Limited.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import logging
+
+from osc_lib.command import command
+from osc_lib import exceptions
+from osc_lib import utils
+
+from neutronclient._i18n import _
+from neutronclient.common import exceptions as nc_exc
+from neutronclient.osc import utils as nc_osc_utils
+
+LOG = logging.getLogger(__name__)
+
+resource = 'flow_classifier'
+
+_attr_map = (
+ ('id', 'ID', nc_osc_utils.LIST_BOTH),
+ ('name', 'Name', nc_osc_utils.LIST_BOTH),
+ ('summary', 'Summary', nc_osc_utils.LIST_SHORT_ONLY),
+ ('protocol', 'Protocol', nc_osc_utils.LIST_LONG_ONLY),
+ ('ethertype', 'Ethertype', nc_osc_utils.LIST_LONG_ONLY),
+ ('source_ip_prefix', 'Source IP',
+ nc_osc_utils.LIST_LONG_ONLY),
+ ('destination_ip_prefix', 'Destination IP',
+ nc_osc_utils.LIST_LONG_ONLY),
+ ('logical_source_port', 'Logical Source Port',
+ nc_osc_utils.LIST_LONG_ONLY),
+ ('logical_destination_port', 'Logical Destination Port',
+ nc_osc_utils.LIST_LONG_ONLY),
+ ('source_port_range_min', 'Source Port Range Min',
+ nc_osc_utils.LIST_LONG_ONLY),
+ ('source_port_range_max', 'Source Port Range Max',
+ nc_osc_utils.LIST_LONG_ONLY),
+ ('destination_port_range_min', 'Destination Port Range Min',
+ nc_osc_utils.LIST_LONG_ONLY),
+ ('destination_port_range_max', 'Destination Port Range Max',
+ nc_osc_utils.LIST_LONG_ONLY),
+ ('l7_parameters', 'L7 Parameters', nc_osc_utils.LIST_LONG_ONLY),
+ ('description', 'Description', nc_osc_utils.LIST_LONG_ONLY),
+ ('project_id', 'Project', nc_osc_utils.LIST_LONG_ONLY),
+)
+
+
+class CreateSfcFlowClassifier(command.ShowOne):
+ _description = _("Create a flow classifier")
+
+ def get_parser(self, prog_name):
+ parser = super(CreateSfcFlowClassifier, self).get_parser(prog_name)
+ parser.add_argument(
+ 'name',
+ metavar='<name>',
+ help=_('Name of the flow classifier'))
+ parser.add_argument(
+ '--description',
+ metavar='<description>',
+ help=_('Description for the flow classifier'))
+ parser.add_argument(
+ '--protocol',
+ metavar='<protocol>',
+ help=_('IP protocol name. Protocol name should be as per '
+ 'IANA standard.'))
+ parser.add_argument(
+ '--ethertype',
+ metavar='{IPv4,IPv6}',
+ default='IPv4', choices=['IPv4', 'IPv6'],
+ help=_('L2 ethertype, default is IPv4'))
+ parser.add_argument(
+ '--source-port',
+ metavar='<min-port>:<max-port>',
+ help=_('Source protocol port (allowed range [1,65535]. Must be '
+ 'specified as a:b, where a=min-port and b=max-port) '
+ 'in the allowed range.'))
+ parser.add_argument(
+ '--destination-port',
+ metavar='<min-port>:<max-port>',
+ help=_('Destination protocol port (allowed range [1,65535]. Must '
+ 'be specified as a:b, where a=min-port and b=max-port) '
+ 'in the allowed range.'))
+ parser.add_argument(
+ '--source-ip-prefix',
+ metavar='<source-ip-prefix>',
+ help=_('Source IP address in CIDR notation'))
+ parser.add_argument(
+ '--destination-ip-prefix',
+ metavar='<destination-ip-prefix>',
+ help=_('Destination IP address in CIDR notation'))
+ parser.add_argument(
+ '--logical-source-port',
+ metavar='<logical-source-port>',
+ help=_('Neutron source port (name or ID)'))
+ parser.add_argument(
+ '--logical-destination-port',
+ metavar='<logical-destination-port>',
+ help=_('Neutron destination port (name or ID)'))
+ parser.add_argument(
+ '--l7-parameters',
+ help=_('Dictionary of L7 parameters. Currently, no value is '
+ 'supported for this option.'))
+ return parser
+
+ def take_action(self, parsed_args):
+ client = self.app.client_manager.neutronclient
+ attrs = _get_common_attrs(self.app.client_manager, parsed_args)
+ body = {resource: attrs}
+ obj = client.create_flow_classifier(body)[resource]
+ columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map)
+ data = utils.get_dict_properties(obj, columns)
+ return display_columns, data
+
+
+class DeleteSfcFlowClassifier(command.Command):
+ _description = _("Delete a given flow classifier")
+
+ def get_parser(self, prog_name):
+ parser = super(DeleteSfcFlowClassifier, self).get_parser(prog_name)
+ parser.add_argument(
+ 'flow_classifier',
+ metavar='<flow-classifier>',
+ help=_("Flow classifier to delete (name or ID)")
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ # TODO(mohan): Add support for deleting multiple resources.
+ client = self.app.client_manager.neutronclient
+ fc_id = _get_id(client, parsed_args.flow_classifier, resource)
+ try:
+ client.delete_flow_classifier(fc_id)
+ except Exception as e:
+ msg = (_("Failed to delete flow classifier with name "
+ "or ID '%(fc)s': %(e)s")
+ % {'fc': parsed_args.flow_classifier, 'e': e})
+ raise exceptions.CommandError(msg)
+
+
+class ListSfcFlowClassifier(command.Lister):
+ _description = _("List flow classifiers")
+
+ def get_parser(self, prog_name):
+ parser = super(ListSfcFlowClassifier, self).get_parser(prog_name)
+ parser.add_argument(
+ '--long',
+ action='store_true',
+ help=_("List additional fields in output")
+ )
+ return parser
+
+ def extend_list(self, data, parsed_args):
+ ext_data = data['flow_classifiers']
+ for d in ext_data:
+ val = []
+ protocol = d['protocol'].upper() if d['protocol'] else 'any'
+ val.append('protocol: ' + protocol)
+ val.append(self._get_protocol_port_details(d, 'source'))
+ val.append(self._get_protocol_port_details(d, 'destination'))
+ if 'logical_source_port' in d:
+ val.append('neutron_source_port: ' +
+ str(d['logical_source_port']))
+
+ if 'logical_destination_port' in d:
+ val.append('neutron_destination_port: ' +
+ str(d['logical_destination_port']))
+
+ if 'l7_parameters' in d:
+ l7_param = 'l7_parameters: {%s}' % ','.join(d['l7_parameters'])
+ val.append(l7_param)
+ d['summary'] = ',\n'.join(val)
+ return ext_data
+
+ def _get_protocol_port_details(self, data, val):
+ type_ip_prefix = val + '_ip_prefix'
+ ip_prefix = data.get(type_ip_prefix)
+ if not ip_prefix:
+ ip_prefix = 'any'
+ min_port = data.get(val + '_port_range_min')
+ if min_port is None:
+ min_port = 'any'
+ max_port = data.get(val + '_port_range_max')
+ if max_port is None:
+ max_port = 'any'
+ return '%s[port]: %s[%s:%s]' % (
+ type, ip_prefix, min_port, max_port)
+
+ def take_action(self, parsed_args):
+ client = self.app.client_manager.neutronclient
+ obj = client.list_flow_classifier()
+ obj_extend = self.extend_list(obj, parsed_args)
+ headers, columns = nc_osc_utils.get_column_definitions(
+ _attr_map, long_listing=parsed_args.long)
+ return (headers, (utils.get_dict_properties(
+ s, columns) for s in obj_extend))
+
+
+class SetSfcFlowClassifier(command.Command):
+ _description = _("Set flow classifier properties")
+
+ def get_parser(self, prog_name):
+ parser = super(SetSfcFlowClassifier, self).get_parser(prog_name)
+ parser.add_argument(
+ '--name',
+ metavar='<name>',
+ help=_('Name of the flow classifier'))
+ parser.add_argument(
+ '--description',
+ metavar='<description>',
+ help=_('Description for the flow classifier'))
+ parser.add_argument(
+ 'flow_classifier',
+ metavar='<flow-classifier>',
+ help=_("Flow classifier to modify (name or ID)")
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ client = self.app.client_manager.neutronclient
+ fc_id = _get_id(client, parsed_args.flow_classifier, resource)
+ attrs = _get_common_attrs(self.app.client_manager, parsed_args,
+ is_create=False)
+ body = {resource: attrs}
+ try:
+ client.update_flow_classifier(fc_id, body)
+ except Exception as e:
+ msg = (_("Failed to update flow classifier '%(fc)s': %(e)s")
+ % {'fc': parsed_args.flow_classifier, 'e': e})
+ raise exceptions.CommandError(msg)
+
+
+class ShowSfcFlowClassifier(command.ShowOne):
+ _description = _("Display flow classifier details")
+
+ def get_parser(self, prog_name):
+ parser = super(ShowSfcFlowClassifier, self).get_parser(prog_name)
+ parser.add_argument(
+ 'flow_classifier',
+ metavar='<flow-classifier>',
+ help=_("Flow classifier to display (name or ID)")
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ client = self.app.client_manager.neutronclient
+ fc_id = _get_id(client, parsed_args.flow_classifier, resource)
+ obj = client.show_flow_classifier(fc_id)[resource]
+ columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map)
+ data = utils.get_dict_properties(obj, columns)
+ return display_columns, data
+
+
+def _get_common_attrs(client_manager, parsed_args, is_create=True):
+ attrs = {}
+ if parsed_args.name is not None:
+ attrs['name'] = parsed_args.name
+ if parsed_args.description is not None:
+ attrs['description'] = parsed_args.description
+ if is_create:
+ _get_attrs(client_manager, attrs, parsed_args)
+ return attrs
+
+
+def _get_attrs(client_manager, attrs, parsed_args):
+ if parsed_args.protocol is not None:
+ attrs['protocol'] = parsed_args.protocol
+ if parsed_args.ethertype:
+ attrs['ethertype'] = parsed_args.ethertype
+ if parsed_args.source_ip_prefix is not None:
+ attrs['source_ip_prefix'] = parsed_args.source_ip_prefix
+ if parsed_args.destination_ip_prefix is not None:
+ attrs['destination_ip_prefix'] = parsed_args.destination_ip_prefix
+ if parsed_args.logical_source_port is not None:
+ attrs['logical_source_port'] = _get_id(
+ client_manager.neutronclient, parsed_args.logical_source_port,
+ 'port')
+ if parsed_args.logical_destination_port is not None:
+ attrs['logical_destination_port'] = _get_id(
+ client_manager.neutronclient, parsed_args.logical_destination_port,
+ 'port')
+ if parsed_args.source_port is not None:
+ _fill_protocol_port_info(attrs, 'source',
+ parsed_args.source_port)
+ if parsed_args.destination_port is not None:
+ _fill_protocol_port_info(attrs, 'destination',
+ parsed_args.destination_port)
+ if parsed_args.l7_parameters is not None:
+ attrs['l7_parameters'] = parsed_args.l7_parameters
+
+
+def _fill_protocol_port_info(attrs, port_type, port_val):
+ min_port, sep, max_port = port_val.partition(":")
+ if not min_port:
+ msg = ("Invalid port value '%s', expected format is "
+ "min-port:max-port or min-port.")
+ raise argparse.ArgumentTypeError(msg % port_val)
+ if not max_port:
+ max_port = min_port
+ try:
+ attrs[port_type + '_port_range_min'] = int(min_port)
+ attrs[port_type + '_port_range_max'] = int(max_port)
+ except ValueError:
+ message = (_("Protocol port value %s must be an integer "
+ "or integer:integer.") % port_val)
+ raise nc_exc.CommandError(message=message)
+
+
+def _get_id(client, id_or_name, resource):
+ return client.find_resource(resource, id_or_name)['id']
diff --git a/neutronclient/osc/v2/sfc/sfc_port_chain.py b/neutronclient/osc/v2/sfc/sfc_port_chain.py
new file mode 100755
index 0000000..582a6d9
--- /dev/null
+++ b/neutronclient/osc/v2/sfc/sfc_port_chain.py
@@ -0,0 +1,347 @@
+# Copyright (c) 2017 Huawei Technologies India Pvt.Limited.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+from osc_lib.cli import parseractions
+from osc_lib.command import command
+from osc_lib import exceptions
+from osc_lib import utils
+
+from neutronclient._i18n import _
+from neutronclient.osc import utils as nc_osc_utils
+
+LOG = logging.getLogger(__name__)
+
+resource = 'port_chain'
+
+_attr_map = (
+ ('id', 'ID', nc_osc_utils.LIST_BOTH),
+ ('name', 'Name', nc_osc_utils.LIST_BOTH),
+ ('port_pair_groups', 'Port Pair Groups', nc_osc_utils.LIST_BOTH),
+ ('flow_classifiers', 'Flow Classifiers',
+ nc_osc_utils.LIST_BOTH),
+ ('chain_parameters', 'Chain Parameters',
+ nc_osc_utils.LIST_BOTH),
+ ('description', 'Description', nc_osc_utils.LIST_LONG_ONLY),
+ ('chain_id', 'Chain ID', nc_osc_utils.LIST_BOTH),
+ ('project_id', 'Project', nc_osc_utils.LIST_LONG_ONLY),
+)
+
+
+class CreateSfcPortChain(command.ShowOne):
+ _description = _("Create a port chain")
+
+ def get_parser(self, prog_name):
+ parser = super(CreateSfcPortChain, self).get_parser(prog_name)
+ parser.add_argument(
+ 'name',
+ metavar='<name>',
+ help=_('Name of the port chain'))
+ parser.add_argument(
+ '--description',
+ metavar='<description>',
+ help=_('Description for the port chain'))
+ parser.add_argument(
+ '--flow-classifier',
+ default=[],
+ metavar='<flow-classifier>',
+ dest='flow_classifiers',
+ action='append',
+ help=_('Add flow classifier (name or ID). '
+ 'This option can be repeated.'))
+ parser.add_argument(
+ '--chain-parameters',
+ metavar='correlation=<correlation-type>,symmetric=<boolean>',
+ action=parseractions.MultiKeyValueAction,
+ optional_keys=['correlation', 'symmetric'],
+ help=_('Dictionary of chain parameters. Supports '
+ 'correlation=mpls and symmetric=true|false.'))
+ parser.add_argument(
+ '--port-pair-group',
+ metavar='<port-pair-group>',
+ dest='port_pair_groups',
+ required=True,
+ action='append',
+ help=_('Port pair group (name or ID). '
+ 'This option can be repeated.'))
+ return parser
+
+ def take_action(self, parsed_args):
+ client = self.app.client_manager.neutronclient
+ attrs = _get_common_attrs(self.app.client_manager, parsed_args)
+ body = {resource: attrs}
+ obj = client.create_port_chain(body)[resource]
+ columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map)
+ data = utils.get_dict_properties(obj, columns)
+ return display_columns, data
+
+
+class DeleteSfcPortChain(command.Command):
+ _description = _("Delete a given port chain")
+
+ def get_parser(self, prog_name):
+ parser = super(DeleteSfcPortChain, self).get_parser(prog_name)
+ parser.add_argument(
+ 'port_chain',
+ metavar="<port-chain>",
+ help=_("Port chain to delete (name or ID)")
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ # TODO(mohan): Add support for deleting multiple resources.
+ client = self.app.client_manager.neutronclient
+ pc_id = _get_id(client, parsed_args.port_chain, resource)
+ try:
+ client.delete_port_chain(pc_id)
+ except Exception as e:
+ msg = (_("Failed to delete port chain with name "
+ "or ID '%(pc)s': %(e)s")
+ % {'pc': parsed_args.port_chain, 'e': e})
+ raise exceptions.CommandError(msg)
+
+
+class ListSfcPortChain(command.Lister):
+ _description = _("List port chains")
+
+ def get_parser(self, prog_name):
+ parser = super(ListSfcPortChain, self).get_parser(prog_name)
+ parser.add_argument(
+ '--long',
+ action='store_true',
+ default=False,
+ help=_("List additional fields in output")
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ client = self.app.client_manager.neutronclient
+ data = client.list_port_chain()
+ headers, columns = nc_osc_utils.get_column_definitions(
+ _attr_map, long_listing=parsed_args.long)
+ return (headers,
+ (utils.get_dict_properties(s, columns)
+ for s in data['port_chains']))
+
+
+class SetSfcPortChain(command.Command):
+ _description = _("Set port chain properties")
+
+ def get_parser(self, prog_name):
+ parser = super(SetSfcPortChain, self).get_parser(prog_name)
+ parser.add_argument(
+ '--name',
+ metavar='<name>',
+ help=_('Name of the port chain'))
+ parser.add_argument(
+ '--description',
+ metavar='<description>',
+ help=_('Description for the port chain'))
+ parser.add_argument(
+ '--flow-classifier',
+ metavar='<flow-classifier>',
+ dest='flow_classifiers',
+ action='append',
+ help=_('Add flow classifier (name or ID). '
+ 'This option can be repeated.'))
+ parser.add_argument(
+ '--no-flow-classifier',
+ action='store_true',
+ help=_('Associate no flow classifier with the port chain'))
+ parser.add_argument(
+ '--port-pair-group',
+ metavar='<port-pair-group>',
+ dest='port_pair_groups',
+ action='append',
+ help=_('Add port pair group (name or ID). '
+ 'This option can be repeated.'))
+ parser.add_argument(
+ '--no-port-pair-group',
+ action='store_true',
+ help=_('Remove associated port pair group from the port chain.'
+ 'At least one --port-pair-group must be specified '
+ 'together.'))
+ parser.add_argument(
+ 'port_chain',
+ metavar='<port-chain>',
+ help=_("Port chain to modify (name or ID)"))
+ return parser
+
+ def take_action(self, parsed_args):
+ client = self.app.client_manager.neutronclient
+ pc_id = _get_id(client, parsed_args.port_chain, resource)
+ attrs = _get_common_attrs(self.app.client_manager, parsed_args,
+ is_create=False)
+ if parsed_args.no_flow_classifier:
+ attrs['flow_classifiers'] = []
+ if parsed_args.flow_classifiers:
+ for fc in parsed_args.flow_classifiers:
+ added = [client.find_resource(
+ 'flow_classifier', fc,
+ cmd_resource='sfc_flow_classifier')['id']]
+ if parsed_args.no_flow_classifier:
+ existing = []
+ else:
+ existing = [client.find_resource(
+ resource, parsed_args.port_chain,
+ cmd_resource='sfc_port_chain')['flow_classifiers']]
+ attrs['flow_classifiers'] = sorted(list(
+ set(existing) | set(added)))
+ if (parsed_args.no_port_pair_group and not
+ parsed_args.port_pair_groups):
+ message = _('At least one --port-pair-group must be specified.')
+ raise exceptions.CommandError(message)
+ if parsed_args.no_port_pair_group and parsed_args.port_pair_groups:
+ for ppg in parsed_args.port_pair_groups:
+ attrs['port_pair_groups'] = [client.find_resource(
+ 'port_pair_group', ppg,
+ cmd_resource='sfc_port_pair_group')['id']]
+ if (parsed_args.port_pair_groups and
+ not parsed_args.no_port_pair_group):
+ existing_ppg = [client.find_resource(
+ resource, parsed_args.port_chain,
+ cmd_resource='sfc_port_chain')['port_pair_groups']]
+ for ppg in parsed_args.port_pair_groups:
+ existing_ppg.append(client.find_resource(
+ 'port_pair_group', ppg,
+ cmd_resource='sfc_port_pair_group')['id'])
+ attrs['port_pair_groups'] = sorted(list(set(existing_ppg)))
+ body = {resource: attrs}
+ try:
+ client.update_port_chain(pc_id, body)
+ except Exception as e:
+ msg = (_("Failed to update port chain '%(pc)s': %(e)s")
+ % {'pc': parsed_args.port_chain, 'e': e})
+ raise exceptions.CommandError(msg)
+
+
+class ShowSfcPortChain(command.ShowOne):
+ _description = _("Display port chain details")
+
+ def get_parser(self, prog_name):
+ parser = super(ShowSfcPortChain, self).get_parser(prog_name)
+ parser.add_argument(
+ 'port_chain',
+ metavar="<port-chain>",
+ help=_("Port chain to display (name or ID)")
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ client = self.app.client_manager.neutronclient
+ pc_id = _get_id(client, parsed_args.port_chain, resource)
+ obj = client.show_port_chain(pc_id)[resource]
+ columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map)
+ data = utils.get_dict_properties(obj, columns)
+ return display_columns, data
+
+
+class UnsetSfcPortChain(command.Command):
+ _description = _("Unset port chain properties")
+
+ def get_parser(self, prog_name):
+ parser = super(UnsetSfcPortChain, self).get_parser(prog_name)
+ parser.add_argument(
+ 'port_chain',
+ metavar='<port-chain>',
+ help=_("Port chain to unset (name or ID)"))
+ port_chain = parser.add_mutually_exclusive_group()
+ port_chain.add_argument(
+ '--flow-classifier',
+ action='append',
+ metavar='<flow-classifier>',
+ dest='flow_classifiers',
+ help=_('Remove flow classifier(s) from the port chain '
+ '(name or ID). This option can be repeated.'))
+ port_chain.add_argument(
+ '--all-flow-classifier',
+ action='store_true',
+ help=_('Remove all flow classifiers from the port chain'))
+ parser.add_argument(
+ '--port-pair-group',
+ metavar='<port-pair-group>',
+ dest='port_pair_groups',
+ action='append',
+ help=_('Remove port pair group(s) from the port chain '
+ '(name or ID). This option can be repeated.'))
+ return parser
+
+ def take_action(self, parsed_args):
+ client = self.app.client_manager.neutronclient
+ pc_id = _get_id(client, parsed_args.port_chain, resource)
+ attrs = {}
+ if parsed_args.flow_classifiers:
+ existing = [client.find_resource(
+ resource, parsed_args.port_chain,
+ cmd_resource='sfc_port_chain')['flow_classifiers']]
+ for fc in parsed_args.flow_classifiers:
+ removed = [client.find_resource(
+ 'flow_classifier', fc,
+ cmd_resource='sfc_flow_classifier')['id']]
+ attrs['flow_classifiers'] = list(set(existing) - set(removed))
+ if parsed_args.all_flow_classifier:
+ attrs['flow_classifiers'] = []
+ if parsed_args.port_pair_groups:
+ existing_ppg = [client.find_resource(
+ resource, parsed_args.port_chain,
+ cmd_resource='sfc_port_chain')['port_pair_groups']]
+ for ppg in parsed_args.port_pair_groups:
+ removed_ppg = [client.find_resource(
+ 'port_pair_group', ppg,
+ cmd_resource='sfc_port_pair_group')['id']]
+ attrs['port_pair_groups'] = list(set(existing_ppg) -
+ set(removed_ppg))
+ if attrs['port_pair_groups'] == []:
+ message = _('At least one --port-pair-group must be'
+ ' specified.')
+ raise exceptions.CommandError(message)
+ body = {resource: attrs}
+ try:
+ client.update_port_chain(pc_id, body)
+ except Exception as e:
+ msg = (_("Failed to unset port chain '%(pc)s': %(e)s")
+ % {'pc': parsed_args.port_chain, 'e': e})
+ raise exceptions.CommandError(msg)
+
+
+def _get_common_attrs(client_manager, parsed_args, is_create=True):
+ attrs = {}
+ if parsed_args.name is not None:
+ attrs['name'] = parsed_args.name
+ if parsed_args.description is not None:
+ attrs['description'] = parsed_args.description
+ if ('port_pair_groups' in parsed_args and
+ parsed_args.port_pair_groups is not None):
+ attrs['port_pair_groups'] = [(_get_id(client_manager.neutronclient,
+ ppg, 'port_pair_group'))
+ for ppg in parsed_args.port_pair_groups]
+ if ('flow_classifiers' in parsed_args and
+ parsed_args.flow_classifiers is not None):
+ attrs['flow_classifiers'] = [(_get_id(client_manager.neutronclient, fc,
+ 'flow_classifier'))
+ for fc in parsed_args.flow_classifiers]
+ if is_create is True:
+ _get_attrs(attrs, parsed_args)
+ return attrs
+
+
+def _get_attrs(attrs, parsed_args):
+ if 'chain_parameters' in parsed_args:
+ attrs['chain_parameters'] = parsed_args.chain_parameters
+
+
+def _get_id(client, id_or_name, resource):
+ return client.find_resource(resource, id_or_name)['id']
diff --git a/neutronclient/osc/v2/sfc/sfc_port_pair.py b/neutronclient/osc/v2/sfc/sfc_port_pair.py
new file mode 100755
index 0000000..26f369c
--- /dev/null
+++ b/neutronclient/osc/v2/sfc/sfc_port_pair.py
@@ -0,0 +1,215 @@
+# Copyright (c) 2017 Huawei Technologies India Pvt.Limited.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+from osc_lib.cli import parseractions
+from osc_lib.command import command
+from osc_lib import exceptions
+from osc_lib import utils
+
+from neutronclient._i18n import _
+from neutronclient.osc import utils as nc_osc_utils
+
+LOG = logging.getLogger(__name__)
+
+resource = 'port_pair'
+
+_attr_map = (
+ ('id', 'ID', nc_osc_utils.LIST_BOTH),
+ ('name', 'Name', nc_osc_utils.LIST_BOTH),
+ ('ingress', 'Ingress Logical Port', nc_osc_utils.LIST_BOTH),
+ ('egress', 'Egress Logical Port', nc_osc_utils.LIST_BOTH),
+ ('service_function_parameters', 'Service Function Parameters',
+ nc_osc_utils.LIST_LONG_ONLY),
+ ('description', 'Description', nc_osc_utils.LIST_LONG_ONLY),
+ ('project_id', 'Project', nc_osc_utils.LIST_LONG_ONLY),
+)
+
+
+class CreateSfcPortPair(command.ShowOne):
+ _description = _("Create a port pair")
+
+ def get_parser(self, prog_name):
+ parser = super(CreateSfcPortPair, self).get_parser(prog_name)
+ parser.add_argument(
+ 'name',
+ metavar='<name>',
+ help=_('Name of the port pair'))
+ parser.add_argument(
+ '--description',
+ metavar='<description>',
+ help=_('Description for the port pair'))
+ parser.add_argument(
+ '--service-function-parameters',
+ metavar='correlation=<correlation-type>,weight=<weight>',
+ action=parseractions.MultiKeyValueAction,
+ optional_keys=['correlation', 'weight'],
+ help=_('Dictionary of service function parameters. '
+ 'Currently, only correlation=None and weight '
+ 'is supported. Weight is an integer that influences '
+ 'the selection of a port pair within a port pair group '
+ 'for a flow. The higher the weight, the more flows will '
+ 'hash to the port pair. The default weight is 1.'))
+ parser.add_argument(
+ '--ingress',
+ metavar='<ingress>',
+ required=True,
+ help=_('Ingress neutron port (name or ID)'))
+ parser.add_argument(
+ '--egress',
+ metavar='<egress>',
+ required=True,
+ help=_('Egress neutron port (name or ID)'))
+ return parser
+
+ def take_action(self, parsed_args):
+ client = self.app.client_manager.neutronclient
+ attrs = _get_common_attrs(self.app.client_manager, parsed_args)
+ body = {resource: attrs}
+ obj = client.create_port_pair(body)[resource]
+ columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map)
+ data = utils.get_dict_properties(obj, columns)
+ return display_columns, data
+
+
+class DeleteSfcPortPair(command.Command):
+ _description = _("Delete a given port pair")
+
+ def get_parser(self, prog_name):
+ parser = super(DeleteSfcPortPair, self).get_parser(prog_name)
+ parser.add_argument(
+ 'port_pair',
+ metavar="<port-pair>",
+ help=_("Port pair to delete (name or ID)")
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ # TODO(mohan): Add support for deleting multiple resources.
+ client = self.app.client_manager.neutronclient
+ port_pair_id = _get_id(client, parsed_args.port_pair, resource)
+ try:
+ client.delete_port_pair(port_pair_id)
+ except Exception as e:
+ msg = (_("Failed to delete port pair with name "
+ "or ID '%(port_pair)s': %(e)s")
+ % {'port_pair': parsed_args.port_pair, 'e': e})
+ raise exceptions.CommandError(msg)
+
+
+class ListSfcPortPair(command.Lister):
+ _description = _("List port pairs")
+
+ def get_parser(self, prog_name):
+ parser = super(ListSfcPortPair, self).get_parser(prog_name)
+ parser.add_argument(
+ '--long',
+ action='store_true',
+ help=_("List additional fields in output")
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ client = self.app.client_manager.neutronclient
+ data = client.list_port_pair()
+ headers, columns = nc_osc_utils.get_column_definitions(
+ _attr_map, long_listing=parsed_args.long)
+ return (headers,
+ (utils.get_dict_properties(
+ s, columns,
+ ) for s in data['port_pairs']))
+
+
+class SetSfcPortPair(command.Command):
+ _description = _("Set port pair properties")
+
+ def get_parser(self, prog_name):
+ parser = super(SetSfcPortPair, self).get_parser(prog_name)
+ parser.add_argument(
+ '--name',
+ metavar='<name>',
+ help=_('Name of the port pair'))
+ parser.add_argument(
+ '--description',
+ metavar='<description>',
+ help=_('Description for the port pair'))
+ parser.add_argument(
+ 'port_pair',
+ metavar='<port-pair>',
+ help=_("Port pair to modify (name or ID)")
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ client = self.app.client_manager.neutronclient
+ port_pair_id = _get_id(client, parsed_args.port_pair, resource)
+ attrs = _get_common_attrs(self.app.client_manager, parsed_args,
+ is_create=False)
+ body = {resource: attrs}
+ try:
+ client.update_port_pair(port_pair_id, body)
+ except Exception as e:
+ msg = (_("Failed to update port pair '%(port_pair)s': %(e)s")
+ % {'port_pair': parsed_args.port_pair, 'e': e})
+ raise exceptions.CommandError(msg)
+
+
+class ShowSfcPortPair(command.ShowOne):
+ _description = _("Display port pair details")
+
+ def get_parser(self, prog_name):
+ parser = super(ShowSfcPortPair, self).get_parser(prog_name)
+ parser.add_argument(
+ 'port_pair',
+ metavar='<port-pair>',
+ help=_("Port pair to display (name or ID)")
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ client = self.app.client_manager.neutronclient
+ port_pair_id = _get_id(client, parsed_args.port_pair, resource)
+ obj = client.show_port_pair(port_pair_id)[resource]
+ columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map)
+ data = utils.get_dict_properties(obj, columns)
+ return display_columns, data
+
+
+def _get_common_attrs(client_manager, parsed_args, is_create=True):
+ attrs = {}
+ if parsed_args.name is not None:
+ attrs['name'] = parsed_args.name
+ if parsed_args.description is not None:
+ attrs['description'] = parsed_args.description
+ if is_create:
+ _get_attrs(client_manager, attrs, parsed_args)
+ return attrs
+
+
+def _get_attrs(client_manager, attrs, parsed_args):
+ if parsed_args.ingress is not None:
+ attrs['ingress'] = _get_id(client_manager.neutronclient,
+ parsed_args.ingress, 'port')
+ if parsed_args.egress is not None:
+ attrs['egress'] = _get_id(client_manager.neutronclient,
+ parsed_args.egress, 'port')
+ if 'service_function_parameters' in parsed_args:
+ attrs['service_function_parameters'] = (
+ parsed_args.service_function_parameters)
+
+
+def _get_id(client, id_or_name, resource):
+ return client.find_resource(resource, id_or_name)['id']
diff --git a/neutronclient/osc/v2/sfc/sfc_port_pair_group.py b/neutronclient/osc/v2/sfc/sfc_port_pair_group.py
new file mode 100755
index 0000000..367dab1
--- /dev/null
+++ b/neutronclient/osc/v2/sfc/sfc_port_pair_group.py
@@ -0,0 +1,291 @@
+# Copyright (c) 2017 Huawei Technologies India Pvt.Limited.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+from osc_lib.cli import parseractions
+from osc_lib.command import command
+from osc_lib import exceptions
+from osc_lib import utils
+
+from neutronclient._i18n import _
+from neutronclient.osc import utils as nc_osc_utils
+
+LOG = logging.getLogger(__name__)
+
+resource = 'port_pair_group'
+
+_attr_map = (
+ ('id', 'ID', nc_osc_utils.LIST_BOTH),
+ ('name', 'Name', nc_osc_utils.LIST_BOTH),
+ ('port_pairs', 'Port Pair', nc_osc_utils.LIST_BOTH),
+ ('port_pair_group_parameters', 'Port Pair Group Parameters',
+ nc_osc_utils.LIST_BOTH),
+ ('description', 'Description', nc_osc_utils.LIST_LONG_ONLY),
+ ('group_id', 'Loadbalance ID', nc_osc_utils.LIST_LONG_ONLY),
+ ('project_id', 'Project', nc_osc_utils.LIST_LONG_ONLY),
+)
+
+
+class CreateSfcPortPairGroup(command.ShowOne):
+ _description = _("Create a port pair group")
+
+ def get_parser(self, prog_name):
+ parser = super(CreateSfcPortPairGroup, self).get_parser(prog_name)
+ parser.add_argument(
+ 'name',
+ metavar='<name>',
+ help=_('Name of the port pair group'))
+ parser.add_argument(
+ '--description',
+ metavar='<description>',
+ help=_('Description for the port pair group'))
+ parser.add_argument(
+ '--port-pair',
+ metavar='<port-pair>',
+ dest='port_pairs',
+ default=[],
+ action='append',
+ help=_('Port pair (name or ID). '
+ 'This option can be repeated.'))
+ parser.add_argument(
+ '--port-pair-group-parameters',
+ metavar='lb-fields=<lb-fields>',
+ action=parseractions.KeyValueAction,
+ help=_('Dictionary of port pair group parameters. '
+ 'Currently only one parameter lb-fields is supported. '
+ '<lb-fields> is a & separated list of load-balancing '
+ 'fields.'))
+ return parser
+
+ def take_action(self, parsed_args):
+ client = self.app.client_manager.neutronclient
+ attrs = _get_common_attrs(self.app.client_manager, parsed_args)
+ body = {resource: attrs}
+ obj = client.create_port_pair_group(body)[resource]
+ columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map)
+ data = utils.get_dict_properties(obj, columns)
+ return display_columns, data
+
+
+class DeleteSfcPortPairGroup(command.Command):
+ _description = _("Delete a given port pair group")
+
+ def get_parser(self, prog_name):
+ parser = super(DeleteSfcPortPairGroup, self).get_parser(prog_name)
+ parser.add_argument(
+ 'port_pair_group',
+ metavar='<port-pair-group>',
+ help=_("Port pair group to delete (name or ID)")
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ # TODO(mohan): Add support for deleting multiple resources.
+ client = self.app.client_manager.neutronclient
+ ppg_id = _get_id(client, parsed_args.port_pair_group, resource)
+ try:
+ client.delete_port_pair_group(ppg_id)
+ except Exception as e:
+ msg = (_("Failed to delete port pair group with name "
+ "or ID '%(ppg)s': %(e)s")
+ % {'ppg': parsed_args.port_pair_group, 'e': e})
+ raise exceptions.CommandError(msg)
+
+
+class ListSfcPortPairGroup(command.Lister):
+ _description = _("List port pair group")
+
+ def get_parser(self, prog_name):
+ parser = super(ListSfcPortPairGroup, self).get_parser(prog_name)
+ parser.add_argument(
+ '--long',
+ action='store_true',
+ default=False,
+ help=_("List additional fields in output")
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ client = self.app.client_manager.neutronclient
+ data = client.list_port_pair_group()
+ headers, columns = nc_osc_utils.get_column_definitions(
+ _attr_map, long_listing=parsed_args.long)
+ return (headers,
+ (utils.get_dict_properties(
+ s, columns,
+ ) for s in data['port_pair_groups']))
+
+
+class SetSfcPortPairGroup(command.Command):
+ _description = _("Set port pair group properties")
+
+ def get_parser(self, prog_name):
+ parser = super(SetSfcPortPairGroup, self).get_parser(prog_name)
+ parser.add_argument(
+ 'port_pair_group',
+ metavar='<port-pair-group>',
+ help=_("Port pair group to modify (name or ID)"))
+ parser.add_argument(
+ '--name',
+ metavar='<name>',
+ help=_('Name of the port pair group'))
+ parser.add_argument(
+ '--description',
+ metavar='<description>',
+ help=_('Description for the port pair group'))
+ parser.add_argument(
+ '--port-pair',
+ metavar='<port-pair>',
+ dest='port_pairs',
+ default=[],
+ action='append',
+ help=_('Port pair (name or ID). '
+ 'This option can be repeated.'))
+ parser.add_argument(
+ '--no-port-pair',
+ action='store_true',
+ help=_('Remove all port pair from port pair group'))
+ return parser
+
+ def take_action(self, parsed_args):
+ client = self.app.client_manager.neutronclient
+ ppg_id = _get_id(client, parsed_args.port_pair_group, resource)
+ attrs = _get_common_attrs(self.app.client_manager, parsed_args,
+ is_create=False)
+ if parsed_args.no_port_pair:
+ attrs['port_pairs'] = []
+ if parsed_args.port_pairs:
+ added = [client.find_resource('port_pair', pp,
+ cmd_resource='sfc_port_pair')['id']
+ for pp in parsed_args.port_pairs]
+ if parsed_args.no_port_pair:
+ existing = []
+ else:
+ existing = [client.find_resource(
+ resource, parsed_args.port_pair_group,
+ cmd_resource='sfc_port_pair_group')['port_pairs']]
+ attrs['port_pairs'] = sorted(list(set(existing) | set(added)))
+ body = {resource: attrs}
+ try:
+ client.update_port_pair_group(ppg_id, body)
+ except Exception as e:
+ msg = (_("Failed to update port pair group '%(ppg)s': %(e)s")
+ % {'ppg': parsed_args.port_pair_group, 'e': e})
+ raise exceptions.CommandError(msg)
+
+
+class ShowSfcPortPairGroup(command.ShowOne):
+ _description = _("Display port pair group details")
+
+ def get_parser(self, prog_name):
+ parser = super(ShowSfcPortPairGroup, self).get_parser(prog_name)
+ parser.add_argument(
+ 'port_pair_group',
+ metavar='<port-pair-group>',
+ help=_("Port pair group to display (name or ID)")
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ client = self.app.client_manager.neutronclient
+ ppg_id = _get_id(client, parsed_args.port_pair_group, resource)
+ obj = client.show_port_pair_group(ppg_id)[resource]
+ columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map)
+ data = utils.get_dict_properties(obj, columns)
+ return display_columns, data
+
+
+class UnsetSfcPortPairGroup(command.Command):
+ _description = _("Unset port pairs from port pair group")
+
+ def get_parser(self, prog_name):
+ parser = super(UnsetSfcPortPairGroup, self).get_parser(prog_name)
+ parser.add_argument(
+ 'port_pair_group',
+ metavar='<port-pair-group>',
+ help=_("Port pair group to unset (name or ID)"))
+ port_pair_group = parser.add_mutually_exclusive_group()
+ port_pair_group.add_argument(
+ '--port-pair',
+ action='append',
+ metavar='<port-pair>',
+ dest='port_pairs',
+ help=_('Remove port pair(s) from the port pair group '
+ '(name or ID). This option can be repeated.'))
+ port_pair_group.add_argument(
+ '--all-port-pair',
+ action='store_true',
+ help=_('Remove all port pairs from the port pair group'))
+ return parser
+
+ def take_action(self, parsed_args):
+ client = self.app.client_manager.neutronclient
+ ppg_id = _get_id(client, parsed_args.port_pair_group, resource)
+ attrs = {}
+ if parsed_args.port_pairs:
+ existing = [client.find_resource(
+ resource, parsed_args.port_pair_group,
+ cmd_resource='sfc_port_pair_group')['port_pairs']]
+ for pp in parsed_args.port_pairs:
+ removed = [client.find_resource(
+ 'port_pair', pp, cmd_resource='sfc_port_pair')['id']]
+ attrs['port_pairs'] = list(set(existing) - set(removed))
+ if parsed_args.all_port_pair:
+ attrs['port_pairs'] = []
+ body = {resource: attrs}
+ try:
+ client.update_port_pair_group(ppg_id, body)
+ except Exception as e:
+ msg = (_("Failed to unset port pair group '%(ppg)s': %(e)s")
+ % {'ppg': parsed_args.port_pair_group, 'e': e})
+ raise exceptions.CommandError(msg)
+
+
+def _get_ppg_param(attrs, ppg):
+ attrs['port_pair_group_parameters'] = {}
+ for key, value in ppg.items():
+ if key == 'lb_fields':
+ attrs['port_pair_group_parameters'][key] = ([
+ field for field in value.split('&') if field])
+ else:
+ attrs['port_pair_group_parameters'][key] = value
+ return attrs['port_pair_group_parameters']
+
+
+def _get_common_attrs(client_manager, parsed_args, is_create=True):
+ attrs = {}
+ if parsed_args.name is not None:
+ attrs['name'] = parsed_args.name
+ if parsed_args.description is not None:
+ attrs['description'] = parsed_args.description
+ if parsed_args.port_pairs:
+ attrs['port_pairs'] = [(_get_id(client_manager.neutronclient, pp,
+ 'port_pair'))
+ for pp in parsed_args.port_pairs]
+ if is_create:
+ _get_attrs(attrs, parsed_args)
+ return attrs
+
+
+def _get_attrs(attrs, parsed_args):
+ if ('port_pair_group_parameters' in parsed_args and
+ parsed_args.port_pair_group_parameters is not None):
+ attrs['port_pair_group_parameters'] = (
+ _get_ppg_param(attrs, parsed_args.port_pair_group_parameters))
+
+
+def _get_id(client, id_or_name, resource):
+ return client.find_resource(resource, id_or_name)['id']
diff --git a/neutronclient/tests/unit/osc/v2/sfc/__init__.py b/neutronclient/tests/unit/osc/v2/sfc/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/neutronclient/tests/unit/osc/v2/sfc/__init__.py
diff --git a/neutronclient/tests/unit/osc/v2/sfc/fakes.py b/neutronclient/tests/unit/osc/v2/sfc/fakes.py
new file mode 100755
index 0000000..54bd5a8
--- /dev/null
+++ b/neutronclient/tests/unit/osc/v2/sfc/fakes.py
@@ -0,0 +1,234 @@
+# Copyright (c) 2017 Huawei Technologies India Pvt.Limited.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import copy
+
+import mock
+
+from osc_lib.tests import utils
+from oslo_utils import uuidutils
+
+
+class TestNeutronClientOSCV2(utils.TestCommand):
+
+ def setUp(self):
+ super(TestNeutronClientOSCV2, self).setUp()
+ self.namespace = argparse.Namespace()
+ self.app.client_manager.session = mock.Mock()
+ self.app.client_manager.neutronclient = mock.Mock()
+ self.neutronclient = self.app.client_manager.neutronclient
+ self.neutronclient.find_resource = mock.Mock(
+ side_effect=lambda resource, name_or_id, project_id=None,
+ cmd_resource=None, parent_id=None, fields=None:
+ {'id': name_or_id})
+
+
+class FakeSfcPortPair(object):
+ """Fake port pair attributes."""
+
+ @staticmethod
+ def create_port_pair(attrs=None):
+ """Create a fake port pair.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :return:
+ A Dictionary with id, name, description, ingress, egress,
+ service-function-parameter, project_id
+ """
+ attrs = attrs or {}
+
+ # Set default attributes.
+ port_pair_attrs = {
+ 'description': 'description',
+ 'egress': uuidutils.generate_uuid(),
+ 'id': uuidutils.generate_uuid(),
+ 'ingress': uuidutils.generate_uuid(),
+ 'name': 'port-pair-name',
+ 'service_function_parameters': 'correlation=None,weight=1',
+ 'project_id': uuidutils.generate_uuid(),
+ }
+
+ # Overwrite default attributes.
+ port_pair_attrs.update(attrs)
+ return copy.deepcopy(port_pair_attrs)
+
+ @staticmethod
+ def create_port_pairs(attrs=None, count=1):
+ """Create multiple port_pairs.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :param int count:
+ The number of port_pairs to fake
+ :return:
+ A list of dictionaries faking the port_pairs
+ """
+ port_pairs = []
+ for _ in range(count):
+ port_pairs.append(FakeSfcPortPair.create_port_pair(attrs))
+
+ return port_pairs
+
+
+class FakeSfcPortPairGroup(object):
+ """Fake port pair group attributes."""
+
+ @staticmethod
+ def create_port_pair_group(attrs=None):
+ """Create a fake port pair group.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :return:
+ A Dictionary with id, name, description, port_pairs, group_id
+ port_pair_group_parameters, project_id
+ """
+ attrs = attrs or {}
+
+ # Set default attributes.
+ port_pair_group_attrs = {
+ 'id': uuidutils.generate_uuid(),
+ 'group_id': uuidutils.generate_uuid(),
+ 'name': 'port-pair-group-name',
+ 'description': 'description',
+ 'port_pairs': uuidutils.generate_uuid(),
+ 'port_pair_group_parameters': '{"lb_fields": []}',
+ 'project_id': uuidutils.generate_uuid()
+ }
+
+ # port_pair_group_attrs default attributes.
+ port_pair_group_attrs.update(attrs)
+ return copy.deepcopy(port_pair_group_attrs)
+
+ @staticmethod
+ def create_port_pair_groups(attrs=None, count=1):
+ """Create multiple port pair groups.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :param int count:
+ The number of port_pair_groups to fake
+ :return:
+ A list of dictionaries faking the port pair groups
+ """
+ port_pair_groups = []
+ for _ in range(count):
+ port_pair_groups.append(
+ FakeSfcPortPairGroup.create_port_pair_group(attrs))
+
+ return port_pair_groups
+
+
+class FakeSfcFlowClassifier(object):
+ """Fake flow classifier attributes."""
+
+ @staticmethod
+ def create_flow_classifier(attrs=None):
+ """Create a fake flow classifier.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :return:
+ A Dictionary with faking port chain attributes
+ """
+ attrs = attrs or {}
+
+ # Set default attributes.
+ flow_classifier_attrs = {
+ 'id': uuidutils.generate_uuid(),
+ 'destination_ip_prefix': '2.2.2.2/32',
+ 'destination_port_range_max': '90',
+ 'destination_port_range_min': '80',
+ 'ethertype': 'IPv4',
+ 'logical_destination_port': uuidutils.generate_uuid(),
+ 'logical_source_port': uuidutils.generate_uuid(),
+ 'name': 'flow-classifier-name',
+ 'description': 'fc_description',
+ 'protocol': 'tcp',
+ 'source_ip_prefix': '1.1.1.1/32',
+ 'source_port_range_max': '20',
+ 'source_port_range_min': '10',
+ 'project_id': uuidutils.generate_uuid(),
+ 'l7_parameters': '{}'
+ }
+ flow_classifier_attrs.update(attrs)
+ return copy.deepcopy(flow_classifier_attrs)
+
+ @staticmethod
+ def create_flow_classifiers(attrs=None, count=1):
+ """Create multiple flow classifiers.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :param int count:
+ The number of flow classifiers to fake
+ :return:
+ A list of dictionaries faking the flow classifiers
+ """
+ flow_classifiers = []
+ for _ in range(count):
+ flow_classifiers.append(
+ FakeSfcFlowClassifier.create_flow_classifier(attrs))
+
+ return flow_classifiers
+
+
+class FakeSfcPortChain(object):
+ """Fake port chain attributes."""
+
+ @staticmethod
+ def create_port_chain(attrs=None):
+ """Create a fake port chain.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :return:
+ A Dictionary with faking port chain attributes
+ """
+ attrs = attrs or {}
+
+ # Set default attributes.
+ port_chain_attrs = {
+ 'id': uuidutils.generate_uuid(),
+ 'chain_id': uuidutils.generate_uuid(),
+ 'name': 'port-chain-name',
+ 'description': 'description',
+ 'port_pair_groups': uuidutils.generate_uuid(),
+ 'flow_classifiers': uuidutils.generate_uuid(),
+ 'chain_parameters': '{"correlation": mpls}',
+ 'project_id': uuidutils.generate_uuid(),
+ }
+
+ # port_pair_group_attrs default attributes.
+ port_chain_attrs.update(attrs)
+ return copy.deepcopy(port_chain_attrs)
+
+ @staticmethod
+ def create_port_chains(attrs=None, count=1):
+ """Create multiple port chains.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :param int count:
+ The number of port chains to fake
+ :return:
+ A list of dictionaries faking the port chains.
+ """
+ port_chains = []
+ for _ in range(count):
+ port_chains.append(FakeSfcPortChain.create_port_chain(attrs))
+ return port_chains
diff --git a/neutronclient/tests/unit/osc/v2/sfc/test_flow_classifier.py b/neutronclient/tests/unit/osc/v2/sfc/test_flow_classifier.py
new file mode 100755
index 0000000..fff11b3
--- /dev/null
+++ b/neutronclient/tests/unit/osc/v2/sfc/test_flow_classifier.py
@@ -0,0 +1,378 @@
+# Copyright (c) 2017 Huawei Technologies India Pvt.Limited.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from neutronclient.osc.v2.sfc import sfc_flow_classifier
+from neutronclient.tests.unit.osc.v2.sfc import fakes
+
+get_id = 'neutronclient.osc.v2.sfc.sfc_flow_classifier._get_id'
+
+
+def _get_id(client, id_or_name, resource):
+ return id_or_name
+
+
+class TestCreateSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
+
+ _fc = fakes.FakeSfcFlowClassifier.create_flow_classifier()
+
+ columns = ('Description',
+ 'Destination IP',
+ 'Destination Port Range Max',
+ 'Destination Port Range Min',
+ 'Ethertype',
+ 'ID',
+ 'L7 Parameters',
+ 'Logical Destination Port',
+ 'Logical Source Port',
+ 'Name',
+ 'Project',
+ 'Protocol',
+ 'Source IP',
+ 'Source Port Range Max',
+ 'Source Port Range Min')
+
+ def get_data(self):
+ return (
+ self._fc['description'],
+ self._fc['destination_ip_prefix'],
+ self._fc['destination_port_range_max'],
+ self._fc['destination_port_range_min'],
+ self._fc['ethertype'],
+ self._fc['id'],
+ self._fc['l7_parameters'],
+ self._fc['logical_destination_port'],
+ self._fc['logical_source_port'],
+ self._fc['name'],
+ self._fc['project_id'],
+ self._fc['protocol'],
+ self._fc['source_ip_prefix'],
+ self._fc['source_port_range_max'],
+ self._fc['source_port_range_min']
+ )
+
+ def setUp(self):
+ super(TestCreateSfcFlowClassifier, self).setUp()
+ mock.patch(get_id, new=_get_id).start()
+ self.neutronclient.create_flow_classifier = mock.Mock(
+ return_value={'flow_classifier': self._fc})
+ self.data = self.get_data()
+
+ # Get the command object to test
+ self.cmd = sfc_flow_classifier.CreateSfcFlowClassifier(self.app,
+ self.namespace)
+
+ def test_create_flow_classifier_default_options(self):
+ arglist = [
+ "--logical-source-port", self._fc['logical_source_port'],
+ "--ethertype", self._fc['ethertype'],
+ self._fc['name'],
+ ]
+ verifylist = [
+ ('logical_source_port', self._fc['logical_source_port']),
+ ('ethertype', self._fc['ethertype']),
+ ('name', self._fc['name']),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.neutronclient.create_flow_classifier.assert_called_once_with({
+ 'flow_classifier': {
+ 'name': self._fc['name'],
+ 'logical_source_port': self._fc['logical_source_port'],
+ 'ethertype': self._fc['ethertype']}
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_flow_classifier(self):
+ arglist = [
+ "--description", self._fc['description'],
+ "--ethertype", self._fc['ethertype'],
+ "--protocol", self._fc['protocol'],
+ "--source-ip-prefix", self._fc['source_ip_prefix'],
+ "--destination-ip-prefix", self._fc['destination_ip_prefix'],
+ "--logical-source-port", self._fc['logical_source_port'],
+ "--logical-destination-port", self._fc['logical_destination_port'],
+ self._fc['name'],
+ "--l7-parameters", 'url=path',
+ ]
+
+ param = 'url=path'
+
+ verifylist = [
+ ('description', self._fc['description']),
+ ('name', self._fc['name']),
+ ('ethertype', self._fc['ethertype']),
+ ('protocol', self._fc['protocol']),
+ ('source_ip_prefix', self._fc['source_ip_prefix']),
+ ('destination_ip_prefix', self._fc['destination_ip_prefix']),
+ ('logical_source_port', self._fc['logical_source_port']),
+ ('logical_destination_port',
+ self._fc['logical_destination_port']),
+ ('l7_parameters', param)
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = (self.cmd.take_action(parsed_args))
+ self.neutronclient.create_flow_classifier.assert_called_once_with({
+ 'flow_classifier': {
+ 'name': self._fc['name'],
+ 'description': self._fc['description'],
+ 'ethertype': self._fc['ethertype'],
+ 'protocol': self._fc['protocol'],
+ 'source_ip_prefix': self._fc['source_ip_prefix'],
+ 'destination_ip_prefix': self._fc['destination_ip_prefix'],
+ 'logical_source_port': self._fc['logical_source_port'],
+ 'logical_destination_port':
+ self._fc['logical_destination_port'],
+ 'l7_parameters': param
+ }
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestDeleteSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
+
+ _flow_classifier = \
+ fakes.FakeSfcFlowClassifier.create_flow_classifiers(count=1)
+
+ def setUp(self):
+ super(TestDeleteSfcFlowClassifier, self).setUp()
+ mock.patch(get_id, new=_get_id).start()
+ self.neutronclient.delete_flow_classifier = mock.Mock(
+ return_value=None)
+ self.cmd = sfc_flow_classifier.DeleteSfcFlowClassifier(self.app,
+ self.namespace)
+
+ def test_delete_flow_classifier(self):
+ client = self.app.client_manager.neutronclient
+ mock_flow_classifier_delete = client.delete_flow_classifier
+ arglist = [
+ self._flow_classifier[0]['id'],
+ ]
+ verifylist = [
+ ('flow_classifier', self._flow_classifier[0]['id']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ mock_flow_classifier_delete.assert_called_once_with(
+ self._flow_classifier[0]['id'])
+ self.assertIsNone(result)
+
+
+class TestSetSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
+ _flow_classifier = fakes.FakeSfcFlowClassifier.create_flow_classifier()
+ _flow_classifier_name = _flow_classifier['name']
+ _flow_classifier_id = _flow_classifier['id']
+
+ def setUp(self):
+ super(TestSetSfcFlowClassifier, self).setUp()
+ mock.patch(get_id, new=_get_id).start()
+ self.neutronclient.update_flow_classifier = mock.Mock(
+ return_value=None)
+ self.cmd = sfc_flow_classifier.SetSfcFlowClassifier(self.app,
+ self.namespace)
+
+ def test_set_flow_classifier(self):
+ client = self.app.client_manager.neutronclient
+ mock_flow_classifier_update = client.update_flow_classifier
+ arglist = [
+ self._flow_classifier_name,
+ '--name', 'name_updated',
+ '--description', 'desc_updated'
+ ]
+ verifylist = [
+ ('flow_classifier', self._flow_classifier_name),
+ ('name', 'name_updated'),
+ ('description', 'desc_updated'),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {'flow_classifier': {
+ 'name': 'name_updated',
+ 'description': 'desc_updated'}}
+ mock_flow_classifier_update.assert_called_once_with(
+ self._flow_classifier_name, attrs)
+ self.assertIsNone(result)
+
+
+class TestShowSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
+
+ _fc = fakes.FakeSfcFlowClassifier.create_flow_classifier()
+ data = (
+ _fc['description'],
+ _fc['destination_ip_prefix'],
+ _fc['destination_port_range_max'],
+ _fc['destination_port_range_min'],
+ _fc['ethertype'],
+ _fc['id'],
+ _fc['l7_parameters'],
+ _fc['logical_destination_port'],
+ _fc['logical_source_port'],
+ _fc['name'],
+ _fc['project_id'],
+ _fc['protocol'],
+ _fc['source_ip_prefix'],
+ _fc['source_port_range_max'],
+ _fc['source_port_range_min']
+ )
+ _flow_classifier = {'flow_classifier': _fc}
+ _flow_classifier_id = _fc['id']
+ columns = ('Description',
+ 'Destination IP',
+ 'Destination Port Range Max',
+ 'Destination Port Range Min',
+ 'Ethertype',
+ 'ID',
+ 'L7 Parameters',
+ 'Logical Destination Port',
+ 'Logical Source Port',
+ 'Name',
+ 'Project',
+ 'Protocol',
+ 'Source IP',
+ 'Source Port Range Max',
+ 'Source Port Range Min')
+
+ def setUp(self):
+ super(TestShowSfcFlowClassifier, self).setUp()
+ mock.patch(get_id, new=_get_id).start()
+ self.neutronclient.show_flow_classifier = mock.Mock(
+ return_value=self._flow_classifier
+ )
+ # Get the command object to test
+ self.cmd = sfc_flow_classifier.ShowSfcFlowClassifier(self.app,
+ self.namespace)
+
+ def test_show_flow_classifier(self):
+ client = self.app.client_manager.neutronclient
+ mock_flow_classifier_show = client.show_flow_classifier
+ arglist = [
+ self._flow_classifier_id,
+ ]
+ verifylist = [
+ ('flow_classifier', self._flow_classifier_id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ mock_flow_classifier_show.assert_called_once_with(
+ self._flow_classifier_id)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestListSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
+
+ _fc = fakes.FakeSfcFlowClassifier.create_flow_classifiers(count=1)
+
+ columns = ('ID', 'Name', 'Summary')
+
+ columns_long = ('ID', 'Name', 'Protocol', 'Ethertype', 'Source IP',
+ 'Destination IP', 'Logical Source Port',
+ 'Logical Destination Port', 'Source Port Range Min',
+ 'Source Port Range Max', 'Destination Port Range Min',
+ 'Destination Port Range Max', 'L7 Parameters',
+ 'Description', 'Project')
+ _flow_classifier = _fc[0]
+ data = [
+ _flow_classifier['id'],
+ _flow_classifier['name'],
+ _flow_classifier['protocol'],
+ _flow_classifier['source_ip_prefix'],
+ _flow_classifier['destination_ip_prefix'],
+ _flow_classifier['logical_source_port'],
+ _flow_classifier['logical_destination_port']
+ ]
+ data_long = [
+ _flow_classifier['id'],
+ _flow_classifier['name'],
+ _flow_classifier['protocol'],
+ _flow_classifier['ethertype'],
+ _flow_classifier['source_ip_prefix'],
+ _flow_classifier['destination_ip_prefix'],
+ _flow_classifier['logical_source_port'],
+ _flow_classifier['logical_destination_port'],
+ _flow_classifier['source_port_range_min'],
+ _flow_classifier['source_port_range_max'],
+ _flow_classifier['destination_port_range_min'],
+ _flow_classifier['destination_port_range_max'],
+ _flow_classifier['l7_parameters'],
+ _flow_classifier['description']
+ ]
+
+ _flow_classifier1 = {'flow_classifiers': _flow_classifier}
+ _flow_classifier_id = _flow_classifier['id']
+
+ def setUp(self):
+ super(TestListSfcFlowClassifier, self).setUp()
+ mock.patch(get_id, new=_get_id).start()
+ self.neutronclient.list_flow_classifier = mock.Mock(
+ return_value={'flow_classifiers': self._fc}
+ )
+ # Get the command object to test
+ self.cmd = sfc_flow_classifier.ListSfcFlowClassifier(self.app,
+ self.namespace)
+
+ def test_list_flow_classifier(self):
+ arglist = []
+ verifylist = []
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns = self.cmd.take_action(parsed_args)
+ fcs = self.neutronclient.list_flow_classifier()['flow_classifiers']
+ fc = fcs[0]
+ data = [
+ fc['id'],
+ fc['name'],
+ fc['protocol'],
+ fc['source_ip_prefix'],
+ fc['destination_ip_prefix'],
+ fc['logical_source_port'],
+ fc['logical_destination_port']
+ ]
+ self.assertEqual(list(self.columns), columns[0])
+ self.assertEqual(self.data, data)
+
+ def test_list_with_long_option(self):
+ arglist = ['--long']
+ verifylist = [('long', True)]
+ fcs = self.neutronclient.list_flow_classifier()['flow_classifiers']
+ fc = fcs[0]
+ data = [
+ fc['id'],
+ fc['name'],
+ fc['protocol'],
+ fc['ethertype'],
+ fc['source_ip_prefix'],
+ fc['destination_ip_prefix'],
+ fc['logical_source_port'],
+ fc['logical_destination_port'],
+ fc['source_port_range_min'],
+ fc['source_port_range_max'],
+ fc['destination_port_range_min'],
+ fc['destination_port_range_max'],
+ fc['l7_parameters'],
+ fc['description']
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns_long = self.cmd.take_action(parsed_args)[0]
+ self.assertEqual(list(self.columns_long), columns_long)
+ self.assertEqual(self.data_long, data)
diff --git a/neutronclient/tests/unit/osc/v2/sfc/test_port_chain.py b/neutronclient/tests/unit/osc/v2/sfc/test_port_chain.py
new file mode 100755
index 0000000..8cb0fe2
--- /dev/null
+++ b/neutronclient/tests/unit/osc/v2/sfc/test_port_chain.py
@@ -0,0 +1,556 @@
+# Copyright (c) 2017 Huawei Technologies India Pvt.Limited.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from osc_lib import exceptions
+
+from neutronclient.osc.v2.sfc import sfc_port_chain
+from neutronclient.tests.unit.osc.v2.sfc import fakes
+
+
+def _get_id(client, id_or_name, resource):
+ return id_or_name
+
+
+class TestCreateSfcPortChain(fakes.TestNeutronClientOSCV2):
+ # The new port_chain created
+ _port_chain = fakes.FakeSfcPortChain.create_port_chain()
+
+ columns = ('Chain ID',
+ 'Chain Parameters',
+ 'Description',
+ 'Flow Classifiers',
+ 'ID',
+ 'Name',
+ 'Port Pair Groups',
+ 'Project')
+
+ def get_data(self):
+ return (
+ self._port_chain['chain_id'],
+ self._port_chain['chain_parameters'],
+ self._port_chain['description'],
+ self._port_chain['flow_classifiers'],
+ self._port_chain['id'],
+ self._port_chain['name'],
+ self._port_chain['port_pair_groups'],
+ self._port_chain['project_id'],
+ )
+
+ def setUp(self):
+ super(TestCreateSfcPortChain, self).setUp()
+ mock.patch(
+ 'neutronclient.osc.v2.sfc.sfc_port_chain._get_id',
+ new=_get_id).start()
+ self.neutronclient.create_port_chain = mock.Mock(
+ return_value={'port_chain': self._port_chain})
+ self.data = self.get_data()
+
+ # Get the command object to test
+ self.cmd = sfc_port_chain.CreateSfcPortChain(self.app, self.namespace)
+
+ def test_create_port_chain_dafault_options(self):
+ arglist = [
+ self._port_chain['name'],
+ "--port-pair-group", self._port_chain['port_pair_groups']
+ ]
+ verifylist = [
+ ('name', self._port_chain['name']),
+ ('port_pair_groups', [self._port_chain['port_pair_groups']]),
+ ('flow_classifiers', []),
+ ('chain_parameters', None),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.neutronclient.create_port_chain.assert_called_once_with({
+ 'port_chain': {
+ 'name': self._port_chain['name'],
+ 'port_pair_groups': [self._port_chain['port_pair_groups']],
+ 'flow_classifiers': [],
+ 'chain_parameters': None}
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_port_chain_all_options(self):
+ arglist = [
+ "--description", self._port_chain['description'],
+ "--port-pair-group", self._port_chain['port_pair_groups'],
+ self._port_chain['name'],
+ "--flow-classifier", self._port_chain['flow_classifiers'],
+ "--chain-parameters", 'correlation=mpls,symmetric=true',
+ ]
+
+ cp = [{'correlation': 'mpls', 'symmetric': 'true'}]
+
+ verifylist = [
+ ('port_pair_groups', [self._port_chain['port_pair_groups']]),
+ ('name', self._port_chain['name']),
+ ('description', self._port_chain['description']),
+ ('flow_classifiers', [self._port_chain['flow_classifiers']]),
+ ('chain_parameters', cp)
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.neutronclient.create_port_chain.assert_called_once_with({
+ 'port_chain': {
+ 'name': self._port_chain['name'],
+ 'port_pair_groups': [self._port_chain['port_pair_groups']],
+ 'description': self._port_chain['description'],
+ 'flow_classifiers': [self._port_chain['flow_classifiers']],
+ 'chain_parameters': cp
+ }
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestDeleteSfcPortChain(fakes.TestNeutronClientOSCV2):
+
+ _port_chain = fakes.FakeSfcPortChain.create_port_chains(count=1)
+
+ def setUp(self):
+ super(TestDeleteSfcPortChain, self).setUp()
+ mock.patch(
+ 'neutronclient.osc.v2.sfc.sfc_port_chain._get_id',
+ new=_get_id).start()
+ self.neutronclient.delete_port_chain = mock.Mock(return_value=None)
+ self.cmd = sfc_port_chain.DeleteSfcPortChain(self.app, self.namespace)
+
+ def test_delete_port_chain(self):
+ client = self.app.client_manager.neutronclient
+ mock_port_chain_delete = client.delete_port_chain
+ arglist = [
+ self._port_chain[0]['id'],
+ ]
+ verifylist = [
+ ('port_chain', self._port_chain[0]['id']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ mock_port_chain_delete.assert_called_once_with(
+ self._port_chain[0]['id'])
+ self.assertIsNone(result)
+
+
+class TestListSfcPortChain(fakes.TestNeutronClientOSCV2):
+ _port_chains = fakes.FakeSfcPortChain.create_port_chains(count=1)
+ columns = ('ID', 'Name', 'Port Pair Groups', 'Flow Classifiers',
+ 'Chain Parameters', 'Chain ID')
+ columns_long = ('ID', 'Name', 'Port Pair Groups', 'Flow Classifiers',
+ 'Chain Parameters', 'Description', 'Chain ID', 'Project')
+ _port_chain = _port_chains[0]
+ data = [
+ _port_chain['id'],
+ _port_chain['name'],
+ _port_chain['port_pair_groups'],
+ _port_chain['flow_classifiers'],
+ _port_chain['chain_parameters'],
+ _port_chain['chain_id']
+ ]
+ data_long = [
+ _port_chain['id'],
+ _port_chain['name'],
+ _port_chain['project_id'],
+ _port_chain['chain_id'],
+ _port_chain['port_pair_groups'],
+ _port_chain['flow_classifiers'],
+ _port_chain['chain_parameters'],
+ _port_chain['description']
+ ]
+ _port_chain1 = {'port_chains': _port_chain}
+ _port_chain_id = _port_chain['id']
+
+ def setUp(self):
+ super(TestListSfcPortChain, self).setUp()
+ mock.patch(
+ 'neutronclient.osc.v2.sfc.sfc_port_chain._get_id',
+ new=_get_id).start()
+ self.neutronclient.list_port_chain = mock.Mock(
+ return_value={'port_chains': self._port_chains}
+ )
+ # Get the command object to test
+ self.cmd = sfc_port_chain.ListSfcPortChain(self.app, self.namespace)
+
+ def test_list_port_chain(self):
+ arglist = []
+ verifylist = []
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns = self.cmd.take_action(parsed_args)[0]
+ pcs = self.neutronclient.list_port_chain()['port_chains']
+ pc = pcs[0]
+ data = [
+ pc['id'],
+ pc['name'],
+ pc['port_pair_groups'],
+ pc['flow_classifiers'],
+ pc['chain_parameters'],
+ pc['chain_id']
+ ]
+ self.assertEqual(list(self.columns), columns)
+ self.assertEqual(self.data, data)
+
+ def test_list_port_chain_with_long_opion(self):
+ arglist = ['--long']
+ verifylist = [('long', True)]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns = self.cmd.take_action(parsed_args)[0]
+ pcs = self.neutronclient.list_port_chain()['port_chains']
+ pc = pcs[0]
+ data = [
+ pc['id'],
+ pc['name'],
+ pc['project_id'],
+ pc['chain_id'],
+ pc['port_pair_groups'],
+ pc['flow_classifiers'],
+ pc['chain_parameters'],
+ pc['description']
+ ]
+ self.assertEqual(list(self.columns_long), columns)
+ self.assertEqual(self.data_long, data)
+
+
+class TestSetSfcPortChain(fakes.TestNeutronClientOSCV2):
+ _port_chain = fakes.FakeSfcPortChain.create_port_chain()
+ resource = _port_chain
+ res = 'port_chain'
+ _port_chain_name = _port_chain['name']
+ _port_chain_id = _port_chain['id']
+ pc_ppg = _port_chain['port_pair_groups']
+ pc_fc = _port_chain['flow_classifiers']
+
+ def setUp(self):
+ super(TestSetSfcPortChain, self).setUp()
+ mock.patch(
+ 'neutronclient.osc.v2.sfc.sfc_port_chain._get_id',
+ new=_get_id).start()
+ self.mocked = self.neutronclient.update_port_chain
+ self.cmd = sfc_port_chain.SetSfcPortChain(self.app, self.namespace)
+
+ def test_set_port_chain(self):
+ client = self.app.client_manager.neutronclient
+ mock_port_chain_update = client.update_port_chain
+ arglist = [
+ self._port_chain_name,
+ '--name', 'name_updated',
+ '--description', 'desc_updated',
+ ]
+ verifylist = [
+ ('port_chain', self._port_chain_name),
+ ('name', 'name_updated'),
+ ('description', 'desc_updated'),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ attrs = {'port_chain': {'name': 'name_updated',
+ 'description': 'desc_updated'}}
+ mock_port_chain_update.assert_called_once_with(self._port_chain_name,
+ attrs)
+ self.assertIsNone(result)
+
+ def test_set_flow_classifier(self):
+ target = self.resource['id']
+ fc1 = 'flow_classifier1'
+
+ def _mock_flow_classifier(*args, **kwargs):
+
+ if self.neutronclient.find_resource.call_count == 1:
+ self.neutronclient.find_resource.assert_called_with(
+ 'flow_classifier', fc1, cmd_resource='sfc_flow_classifier')
+ return {'id': args[1]}
+
+ if self.neutronclient.find_resource.call_count == 2:
+ self.neutronclient.find_resource.assert_called_with(
+ self.res, target, cmd_resource='sfc_port_chain')
+ return {'flow_classifiers': self.pc_fc}
+ self.neutronclient.find_resource.side_effect = _mock_flow_classifier
+ arglist = [
+ target,
+ '--flow-classifier', fc1,
+ ]
+ verifylist = [
+ (self.res, target),
+ ('flow_classifiers', [fc1])
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ expect = {'flow_classifiers': sorted([self.pc_fc, fc1])}
+ self.mocked.assert_called_once_with(target, {self.res: expect})
+ self.assertEqual(2, self.neutronclient.find_resource.call_count)
+ self.assertIsNone(result)
+
+ def test_set_no_flow_classifier(self):
+ client = self.app.client_manager.neutronclient
+ mock_port_chain_update = client.update_port_chain
+ arglist = [
+ self._port_chain_name,
+ '--no-flow-classifier',
+ ]
+ verifylist = [
+ ('port_chain', self._port_chain_name),
+ ('no_flow_classifier', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ attrs = {'port_chain': {'flow_classifiers': []}}
+ mock_port_chain_update.assert_called_once_with(self._port_chain_name,
+ attrs)
+ self.assertIsNone(result)
+
+ def test_set_port_pair_groups(self):
+ target = self.resource['id']
+ existing_ppg = self.pc_ppg
+ ppg1 = 'port_pair_group1'
+ ppg2 = 'port_pair_group2'
+
+ def _mock_flow_classifier(*args, **kwargs):
+ if self.neutronclient.find_resource.call_count == 1:
+ self.neutronclient.find_resource.assert_called_with(
+ self.res, target, cmd_resource='sfc_port_chain')
+ return {'port_pair_groups': self.pc_ppg}
+
+ if self.neutronclient.find_resource.call_count == 2:
+ self.neutronclient.find_resource.assert_called_with(
+ 'port_pair_group', ppg1,
+ cmd_resource='sfc_port_pair_group')
+ return {'id': args[1]}
+
+ if self.neutronclient.find_resource.call_count == 3:
+ self.neutronclient.find_resource.assert_called_with(
+ 'port_pair_group', ppg2,
+ cmd_resource='sfc_port_pair_group')
+ return {'id': args[1]}
+
+ self.neutronclient.find_resource.side_effect = _mock_flow_classifier
+ arglist = [
+ target,
+ '--port-pair-group', ppg1,
+ '--port-pair-group', ppg2,
+ ]
+ verifylist = [
+ (self.res, target),
+ ('port_pair_groups', [ppg1, ppg2])
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ expect = {'port_pair_groups': sorted([existing_ppg, ppg1, ppg2])}
+ self.mocked.assert_called_once_with(target, {self.res: expect})
+ self.assertEqual(3, self.neutronclient.find_resource.call_count)
+ self.assertIsNone(result)
+
+ def test_set_no_port_pair_group(self):
+ target = self.resource['id']
+ ppg1 = 'port_pair_group1'
+
+ def _mock_port_pair_group(*args, **kwargs):
+
+ if self.neutronclient.find_resource.call_count == 1:
+ self.neutronclient.find_resource.assert_called_with(
+ 'port_pair_group', ppg1,
+ cmd_resource='sfc_port_pair_group')
+ return {'id': args[1]}
+ self.neutronclient.find_resource.side_effect = _mock_port_pair_group
+ arglist = [
+ target,
+ '--no-port-pair-group',
+ '--port-pair-group', ppg1,
+ ]
+ verifylist = [
+ (self.res, target),
+ ('no_port_pair_group', True),
+ ('port_pair_groups', [ppg1])
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ expect = {'port_pair_groups': [ppg1]}
+ self.mocked.assert_called_once_with(target, {self.res: expect})
+ self.assertEqual(1, self.neutronclient.find_resource.call_count)
+ self.assertIsNone(result)
+
+ def test_set_only_no_port_pair_group(self):
+ target = self.resource['id']
+ arglist = [
+ target,
+ '--no-port-pair-group',
+ ]
+ verifylist = [
+ (self.res, target),
+ ('no_port_pair_group', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.assertRaises(
+ exceptions.CommandError, self.cmd.take_action, parsed_args)
+
+
+class TestShowSfcPortChain(fakes.TestNeutronClientOSCV2):
+
+ _pc = fakes.FakeSfcPortChain.create_port_chain()
+ data = (
+ _pc['chain_id'],
+ _pc['chain_parameters'],
+ _pc['description'],
+ _pc['flow_classifiers'],
+ _pc['id'],
+ _pc['name'],
+ _pc['port_pair_groups'],
+ _pc['project_id']
+ )
+ _port_chain = {'port_chain': _pc}
+ _port_chain_id = _pc['id']
+ columns = ('Chain ID',
+ 'Chain Parameters',
+ 'Description',
+ 'Flow Classifiers',
+ 'ID',
+ 'Name',
+ 'Port Pair Groups',
+ 'Project')
+
+ def setUp(self):
+ super(TestShowSfcPortChain, self).setUp()
+ mock.patch(
+ 'neutronclient.osc.v2.sfc.sfc_port_chain._get_id',
+ new=_get_id).start()
+ self.neutronclient.show_port_chain = mock.Mock(
+ return_value=self._port_chain
+ )
+ # Get the command object to test
+ self.cmd = sfc_port_chain.ShowSfcPortChain(self.app, self.namespace)
+
+ def test_show_port_chain(self):
+ client = self.app.client_manager.neutronclient
+ mock_port_chain_show = client.show_port_chain
+ arglist = [
+ self._port_chain_id,
+ ]
+ verifylist = [
+ ('port_chain', self._port_chain_id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ mock_port_chain_show.assert_called_once_with(self._port_chain_id)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestUnsetSfcPortChain(fakes.TestNeutronClientOSCV2):
+ _port_chain = fakes.FakeSfcPortChain.create_port_chain()
+ resource = _port_chain
+ res = 'port_chain'
+ _port_chain_name = _port_chain['name']
+ _port_chain_id = _port_chain['id']
+ pc_ppg = _port_chain['port_pair_groups']
+ pc_fc = _port_chain['flow_classifiers']
+
+ def setUp(self):
+ super(TestUnsetSfcPortChain, self).setUp()
+ mock.patch(
+ 'neutronclient.osc.v2.sfc.sfc_port_chain._get_id',
+ new=_get_id).start()
+ self.neutronclient.update_port_chain = mock.Mock(
+ return_value=None)
+ self.mocked = self.neutronclient.update_port_chain
+ self.cmd = sfc_port_chain.UnsetSfcPortChain(self.app, self.namespace)
+
+ def test_unset_port_pair_group(self):
+ target = self.resource['id']
+ ppg1 = 'port_pair_group1'
+
+ def _mock_port_pair_group(*args, **kwargs):
+ if self.neutronclient.find_resource.call_count == 1:
+ self.neutronclient.find_resource.assert_called_with(
+ self.res, target, cmd_resource='sfc_port_chain')
+ return {'port_pair_groups': self.pc_ppg}
+
+ if self.neutronclient.find_resource.call_count == 2:
+ self.neutronclient.find_resource.assert_called_with(
+ 'port_pair_group', ppg1,
+ cmd_resource='sfc_port_pair_group')
+ return {'id': args[1]}
+ if self.neutronclient.find_resource.call_count == 3:
+ self.neutronclient.find_resource.assert_called_with(
+ self.res, target, cmd_resource='sfc_port_chain')
+ return {'id': args[1]}
+ self.neutronclient.find_resource.side_effect = _mock_port_pair_group
+
+ arglist = [
+ target,
+ '--port-pair-group', ppg1,
+ ]
+ verifylist = [
+ (self.res, target),
+ ('port_pair_groups', [ppg1])
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ expect = {'port_pair_groups': sorted([self.pc_ppg])}
+ self.mocked.assert_called_once_with(target, {self.res: expect})
+ self.assertIsNone(result)
+
+ def test_unset_flow_classifier(self):
+ target = self.resource['id']
+ fc1 = 'flow_classifier1'
+
+ def _mock_flow_classifier(*args, **kwargs):
+ if self.neutronclient.find_resource.call_count == 1:
+ self.neutronclient.find_resource.assert_called_with(
+ self.res, target, cmd_resource='sfc_port_chain')
+ return {'flow_classifiers': self.pc_fc}
+
+ if self.neutronclient.find_resource.call_count == 2:
+ self.neutronclient.find_resource.assert_called_with(
+ 'flow_classifier', fc1, cmd_resource='sfc_flow_classifier')
+ return {'id': args[1]}
+ self.neutronclient.find_resource.side_effect = _mock_flow_classifier
+
+ arglist = [
+ target,
+ '--flow-classifier', fc1,
+ ]
+ verifylist = [
+ (self.res, target),
+ ('flow_classifiers', [fc1])
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ expect = {'flow_classifiers': sorted([self.pc_fc])}
+ self.mocked.assert_called_once_with(target, {self.res: expect})
+ self.assertIsNone(result)
+
+ def test_unset_all_flow_classifier(self):
+ client = self.app.client_manager.neutronclient
+ target = self.resource['id']
+ mock_port_chain_update = client.update_port_chain
+ arglist = [
+ target,
+ '--all-flow-classifier',
+ ]
+ verifylist = [
+ (self.res, target),
+ ('all_flow_classifier', True)
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ expect = {'flow_classifiers': []}
+ mock_port_chain_update.assert_called_once_with(target,
+ {self.res: expect})
+ self.assertIsNone(result)
diff --git a/neutronclient/tests/unit/osc/v2/sfc/test_port_pair.py b/neutronclient/tests/unit/osc/v2/sfc/test_port_pair.py
new file mode 100755
index 0000000..e6c539a
--- /dev/null
+++ b/neutronclient/tests/unit/osc/v2/sfc/test_port_pair.py
@@ -0,0 +1,302 @@
+# Copyright (c) 2017 Huawei Technologies India Pvt.Limited.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from neutronclient.osc.v2.sfc import sfc_port_pair
+from neutronclient.tests.unit.osc.v2.sfc import fakes
+
+
+def _get_id(client, id_or_name, resource):
+ return id_or_name
+
+
+class TestCreateSfcPortPair(fakes.TestNeutronClientOSCV2):
+ # The new port_pair created
+ _port_pair = fakes.FakeSfcPortPair.create_port_pair()
+
+ columns = ('Description',
+ 'Egress Logical Port',
+ 'ID',
+ 'Ingress Logical Port',
+ 'Name',
+ 'Project',
+ 'Service Function Parameters')
+
+ def get_data(self):
+ return (
+ self._port_pair['description'],
+ self._port_pair['egress'],
+ self._port_pair['id'],
+ self._port_pair['ingress'],
+ self._port_pair['name'],
+ self._port_pair['project_id'],
+ self._port_pair['service_function_parameters']
+ )
+
+ def setUp(self):
+ super(TestCreateSfcPortPair, self).setUp()
+ mock.patch('neutronclient.osc.v2.sfc.sfc_port_pair._get_id',
+ new=_get_id).start()
+ self.neutronclient.create_port_pair = mock.Mock(
+ return_value={'port_pair': self._port_pair})
+ self.data = self.get_data()
+
+ # Get the command object to test
+ self.cmd = sfc_port_pair.CreateSfcPortPair(self.app, self.namespace)
+
+ def test_create_port_pair_default_options(self):
+ arglist = [
+ "--ingress", self._port_pair['ingress'],
+ "--egress", self._port_pair['egress'],
+ self._port_pair['name'],
+ ]
+ verifylist = [
+ ('ingress', self._port_pair['ingress']),
+ ('egress', self._port_pair['egress']),
+ ('name', self._port_pair['name'])
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.neutronclient.create_port_pair.assert_called_once_with({
+ 'port_pair': {'name': self._port_pair['name'],
+ 'ingress': self._port_pair['ingress'],
+ 'egress': self._port_pair['egress'],
+ 'service_function_parameters': None,
+ }
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_port_pair_all_options(self):
+ arglist = [
+ "--description", self._port_pair['description'],
+ "--egress", self._port_pair['egress'],
+ "--ingress", self._port_pair['ingress'],
+ self._port_pair['name'],
+ "--service-function-parameters", 'correlation=None,weight=1',
+ ]
+
+ sfp = [{'correlation': 'None', 'weight': '1'}]
+
+ verifylist = [
+ ('ingress', self._port_pair['ingress']),
+ ('egress', self._port_pair['egress']),
+ ('name', self._port_pair['name']),
+ ('description', self._port_pair['description']),
+ ('service_function_parameters', sfp)
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.neutronclient.create_port_pair.assert_called_once_with({
+ 'port_pair': {'name': self._port_pair['name'],
+ 'ingress': self._port_pair['ingress'],
+ 'egress': self._port_pair['egress'],
+ 'description': self._port_pair['description'],
+ 'service_function_parameters':
+ [{'correlation': 'None', 'weight': '1'}],
+ }
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestDeleteSfcPortPair(fakes.TestNeutronClientOSCV2):
+
+ _port_pair = fakes.FakeSfcPortPair.create_port_pairs(count=1)
+
+ def setUp(self):
+ super(TestDeleteSfcPortPair, self).setUp()
+ mock.patch('neutronclient.osc.v2.sfc.sfc_port_pair._get_id',
+ new=_get_id).start()
+ self.neutronclient.delete_port_pair = mock.Mock(return_value=None)
+ self.cmd = sfc_port_pair.DeleteSfcPortPair(self.app, self.namespace)
+
+ def test_delete_port_pair(self):
+ client = self.app.client_manager.neutronclient
+ mock_port_pair_delete = client.delete_port_pair
+ arglist = [
+ self._port_pair[0]['id'],
+ ]
+ verifylist = [
+ ('port_pair', self._port_pair[0]['id']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ mock_port_pair_delete.assert_called_once_with(
+ self._port_pair[0]['id'])
+ self.assertIsNone(result)
+
+
+class TestListSfcPortPair(fakes.TestNeutronClientOSCV2):
+ _port_pairs = fakes.FakeSfcPortPair.create_port_pairs()
+ columns = ('ID', 'Name', 'Ingress Logical Port', 'Egress Logical Port')
+ columns_long = ('ID', 'Name', 'Ingress Logical Port',
+ 'Egress Logical Port', 'Service Function Parameters',
+ 'Description', 'Project')
+ _port_pair = _port_pairs[0]
+ data = [
+ _port_pair['id'],
+ _port_pair['name'],
+ _port_pair['ingress'],
+ _port_pair['egress']
+ ]
+ data_long = [
+ _port_pair['id'],
+ _port_pair['name'],
+ _port_pair['ingress'],
+ _port_pair['egress'],
+ _port_pair['service_function_parameters'],
+ _port_pair['description']
+ ]
+ _port_pair1 = {'port_pairs': _port_pair}
+ _port_pair_id = _port_pair['id'],
+
+ def setUp(self):
+ super(TestListSfcPortPair, self).setUp()
+ mock.patch('neutronclient.osc.v2.sfc.sfc_port_pair._get_id',
+ new=_get_id).start()
+ self.neutronclient.list_port_pair = mock.Mock(
+ return_value={'port_pairs': self._port_pairs}
+ )
+ # Get the command object to test
+ self.cmd = sfc_port_pair.ListSfcPortPair(self.app, self.namespace)
+
+ def test_list_port_pair(self):
+ arglist = []
+ verifylist = []
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns = self.cmd.take_action(parsed_args)[0]
+ port_pairs = self.neutronclient.list_port_pair()['port_pairs']
+ port_pair = port_pairs[0]
+ data = [
+ port_pair['id'],
+ port_pair['name'],
+ port_pair['ingress'],
+ port_pair['egress']
+ ]
+ self.assertEqual(list(self.columns), columns)
+ self.assertEqual(self.data, data)
+
+ def test_list_with_long_option(self):
+ arglist = ['--long']
+ verifylist = [('long', True)]
+ port_pairs = self.neutronclient.list_port_pair()['port_pairs']
+ port_pair = port_pairs[0]
+ data = [
+ port_pair['id'],
+ port_pair['name'],
+ port_pair['ingress'],
+ port_pair['egress'],
+ port_pair['service_function_parameters'],
+ port_pair['description']
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns_long = self.cmd.take_action(parsed_args)[0]
+ self.assertEqual(list(self.columns_long), columns_long)
+ self.assertEqual(self.data_long, data)
+
+
+class TestSetSfcPortPair(fakes.TestNeutronClientOSCV2):
+ _port_pair = fakes.FakeSfcPortPair.create_port_pair()
+ _port_pair_name = _port_pair['name']
+ _port_pair_id = _port_pair['id']
+
+ def setUp(self):
+ super(TestSetSfcPortPair, self).setUp()
+ mock.patch('neutronclient.osc.v2.sfc.sfc_port_pair._get_id',
+ new=_get_id).start()
+ self.neutronclient.update_port_pair = mock.Mock(return_value=None)
+ self.cmd = sfc_port_pair.SetSfcPortPair(self.app, self.namespace)
+
+ def test_set_port_pair(self):
+ client = self.app.client_manager.neutronclient
+ mock_port_pair_update = client.update_port_pair
+ arglist = [
+ self._port_pair_name,
+ '--name', 'name_updated',
+ '--description', 'desc_updated'
+ ]
+ verifylist = [
+ ('port_pair', self._port_pair_name),
+ ('name', 'name_updated'),
+ ('description', 'desc_updated'),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ attrs = {'port_pair': {
+ 'name': 'name_updated',
+ 'description': 'desc_updated'}
+ }
+ mock_port_pair_update.assert_called_once_with(self._port_pair_name,
+ attrs)
+ self.assertIsNone(result)
+
+
+class TestShowSfcPortPair(fakes.TestNeutronClientOSCV2):
+
+ _pp = fakes.FakeSfcPortPair.create_port_pair()
+
+ data = (
+ _pp['description'],
+ _pp['egress'],
+ _pp['id'],
+ _pp['ingress'],
+ _pp['name'],
+ _pp['project_id'],
+ _pp['service_function_parameters'],
+ )
+ _port_pair = {'port_pair': _pp}
+ _port_pair_id = _pp['id']
+ columns = (
+ 'Description',
+ 'Egress Logical Port',
+ 'ID',
+ 'Ingress Logical Port',
+ 'Name',
+ 'Project',
+ 'Service Function Parameters'
+ )
+
+ def setUp(self):
+ super(TestShowSfcPortPair, self).setUp()
+ mock.patch('neutronclient.osc.v2.sfc.sfc_port_pair._get_id',
+ new=_get_id).start()
+
+ self.neutronclient.show_port_pair = mock.Mock(
+ return_value=self._port_pair
+ )
+
+ # Get the command object to test
+ self.cmd = sfc_port_pair.ShowSfcPortPair(self.app, self.namespace)
+
+ def test_show_port_pair(self):
+ client = self.app.client_manager.neutronclient
+ mock_port_pair_show = client.show_port_pair
+ arglist = [
+ self._port_pair_id,
+ ]
+ verifylist = [
+ ('port_pair', self._port_pair_id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ mock_port_pair_show.assert_called_once_with(self._port_pair_id)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
diff --git a/neutronclient/tests/unit/osc/v2/sfc/test_port_pair_group.py b/neutronclient/tests/unit/osc/v2/sfc/test_port_pair_group.py
new file mode 100755
index 0000000..188494c
--- /dev/null
+++ b/neutronclient/tests/unit/osc/v2/sfc/test_port_pair_group.py
@@ -0,0 +1,424 @@
+# Copyright (c) 2017 Huawei Technologies India Pvt.Limited.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from neutronclient.osc.v2.sfc import sfc_port_pair_group
+from neutronclient.tests.unit.osc.v2.sfc import fakes
+
+
+def _get_id(client, id_or_name, resource):
+ return id_or_name
+
+
+class TestCreateSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
+
+ _port_pair_group = fakes.FakeSfcPortPairGroup.create_port_pair_group()
+
+ columns = ('Description',
+ 'ID',
+ 'Loadbalance ID',
+ 'Name',
+ 'Port Pair',
+ 'Port Pair Group Parameters',
+ 'Project')
+
+ def get_data(self):
+ return (
+ self._port_pair_group['description'],
+ self._port_pair_group['id'],
+ self._port_pair_group['group_id'],
+ self._port_pair_group['name'],
+ self._port_pair_group['port_pairs'],
+ self._port_pair_group['port_pair_group_parameters'],
+ self._port_pair_group['project_id']
+ )
+
+ def setUp(self):
+ super(TestCreateSfcPortPairGroup, self).setUp()
+ mock.patch(
+ 'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id',
+ new=_get_id).start()
+ self.neutronclient.create_port_pair_group = mock.Mock(
+ return_value={'port_pair_group': self._port_pair_group})
+ self.data = self.get_data()
+ # Get the command object to test
+ self.cmd = sfc_port_pair_group.CreateSfcPortPairGroup(self.app,
+ self.namespace)
+
+ def test_create_port_pair_group_default_options(self):
+ arglist = [
+ "--port-pair", self._port_pair_group['port_pairs'],
+ self._port_pair_group['name'],
+ ]
+ verifylist = [
+ ('port_pairs', [self._port_pair_group['port_pairs']]),
+ ('name', self._port_pair_group['name']),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = (self.cmd.take_action(parsed_args))
+ self.neutronclient.create_port_pair_group.assert_called_once_with({
+ 'port_pair_group': {
+ 'name': self._port_pair_group['name'],
+ 'port_pairs': [self._port_pair_group['port_pairs']]}
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_port_pair_group(self):
+ arglist = [
+ "--description", self._port_pair_group['description'],
+ "--port-pair", self._port_pair_group['port_pairs'],
+ self._port_pair_group['name'],
+ ]
+ verifylist = [
+ ('port_pairs', [self._port_pair_group['port_pairs']]),
+ ('name', self._port_pair_group['name']),
+ ('description', self._port_pair_group['description']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.neutronclient.create_port_pair_group.assert_called_once_with({
+ 'port_pair_group': {
+ 'name': self._port_pair_group['name'],
+ 'port_pairs': [self._port_pair_group['port_pairs']],
+ 'description': self._port_pair_group['description'],
+ }
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestDeleteSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
+
+ _port_pair_group = (fakes.FakeSfcPortPairGroup.create_port_pair_groups
+ (count=1))
+
+ def setUp(self):
+ super(TestDeleteSfcPortPairGroup, self).setUp()
+ mock.patch(
+ 'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id',
+ new=_get_id).start()
+ self.neutronclient.delete_port_pair_group = mock.Mock(
+ return_value=None)
+ self.cmd = sfc_port_pair_group.DeleteSfcPortPairGroup(self.app,
+ self.namespace)
+
+ def test_delete_port_pair_group(self):
+ client = self.app.client_manager.neutronclient
+ mock_port_pair_group_delete = client.delete_port_pair_group
+ arglist = [
+ self._port_pair_group[0]['id'],
+ ]
+ verifylist = [
+ ('port_pair_group', self._port_pair_group[0]['id']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ mock_port_pair_group_delete.assert_called_once_with(
+ self._port_pair_group[0]['id'])
+ self.assertIsNone(result)
+
+
+class TestListSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
+ _ppgs = fakes.FakeSfcPortPairGroup.create_port_pair_groups(count=1)
+ columns = ('ID', 'Name', 'Port Pair', 'Port Pair Group Parameters')
+ columns_long = ('ID', 'Name', 'Port Pair', 'Port Pair Group Parameters',
+ 'Description', 'Loadbalance ID', 'Project')
+ _port_pair_group = _ppgs[0]
+ data = [
+ _port_pair_group['id'],
+ _port_pair_group['name'],
+ _port_pair_group['port_pairs'],
+ _port_pair_group['port_pair_group_parameters']
+ ]
+ data_long = [
+ _port_pair_group['id'],
+ _port_pair_group['name'],
+ _port_pair_group['port_pairs'],
+ _port_pair_group['port_pair_group_parameters'],
+ _port_pair_group['description']
+ ]
+ _port_pair_group1 = {'port_pair_groups': _port_pair_group}
+ _port_pair_id = _port_pair_group['id']
+
+ def setUp(self):
+ super(TestListSfcPortPairGroup, self).setUp()
+ mock.patch(
+ 'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id',
+ new=_get_id).start()
+
+ self.neutronclient.list_port_pair_group = mock.Mock(
+ return_value={'port_pair_groups': self._ppgs}
+ )
+ # Get the command object to test
+ self.cmd = sfc_port_pair_group.ListSfcPortPairGroup(self.app,
+ self.namespace)
+
+ def test_list_port_pair_group(self):
+ arglist = []
+ verifylist = []
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns = self.cmd.take_action(parsed_args)[0]
+ ppgs = self.neutronclient.list_port_pair_group()['port_pair_groups']
+ ppg = ppgs[0]
+ data = [
+ ppg['id'],
+ ppg['name'],
+ ppg['port_pairs'],
+ ppg['port_pair_group_parameters']
+ ]
+ self.assertEqual(list(self.columns), columns)
+ self.assertEqual(self.data, data)
+
+ def test_list_with_long_option(self):
+ arglist = ['--long']
+ verifylist = [('long', True)]
+ ppgs = self.neutronclient.list_port_pair_group()['port_pair_groups']
+ ppg = ppgs[0]
+ data = [
+ ppg['id'],
+ ppg['name'],
+ ppg['port_pairs'],
+ ppg['port_pair_group_parameters'],
+ ppg['description']
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns_long = self.cmd.take_action(parsed_args)[0]
+ self.assertEqual(list(self.columns_long), columns_long)
+ self.assertEqual(self.data_long, data)
+
+
+class TestSetSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
+ _port_pair_group = fakes.FakeSfcPortPairGroup.create_port_pair_group()
+ resource = _port_pair_group
+ res = 'port_pair_group'
+ _port_pair_group_name = _port_pair_group['name']
+ ppg_pp = _port_pair_group['port_pairs']
+ _port_pair_group_id = _port_pair_group['id']
+
+ def setUp(self):
+ super(TestSetSfcPortPairGroup, self).setUp()
+
+ mock.patch(
+ 'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id',
+ new=_get_id).start()
+ self.neutronclient.update_port_pair_group = mock.Mock(
+ return_value=None)
+ self.mocked = self.neutronclient.update_port_pair_group
+ self.cmd = sfc_port_pair_group.SetSfcPortPairGroup(self.app,
+ self.namespace)
+
+ def test_set_port_pair_group(self):
+ target = self.resource['id']
+ port_pair1 = 'additional_port1'
+ port_pair2 = 'additional_port2'
+
+ def _mock_port_pair_group(*args, **kwargs):
+
+ if self.neutronclient.find_resource.call_count == 1:
+ self.neutronclient.find_resource.assert_called_with(
+ 'port_pair', port_pair1, cmd_resource='sfc_port_pair')
+ return {'id': args[1]}
+
+ if self.neutronclient.find_resource.call_count == 2:
+ self.neutronclient.find_resource.assert_called_with(
+ 'port_pair', port_pair2, cmd_resource='sfc_port_pair')
+ return {'id': args[1]}
+
+ if self.neutronclient.find_resource.call_count == 3:
+ self.neutronclient.find_resource.assert_called_with(
+ self.res, target, cmd_resource='sfc_port_pair_group')
+ return {'port_pairs': self.ppg_pp}
+
+ self.neutronclient.find_resource.side_effect = _mock_port_pair_group
+
+ arglist = [
+ target,
+ '--port-pair', port_pair1,
+ '--port-pair', port_pair2,
+ ]
+ verifylist = [
+ (self.res, target),
+ ('port_pairs', [port_pair1, port_pair2])
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ expect = {'port_pairs': sorted([self.ppg_pp, port_pair1, port_pair2])}
+ self.mocked.assert_called_once_with(target, {self.res: expect})
+ self.assertEqual(3, self.neutronclient.find_resource.call_count)
+ self.assertIsNone(result)
+
+ def test_set_no_port_pair(self):
+ client = self.app.client_manager.neutronclient
+ mock_port_pair_group_update = client.update_port_pair_group
+ arglist = [
+ self._port_pair_group_name,
+ '--name', 'name_updated',
+ '--description', 'desc_updated',
+ '--no-port-pair',
+ ]
+ verifylist = [
+ ('port_pair_group', self._port_pair_group_name),
+ ('name', 'name_updated'),
+ ('description', 'desc_updated'),
+ ('no_port_pair', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {'port_pair_group': {'name': 'name_updated',
+ 'description': 'desc_updated',
+ 'port_pairs': []}}
+ mock_port_pair_group_update.assert_called_once_with(
+ self._port_pair_group_name, attrs)
+ self.assertIsNone(result)
+
+
+class TestShowSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
+
+ _ppg = fakes.FakeSfcPortPairGroup.create_port_pair_group()
+ data = (
+ _ppg['description'],
+ _ppg['id'],
+ _ppg['group_id'],
+ _ppg['name'],
+ _ppg['port_pairs'],
+ _ppg['port_pair_group_parameters'],
+ _ppg['project_id'])
+ _port_pair_group = {'port_pair_group': _ppg}
+ _port_pair_group_id = _ppg['id']
+ columns = (
+ 'Description',
+ 'ID',
+ 'Loadbalance ID',
+ 'Name',
+ 'Port Pair',
+ 'Port Pair Group Parameters',
+ 'Project')
+
+ def setUp(self):
+ super(TestShowSfcPortPairGroup, self).setUp()
+ mock.patch(
+ 'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id',
+ new=_get_id).start()
+
+ self.neutronclient.show_port_pair_group = mock.Mock(
+ return_value=self._port_pair_group
+ )
+ self.cmd = sfc_port_pair_group.ShowSfcPortPairGroup(self.app,
+ self.namespace)
+
+ def test_show_port_pair_group(self):
+ client = self.app.client_manager.neutronclient
+ mock_port_pair_group_show = client.show_port_pair_group
+ arglist = [
+ self._port_pair_group_id,
+ ]
+ verifylist = [
+ ('port_pair_group', self._port_pair_group_id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ mock_port_pair_group_show.assert_called_once_with(
+ self._port_pair_group_id)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestUnsetSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
+ _port_pair_group = fakes.FakeSfcPortPairGroup.create_port_pair_group()
+ resource = _port_pair_group
+ res = 'port_pair_group'
+ _port_pair_group_name = _port_pair_group['name']
+ _port_pair_group_id = _port_pair_group['id']
+ ppg_pp = _port_pair_group['port_pairs']
+
+ def setUp(self):
+ super(TestUnsetSfcPortPairGroup, self).setUp()
+ mock.patch(
+ 'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id',
+ new=_get_id).start()
+ self.neutronclient.update_port_pair_group = mock.Mock(
+ return_value=None)
+ self.mocked = self.neutronclient.update_port_pair_group
+ self.cmd = sfc_port_pair_group.UnsetSfcPortPairGroup(
+ self.app, self.namespace)
+
+ def test_unset_port_pair(self):
+ target = self.resource['id']
+ port_pair1 = 'additional_port1'
+ port_pair2 = 'additional_port2'
+
+ def _mock_port_pair(*args, **kwargs):
+
+ if self.neutronclient.find_resource.call_count == 1:
+ self.neutronclient.find_resource.assert_called_with(
+ self.res, target, cmd_resource='sfc_port_pair_group')
+ return {'port_pairs': self.ppg_pp}
+
+ if self.neutronclient.find_resource.call_count == 2:
+ self.neutronclient.find_resource.assert_called_with(
+ 'port_pair', port_pair1, cmd_resource='sfc_port_pair')
+ return {'id': args[1]}
+
+ if self.neutronclient.find_resource.call_count == 3:
+ self.neutronclient.find_resource.assert_called_with(
+ 'port_pair', port_pair2, cmd_resource='sfc_port_pair')
+ return {'id': args[1]}
+
+ if self.neutronclient.find_resource.call_count == 4:
+ self.neutronclient.find_resource.assert_called_with(
+ self.res, target, cmd_resource='sfc_port_pair_group')
+ return {'id': args[1]}
+
+ self.neutronclient.find_resource.side_effect = _mock_port_pair
+
+ arglist = [
+ target,
+ '--port-pair', port_pair1,
+ '--port-pair', port_pair2,
+ ]
+ verifylist = [
+ (self.res, target),
+ ('port_pairs', [port_pair1, port_pair2])
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ expect = {'port_pairs': sorted([self.ppg_pp])}
+ self.mocked.assert_called_once_with(target, {self.res: expect})
+ self.assertIsNone(result)
+
+ def test_unset_all_port_pair(self):
+ client = self.app.client_manager.neutronclient
+ mock_port_pair_group_update = client.update_port_pair_group
+ arglist = [
+ self._port_pair_group_name,
+ '--all-port-pair',
+ ]
+ verifylist = [
+ ('port_pair_group', self._port_pair_group_name),
+ ('all_port_pair', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {'port_pair_group': {'port_pairs': []}}
+ mock_port_pair_group_update.assert_called_once_with(
+ self._port_pair_group_name, attrs)
+ self.assertIsNone(result)
diff --git a/neutronclient/v2_0/client.py b/neutronclient/v2_0/client.py
index 759f776..96bb01d 100644
--- a/neutronclient/v2_0/client.py
+++ b/neutronclient/v2_0/client.py
@@ -511,6 +511,16 @@ class Client(ClientBase):
security_group_path = "/security-groups/%s"
security_group_rules_path = "/security-group-rules"
security_group_rule_path = "/security-group-rules/%s"
+
+ sfc_flow_classifiers_path = "/sfc/flow_classifiers"
+ sfc_flow_classifier_path = "/sfc/flow_classifiers/%s"
+ sfc_port_pairs_path = "/sfc/port_pairs"
+ sfc_port_pair_path = "/sfc/port_pairs/%s"
+ sfc_port_pair_groups_path = "/sfc/port_pair_groups"
+ sfc_port_pair_group_path = "/sfc/port_pair_groups/%s"
+ sfc_port_chains_path = "/sfc/port_chains"
+ sfc_port_chain_path = "/sfc/port_chains/%s"
+
endpoint_groups_path = "/vpn/endpoint-groups"
endpoint_group_path = "/vpn/endpoint-groups/%s"
vpnservices_path = "/vpn/vpnservices"
@@ -692,6 +702,10 @@ class Client(ClientBase):
'bgpvpns': 'bgpvpn',
'network_associations': 'network_association',
'router_associations': 'router_association',
+ 'flow_classifiers': 'flow_classifier',
+ 'port_pairs': 'port_pair',
+ 'port_pair_groups': 'port_pair_group',
+ 'port_chains': 'port_chain',
}
def list_ext(self, collection, path, retrieve_all, **_params):
@@ -2159,6 +2173,95 @@ class Client(ClientBase):
return self.delete(
self.bgpvpn_router_association_path % (bgpvpn, router_assoc))
+ def create_port_pair(self, body=None):
+ """Creates a new Port Pair."""
+ return self.post(self.sfc_port_pairs_path, body=body)
+
+ def update_port_pair(self, port_pair, body=None):
+ """Update a Port Pair."""
+ return self.put(self.sfc_port_pair_path % port_pair, body=body)
+
+ def delete_port_pair(self, port_pair):
+ """Deletes the specified Port Pair."""
+ return self.delete(self.sfc_port_pair_path % (port_pair))
+
+ def list_port_pair(self, retrieve_all=True, **_params):
+ """Fetches a list of all Port Pairs."""
+ return self.list('port_pairs', self.sfc_port_pairs_path, retrieve_all,
+ **_params)
+
+ def show_port_pair(self, port_pair, **_params):
+ """Fetches information of a certain Port Pair."""
+ return self.get(self.sfc_port_pair_path % (port_pair), params=_params)
+
+ def create_port_pair_group(self, body=None):
+ """Creates a new Port Pair Group."""
+ return self.post(self.sfc_port_pair_groups_path, body=body)
+
+ def update_port_pair_group(self, port_pair_group, body=None):
+ """Update a Port Pair Group."""
+ return self.put(self.sfc_port_pair_group_path % port_pair_group,
+ body=body)
+
+ def delete_port_pair_group(self, port_pair_group):
+ """Deletes the specified Port Pair Group."""
+ return self.delete(self.sfc_port_pair_group_path % (port_pair_group))
+
+ def list_port_pair_group(self, retrieve_all=True, **_params):
+ """Fetches a list of all Port Pair Groups."""
+ return self.list('port_pair_groups', self.sfc_port_pair_groups_path,
+ retrieve_all, **_params)
+
+ def show_port_pair_group(self, port_pair_group, **_params):
+ """Fetches information of a certain Port Pair Group."""
+ return self.get(self.sfc_port_pair_group_path % (port_pair_group),
+ params=_params)
+
+ def create_port_chain(self, body=None):
+ """Creates a new Port Chain."""
+ return self.post(self.sfc_port_chains_path, body=body)
+
+ def update_port_chain(self, port_chain, body=None):
+ """Update a Port Chain."""
+ return self.put(self.sfc_port_chain_path % port_chain, body=body)
+
+ def delete_port_chain(self, port_chain):
+ """Deletes the specified Port Chain."""
+ return self.delete(self.sfc_port_chain_path % (port_chain))
+
+ def list_port_chain(self, retrieve_all=True, **_params):
+ """Fetches a list of all Port Chains."""
+ return self.list('port_chains', self.sfc_port_chains_path,
+ retrieve_all, **_params)
+
+ def show_port_chain(self, port_chain, **_params):
+ """Fetches information of a certain Port Chain."""
+ return self.get(self.sfc_port_chain_path % (port_chain),
+ params=_params)
+
+ def create_flow_classifier(self, body=None):
+ """Creates a new Flow Classifier."""
+ return self.post(self.sfc_flow_classifiers_path, body=body)
+
+ def update_flow_classifier(self, flow_classifier, body=None):
+ """Update a Flow Classifier."""
+ return self.put(self.sfc_flow_classifier_path % flow_classifier,
+ body=body)
+
+ def delete_flow_classifier(self, flow_classifier):
+ """Deletes the specified Flow Classifier."""
+ return self.delete(self.sfc_flow_classifier_path % (flow_classifier))
+
+ def list_flow_classifier(self, retrieve_all=True, **_params):
+ """Fetches a list of all Flow Classifiers."""
+ return self.list('flow_classifiers', self.sfc_flow_classifiers_path,
+ retrieve_all, **_params)
+
+ def show_flow_classifier(self, flow_classifier, **_params):
+ """Fetches information of a certain Flow Classifier."""
+ return self.get(self.sfc_flow_classifier_path % (flow_classifier),
+ params=_params)
+
def __init__(self, **kwargs):
"""Initialize a new client for the Neutron v2.0 API."""
super(Client, self).__init__(**kwargs)
diff --git a/releasenotes/notes/add-sfc-commands.yaml b/releasenotes/notes/add-sfc-commands.yaml
new file mode 100644
index 0000000..c081f50
--- /dev/null
+++ b/releasenotes/notes/add-sfc-commands.yaml
@@ -0,0 +1,5 @@
+---
+features:
+ - |
+ Add OSC plugin support for the “Networking Service Function Chaining” feature commands along with client bindings.
+ [Blueprint `openstackclient-cli-porting <https://blueprints.launchpad.net/networking-sfc/+spec/openstackclient-cli-porting>`_] \ No newline at end of file
diff --git a/setup.cfg b/setup.cfg
index ac7da5c..d62e29f 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -43,6 +43,28 @@ openstack.neutronclient.v2 =
network_trunk_set = neutronclient.osc.v2.trunk.network_trunk:SetNetworkTrunk
network_trunk_show = neutronclient.osc.v2.trunk.network_trunk:ShowNetworkTrunk
network_trunk_unset = neutronclient.osc.v2.trunk.network_trunk:UnsetNetworkTrunk
+ sfc_flow_classifier_create = neutronclient.osc.v2.sfc.sfc_flow_classifier:CreateSfcFlowClassifier
+ sfc_flow_classifier_delete = neutronclient.osc.v2.sfc.sfc_flow_classifier:DeleteSfcFlowClassifier
+ sfc_flow_classifier_list = neutronclient.osc.v2.sfc.sfc_flow_classifier:ListSfcFlowClassifier
+ sfc_flow_classifier_set = neutronclient.osc.v2.sfc.sfc_flow_classifier:SetSfcFlowClassifier
+ sfc_flow_classifier_show = neutronclient.osc.v2.sfc.sfc_flow_classifier:ShowSfcFlowClassifier
+ sfc_port_chain_create = neutronclient.osc.v2.sfc.sfc_port_chain:CreateSfcPortChain
+ sfc_port_chain_delete = neutronclient.osc.v2.sfc.sfc_port_chain:DeleteSfcPortChain
+ sfc_port_chain_list = neutronclient.osc.v2.sfc.sfc_port_chain:ListSfcPortChain
+ sfc_port_chain_set = neutronclient.osc.v2.sfc.sfc_port_chain:SetSfcPortChain
+ sfc_port_chain_show = neutronclient.osc.v2.sfc.sfc_port_chain:ShowSfcPortChain
+ sfc_port_chain_unset = neutronclient.osc.v2.sfc.sfc_port_chain:UnsetSfcPortChain
+ sfc_port_pair_create = neutronclient.osc.v2.sfc.sfc_port_pair:CreateSfcPortPair
+ sfc_port_pair_delete = neutronclient.osc.v2.sfc.sfc_port_pair:DeleteSfcPortPair
+ sfc_port_pair_list = neutronclient.osc.v2.sfc.sfc_port_pair:ListSfcPortPair
+ sfc_port_pair_set = neutronclient.osc.v2.sfc.sfc_port_pair:SetSfcPortPair
+ sfc_port_pair_show = neutronclient.osc.v2.sfc.sfc_port_pair:ShowSfcPortPair
+ sfc_port_pair_group_create = neutronclient.osc.v2.sfc.sfc_port_pair_group:CreateSfcPortPairGroup
+ sfc_port_pair_group_delete = neutronclient.osc.v2.sfc.sfc_port_pair_group:DeleteSfcPortPairGroup
+ sfc_port_pair_group_list = neutronclient.osc.v2.sfc.sfc_port_pair_group:ListSfcPortPairGroup
+ sfc_port_pair_group_set = neutronclient.osc.v2.sfc.sfc_port_pair_group:SetSfcPortPairGroup
+ sfc_port_pair_group_show = neutronclient.osc.v2.sfc.sfc_port_pair_group:ShowSfcPortPairGroup
+ sfc_port_pair_group_unset = neutronclient.osc.v2.sfc.sfc_port_pair_group:UnsetSfcPortPairGroup
firewall_group_create = neutronclient.osc.v2.fwaas.firewallgroup:CreateFirewallGroup
firewall_group_delete = neutronclient.osc.v2.fwaas.firewallgroup:DeleteFirewallGroup