diff options
author | Steve Martinelli <stevemar@ca.ibm.com> | 2015-12-15 18:00:16 -0500 |
---|---|---|
committer | Steve Martinelli <stevemar@ca.ibm.com> | 2016-03-10 03:51:01 +0000 |
commit | ef13bd8cf6c6e46f4ce04fa3a21552913417b586 (patch) | |
tree | 80119db045b13244f9d13b87dc73913216d20c35 | |
parent | 9b028b5cdd1b0ef7260fa530a5a5925d78f81646 (diff) | |
download | python-keystoneclient-ef13bd8cf6c6e46f4ce04fa3a21552913417b586.tar.gz |
remove CLI from keystoneclient
the CLI has been deprecated for a long time, and many docs and
install guides recommend using OSC instead of `keystone`.
- removes CLI
- removes man page from docs
- removes CLI tests
- removes `bootstrap` from contrib
- removes entrypoint from setup.cfg
implements bp: remove-cli
Change-Id: Icbe15814bc4faf33f513f9654440068795eae807
-rw-r--r-- | doc/source/conf.py | 5 | ||||
-rw-r--r-- | doc/source/man/keystone.rst | 158 | ||||
-rw-r--r-- | keystoneclient/contrib/bootstrap/__init__.py | 0 | ||||
-rw-r--r-- | keystoneclient/contrib/bootstrap/shell.py | 40 | ||||
-rw-r--r-- | keystoneclient/generic/shell.py | 50 | ||||
-rw-r--r-- | keystoneclient/shell.py | 472 | ||||
-rw-r--r-- | keystoneclient/tests/functional/test_cli.py | 143 | ||||
-rw-r--r-- | keystoneclient/tests/unit/generic/test_shell.py | 129 | ||||
-rw-r--r-- | keystoneclient/tests/unit/test_shell.py | 534 | ||||
-rw-r--r-- | keystoneclient/tests/unit/test_utils.py | 35 | ||||
-rw-r--r-- | keystoneclient/tests/unit/v2_0/test_shell.py | 460 | ||||
-rw-r--r-- | keystoneclient/utils.py | 69 | ||||
-rwxr-xr-x | keystoneclient/v2_0/shell.py | 547 | ||||
-rw-r--r-- | releasenotes/notes/remove_cli-d2c4435ba6a09b79.yaml | 7 | ||||
-rw-r--r-- | requirements.txt | 1 | ||||
-rw-r--r-- | setup.cfg | 3 |
16 files changed, 8 insertions, 2645 deletions
diff --git a/doc/source/conf.py b/doc/source/conf.py index 8a7a5ce..eb83a15 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -109,10 +109,7 @@ modindex_common_prefix = ['keystoneclient.'] # Grouping the document tree for man pages. # List of tuples 'sourcefile', 'target', 'title', 'Authors name', 'manual' -man_pages = [ - ('man/keystone', 'keystone', 'Client for OpenStack Identity API', - ['OpenStack Contributors'], 1), -] +#man_pages = [] # -- Options for HTML output -------------------------------------------------- diff --git a/doc/source/man/keystone.rst b/doc/source/man/keystone.rst deleted file mode 100644 index 10d9d93..0000000 --- a/doc/source/man/keystone.rst +++ /dev/null @@ -1,158 +0,0 @@ -============================================================== -:program:`keystone` command line utility (pending deprecation) -============================================================== - -.. program:: keystone -.. highlight:: bash - -SYNOPSIS -======== - -:program:`keystone` [options] <command> [command-options] - -:program:`keystone help` - -:program:`keystone help` <command> - - -DESCRIPTION -=========== - -.. WARNING:: - - The :program:`keystone` command line utility is pending deprecation. The - `OpenStackClient unified command line utility - <http://docs.openstack.org/developer/python-openstackclient/>`_ should be - used instead. The :program:`keystone` command line utility only supports V2 - of the Identity API whereas the OSC program supports both V2 and V3. - -The :program:`keystone` command line utility interacts with services providing -OpenStack Identity API (e.g. Keystone). - -To communicate with the API, you will need to be authenticated - and the -:program:`keystone` provides multiple options for this. - -While bootstrapping Keystone the authentication is accomplished with a -shared secret token and the location of the Identity API endpoint. The -shared secret token is configured in keystone.conf as "admin_token". - -You can specify those values on the command line with :option:`--os-token` -and :option:`--os-endpoint`, or set them in environment variables: - -.. envvar:: OS_SERVICE_TOKEN - - Your Keystone administrative token - -.. envvar:: OS_SERVICE_ENDPOINT - - Your Identity API endpoint - -The command line options will override any environment variables set. - -If you already have accounts, you can use your OpenStack username and -password. You can do this with the :option:`--os-username`, -:option:`--os-password`. - -Keystone allows a user to be associated with one or more projects which are -historically called tenants. To specify the project for which you want to -authorize against, you may optionally specify a :option:`--os-tenant-id` or -:option:`--os-tenant-name`. - -Instead of using options, it is easier to just set them as environment -variables: - -.. envvar:: OS_USERNAME - - Your Keystone username. - -.. envvar:: OS_PASSWORD - - Your Keystone password. - -.. envvar:: OS_TENANT_NAME - - Name of Keystone project. - -.. envvar:: OS_TENANT_ID - - ID of Keystone Tenant. - -.. envvar:: OS_AUTH_URL - - The OpenStack API server URL. - -.. envvar:: OS_IDENTITY_API_VERSION - - The OpenStack Identity API version. - -.. envvar:: OS_CACERT - - The location for the CA truststore (PEM formatted) for this client. - -.. envvar:: OS_CERT - - The location for the keystore (PEM formatted) containing the public - key of this client. This keystore can also optionally contain the - private key of this client. - -.. envvar:: OS_KEY - - The location for the keystore (PEM formatted) containing the private - key of this client. This value can be empty if the private key is - included in the OS_CERT file. - -For example, in Bash you'd use:: - - export OS_USERNAME=yourname - export OS_PASSWORD=yadayadayada - export OS_TENANT_NAME=myproject - export OS_AUTH_URL=http(s)://example.com:5000/v2.0/ - export OS_IDENTITY_API_VERSION=2.0 - export OS_CACERT=/etc/keystone/yourca.pem - export OS_CERT=/etc/keystone/yourpublickey.pem - export OS_KEY=/etc/keystone/yourprivatekey.pem - - -OPTIONS -======= - -To get a list of available commands and options run:: - - keystone help - -To get usage and options of a command:: - - keystone help <command> - - -EXAMPLES -======== - -Get information about endpoint-create command:: - - keystone help endpoint-create - -View endpoints of OpenStack services:: - - keystone catalog - -Create a 'service' project:: - - keystone tenant-create --name=service - -Create service user for nova:: - - keystone user-create --name=nova \ - --tenant_id=<project ID> \ - --email=nova@nothing.com - -View roles:: - - keystone role-list - - -BUGS -==== - -Keystone client is hosted in Launchpad so you can view current bugs at -https://bugs.launchpad.net/python-keystoneclient/. diff --git a/keystoneclient/contrib/bootstrap/__init__.py b/keystoneclient/contrib/bootstrap/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/keystoneclient/contrib/bootstrap/__init__.py +++ /dev/null diff --git a/keystoneclient/contrib/bootstrap/shell.py b/keystoneclient/contrib/bootstrap/shell.py deleted file mode 100644 index 9a4ed9f..0000000 --- a/keystoneclient/contrib/bootstrap/shell.py +++ /dev/null @@ -1,40 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from keystoneclient import utils -from keystoneclient.v2_0 import client - - -@utils.arg('--user-name', metavar='<user-name>', default='admin', dest='user', - help='The name of the user to be created (default="admin").') -@utils.arg('--pass', metavar='<password>', required=True, dest='passwd', - help='The password for the new user.') -@utils.arg('--role-name', metavar='<role-name>', default='admin', dest='role', - help='The name of the role to be created and granted to the user ' - '(default="admin").') -@utils.arg('--tenant-name', metavar='<tenant-name>', default='admin', - dest='tenant', - help='The name of the tenant to be created (default="admin").') -def do_bootstrap(kc, args): - """Grants a new role to a new user on a new tenant, after creating each.""" - tenant = kc.tenants.create(tenant_name=args.tenant) - role = kc.roles.create(name=args.role) - user = kc.users.create(name=args.user, password=args.passwd, email=None) - kc.roles.add_user_role(user=user, role=role, tenant=tenant) - - # verify the result - user_client = client.Client( - username=args.user, - password=args.passwd, - tenant_name=args.tenant, - auth_url=kc.management_url) - user_client.authenticate() diff --git a/keystoneclient/generic/shell.py b/keystoneclient/generic/shell.py deleted file mode 100644 index d1b7b7e..0000000 --- a/keystoneclient/generic/shell.py +++ /dev/null @@ -1,50 +0,0 @@ -# Copyright 2010 OpenStack Foundation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import six - -from keystoneclient.generic import client -from keystoneclient.i18n import _ -from keystoneclient import utils - - -CLIENT_CLASS = client.Client - - -@utils.unauthenticated -def do_discover(cs, args): - """Discover Keystone servers, supported API versions and extensions.""" - if cs.endpoint: - versions = cs.discover(cs.endpoint) - elif cs.auth_url: - versions = cs.discover(cs.auth_url) - else: - versions = cs.discover() - if versions: - if 'message' in versions: - print(versions['message']) - for key, version in six.iteritems(versions): - if key != 'message': - print(_(" - supports version %(id)s (%(status)s) here " - "%(url)s") % - version) - extensions = cs.discover_extensions(version['url']) - if extensions: - for key, extension in six.iteritems(extensions): - if key != 'message': - print(_(" - and %(key)s: %(extension)s") % - {'key': key, 'extension': extension}) - else: - print(_("No Keystone-compatible endpoint found")) diff --git a/keystoneclient/shell.py b/keystoneclient/shell.py deleted file mode 100644 index 4a6dbd2..0000000 --- a/keystoneclient/shell.py +++ /dev/null @@ -1,472 +0,0 @@ -# Copyright 2010 Jacob Kaplan-Moss -# Copyright 2011 OpenStack Foundation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Command-line interface to the OpenStack Identity API.""" - -from __future__ import print_function - -import argparse -import logging -import os -import sys -import warnings - -from oslo_utils import encodeutils -import six - -import keystoneclient -from keystoneclient import access -from keystoneclient.contrib.bootstrap import shell as shell_bootstrap -from keystoneclient import exceptions as exc -from keystoneclient.generic import shell as shell_generic -from keystoneclient import session -from keystoneclient import utils -from keystoneclient.v2_0 import shell as shell_v2_0 - - -def env(*vars, **kwargs): - """Search for the first defined of possibly many env vars - - Returns the first environment variable defined in vars, or - returns the default defined in kwargs. - - """ - for v in vars: - value = os.environ.get(v) - if value: - return value - return kwargs.get('default', '') - - -class OpenStackIdentityShell(object): - - def __init__(self, parser_class=argparse.ArgumentParser): - - # Since Python 2.7, DeprecationWarning is ignored by default, enable - # it so that the deprecation message is displayed. - warnings.simplefilter('once', category=DeprecationWarning) - warnings.warn( - 'The keystone CLI is deprecated in favor of ' - 'python-openstackclient. For a Python library, continue using ' - 'python-keystoneclient.', DeprecationWarning) - # And back to normal! - warnings.resetwarnings() - self.parser_class = parser_class - - def get_base_parser(self): - parser = self.parser_class( - prog='keystone', - description=__doc__.strip(), - epilog='See "keystone help COMMAND" ' - 'for help on a specific command.', - add_help=False, - formatter_class=OpenStackHelpFormatter, - ) - - # Global arguments - parser.add_argument('-h', - '--help', - action='store_true', - help=argparse.SUPPRESS) - - parser.add_argument('--version', - action='version', - version=keystoneclient.__version__, - help="Shows the client version and exits.") - - parser.add_argument('--debug', - default=False, - action='store_true', - help="Prints debugging output onto the console, " - "this includes the curl request and response " - "calls. Helpful for debugging and " - "understanding the API calls.") - - parser.add_argument('--os-username', - metavar='<auth-user-name>', - default=env('OS_USERNAME'), - help='Name used for authentication with the ' - 'OpenStack Identity service. ' - 'Defaults to env[OS_USERNAME].') - parser.add_argument('--os_username', - help=argparse.SUPPRESS) - - parser.add_argument('--os-password', - metavar='<auth-password>', - default=env('OS_PASSWORD'), - help='Password used for authentication with the ' - 'OpenStack Identity service. ' - 'Defaults to env[OS_PASSWORD].') - parser.add_argument('--os_password', - help=argparse.SUPPRESS) - - parser.add_argument('--os-tenant-name', - metavar='<auth-tenant-name>', - default=env('OS_TENANT_NAME'), - help='Tenant to request authorization on. ' - 'Defaults to env[OS_TENANT_NAME].') - parser.add_argument('--os_tenant_name', - help=argparse.SUPPRESS) - - parser.add_argument('--os-tenant-id', - metavar='<tenant-id>', - default=env('OS_TENANT_ID'), - help='Tenant to request authorization on. ' - 'Defaults to env[OS_TENANT_ID].') - parser.add_argument('--os_tenant_id', - help=argparse.SUPPRESS) - - parser.add_argument('--os-auth-url', - metavar='<auth-url>', - default=env('OS_AUTH_URL'), - help='Specify the Identity endpoint to use for ' - 'authentication. ' - 'Defaults to env[OS_AUTH_URL].') - parser.add_argument('--os_auth_url', - help=argparse.SUPPRESS) - - parser.add_argument('--os-region-name', - metavar='<region-name>', - default=env('OS_REGION_NAME'), - help='Specify the region to use. ' - 'Defaults to env[OS_REGION_NAME].') - parser.add_argument('--os_region_name', - help=argparse.SUPPRESS) - - parser.add_argument('--os-identity-api-version', - metavar='<identity-api-version>', - default=env('OS_IDENTITY_API_VERSION', - 'KEYSTONE_VERSION'), - help='Specify Identity API version to use. ' - 'Defaults to env[OS_IDENTITY_API_VERSION]' - ' or 2.0.') - parser.add_argument('--os_identity_api_version', - help=argparse.SUPPRESS) - - parser.add_argument('--os-token', - metavar='<service-token>', - default=env('OS_SERVICE_TOKEN'), - help='Specify an existing token to use instead of ' - 'retrieving one via authentication (e.g. ' - 'with username & password). ' - 'Defaults to env[OS_SERVICE_TOKEN].') - - parser.add_argument('--os-endpoint', - metavar='<service-endpoint>', - default=env('OS_SERVICE_ENDPOINT'), - help='Specify an endpoint to use instead of ' - 'retrieving one from the service catalog ' - '(via authentication). ' - 'Defaults to env[OS_SERVICE_ENDPOINT].') - - parser.add_argument('--os-cache', - default=env('OS_CACHE', default=False), - action='store_true', - help='Use the auth token cache. ' - 'Defaults to env[OS_CACHE].') - parser.add_argument('--os_cache', - help=argparse.SUPPRESS) - - parser.add_argument('--force-new-token', - default=False, - action="store_true", - dest='force_new_token', - help="If the keyring is available and in use, " - "token will always be stored and fetched " - "from the keyring until the token has " - "expired. Use this option to request a " - "new token and replace the existing one " - "in the keyring.") - - parser.add_argument('--stale-duration', - metavar='<seconds>', - default=access.STALE_TOKEN_DURATION, - dest='stale_duration', - help="Stale duration (in seconds) used to " - "determine whether a token has expired " - "when retrieving it from keyring. This " - "is useful in mitigating process or " - "network delays. Default is %s seconds." % - access.STALE_TOKEN_DURATION) - - session.Session.register_cli_options(parser) - - parser.add_argument('--os_cacert', help=argparse.SUPPRESS) - parser.add_argument('--os_key', help=argparse.SUPPRESS) - parser.add_argument('--os_cert', help=argparse.SUPPRESS) - - return parser - - def get_subcommand_parser(self, version): - parser = self.get_base_parser() - - self.subcommands = {} - subparsers = parser.add_subparsers(metavar='<subcommand>') - - try: - actions_module = { - '2.0': shell_v2_0, - }[version] - except KeyError: - actions_module = shell_v2_0 - - self._find_actions(subparsers, actions_module) - self._find_actions(subparsers, shell_generic) - self._find_actions(subparsers, shell_bootstrap) - self._find_actions(subparsers, self) - self._add_bash_completion_subparser(subparsers) - - return parser - - def _add_bash_completion_subparser(self, subparsers): - subparser = subparsers.add_parser( - 'bash_completion', - add_help=False, - formatter_class=OpenStackHelpFormatter - ) - self.subcommands['bash_completion'] = subparser - subparser.set_defaults(func=self.do_bash_completion) - - def _find_actions(self, subparsers, actions_module): - for attr in (a for a in dir(actions_module) if a.startswith('do_')): - # I prefer to be hyphen-separated instead of underscores. - command = attr[3:].replace('_', '-') - callback = getattr(actions_module, attr) - desc = callback.__doc__ or '' - help = desc.strip().split('\n')[0] - arguments = getattr(callback, 'arguments', []) - - subparser = subparsers.add_parser( - command, - help=help, - description=desc, - add_help=False, - formatter_class=OpenStackHelpFormatter) - subparser.add_argument('-h', '--help', action='help', - help=argparse.SUPPRESS) - self.subcommands[command] = subparser - group = subparser.add_argument_group(title='Arguments') - for (args, kwargs) in arguments: - group.add_argument(*args, **kwargs) - subparser.set_defaults(func=callback) - - def auth_check(self, args): - if args.os_token or args.os_endpoint: - if not args.os_token: - raise exc.CommandError( - 'Expecting a token provided via either --os-token or ' - 'env[OS_SERVICE_TOKEN]') - - if not args.os_endpoint: - raise exc.CommandError( - 'Expecting an endpoint provided via either ' - '--os-endpoint or env[OS_SERVICE_ENDPOINT]') - - # user supplied a token and endpoint and at least one other cred - if args.os_username or args.os_password or args.os_auth_url: - msg = ('WARNING: Bypassing authentication using a token & ' - 'endpoint (authentication credentials are being ' - 'ignored).') - print(msg) - - else: - if not args.os_auth_url: - raise exc.CommandError( - 'Expecting an auth URL via either --os-auth-url or ' - 'env[OS_AUTH_URL]') - - if args.os_username or args.os_password: - if not args.os_username: - raise exc.CommandError( - 'Expecting a username provided via either ' - '--os-username or env[OS_USERNAME]') - - if not args.os_password: - args.os_password = utils.prompt_user_password() - - # No password because we didn't have a tty or the - # user Ctl-D when prompted? - if not args.os_password: - raise exc.CommandError( - 'Expecting a password provided via either ' - '--os-password, env[OS_PASSWORD], or ' - 'prompted response') - - else: - raise exc.CommandError('Expecting authentication method via' - '\n either a service token, ' - '--os-token or env[OS_SERVICE_TOKEN], ' - '\n credentials, ' - '--os-username or env[OS_USERNAME]') - - def main(self, argv): - # Parse args once to find version - parser = self.get_base_parser() - (options, args) = parser.parse_known_args(argv) - - # build available subcommands based on version - api_version = options.os_identity_api_version - subcommand_parser = self.get_subcommand_parser(api_version) - self.parser = subcommand_parser - - # Handle top-level --help/-h before attempting to parse - # a command off the command line - if not argv or options.help: - self.do_help(options) - return 0 - - # Parse args again and call whatever callback was selected - args = subcommand_parser.parse_args(argv) - - # Short-circuit and deal with help command right away. - if args.func == self.do_help: - self.do_help(args) - return 0 - elif args.func == self.do_bash_completion: - self.do_bash_completion(args) - return 0 - - if args.debug: - logging_level = logging.DEBUG - iso_logger = logging.getLogger('iso8601') - iso_logger.setLevel('WARN') - else: - logging_level = logging.WARNING - - logging.basicConfig(level=logging_level) - - # TODO(heckj): supporting backwards compatibility with environment - # variables. To be removed after DEVSTACK is updated, ideally in - # the Grizzly release cycle. - args.os_token = args.os_token or env('SERVICE_TOKEN') - args.os_endpoint = args.os_endpoint or env('SERVICE_ENDPOINT') - - if utils.isunauthenticated(args.func): - self.cs = shell_generic.CLIENT_CLASS(endpoint=args.os_auth_url, - cacert=args.os_cacert, - key=args.os_key, - cert=args.os_cert, - insecure=args.insecure, - timeout=args.timeout) - else: - self.auth_check(args) - token = None - if args.os_token and args.os_endpoint: - token = args.os_token - api_version = options.os_identity_api_version - self.cs = self.get_api_class(api_version)( - username=args.os_username, - tenant_name=args.os_tenant_name, - tenant_id=args.os_tenant_id, - token=token, - endpoint=args.os_endpoint, - password=args.os_password, - auth_url=args.os_auth_url, - region_name=args.os_region_name, - cacert=args.os_cacert, - key=args.os_key, - cert=args.os_cert, - insecure=args.insecure, - debug=args.debug, - use_keyring=args.os_cache, - force_new_token=args.force_new_token, - stale_duration=args.stale_duration, - timeout=args.timeout) - - try: - args.func(self.cs, args) - except exc.Unauthorized: - raise exc.CommandError("Invalid OpenStack Identity credentials.") - except exc.AuthorizationFailure: - raise exc.CommandError("Unable to authorize user") - - def get_api_class(self, version): - try: - return { - "2.0": shell_v2_0.CLIENT_CLASS, - }[version] - except KeyError: - if version: - msg = ('WARNING: unsupported identity-api-version %s, ' - 'falling back to 2.0' % version) - print(msg) - return shell_v2_0.CLIENT_CLASS - - def do_bash_completion(self, args): - """Prints all of the commands and options to stdout. - - The keystone.bash_completion script doesn't have to hard code them. - """ - commands = set() - options = set() - for sc_str, sc in self.subcommands.items(): - commands.add(sc_str) - for option in list(sc._optionals._option_string_actions): - options.add(option) - - commands.remove('bash-completion') - commands.remove('bash_completion') - print(' '.join(commands | options)) - - @utils.arg('command', metavar='<subcommand>', nargs='?', - help='Display help for <subcommand>.') - def do_help(self, args): - """Display help about this program or one of its subcommands.""" - if getattr(args, 'command', None): - if args.command in self.subcommands: - self.subcommands[args.command].print_help() - else: - raise exc.CommandError("'%s' is not a valid subcommand" % - args.command) - else: - self.parser.print_help() - - -# I'm picky about my shell help. -class OpenStackHelpFormatter(argparse.HelpFormatter): - INDENT_BEFORE_ARGUMENTS = 6 - MAX_WIDTH_ARGUMENTS = 32 - - def add_arguments(self, actions): - for action in filter(lambda x: not x.option_strings, actions): - if not action.choices: - continue - for choice in action.choices: - length = len(choice) + self.INDENT_BEFORE_ARGUMENTS - if(length > self._max_help_position and - length <= self.MAX_WIDTH_ARGUMENTS): - self._max_help_position = length - super(OpenStackHelpFormatter, self).add_arguments(actions) - - def start_section(self, heading): - # Title-case the headings - heading = '%s%s' % (heading[0].upper(), heading[1:]) - super(OpenStackHelpFormatter, self).start_section(heading) - - -def main(): - try: - OpenStackIdentityShell().main(sys.argv[1:]) - except KeyboardInterrupt: - print("... terminating keystone client", file=sys.stderr) - sys.exit(130) - except Exception as e: - print(encodeutils.safe_encode(six.text_type(e)), file=sys.stderr) - sys.exit(1) - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/keystoneclient/tests/functional/test_cli.py b/keystoneclient/tests/functional/test_cli.py deleted file mode 100644 index e3800a2..0000000 --- a/keystoneclient/tests/functional/test_cli.py +++ /dev/null @@ -1,143 +0,0 @@ - -# Copyright 2013 OpenStack Foundation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os -import re - -from tempest_lib.cli import base -from tempest_lib import exceptions - - -class SimpleReadOnlyKeystoneClientTest(base.ClientTestBase): - """Basic, read-only tests for Keystone CLI client. - - Checks return values and output of read-only commands. - These tests do not presume any content, nor do they create - their own. They only verify the structure of output if present. - """ - - def _get_clients(self): - path = os.path.join(os.path.abspath('.'), '.tox/functional/bin') - cli_dir = os.environ.get('OS_KEYSTONECLIENT_EXEC_DIR', path) - - return base.CLIClient( - username=os.environ.get('OS_USERNAME'), - password=os.environ.get('OS_PASSWORD'), - tenant_name=os.environ.get('OS_TENANT_NAME'), - uri=os.environ.get('OS_AUTH_URL'), - cli_dir=cli_dir) - - def keystone(self, *args, **kwargs): - return self.clients.keystone(*args, **kwargs) - - def test_admin_fake_action(self): - self.assertRaises(exceptions.CommandFailed, - self.keystone, - 'this-does-not-exist') - - def test_admin_catalog_list(self): - out = self.keystone('catalog') - catalog = self.parser.details_multiple(out, with_label=True) - for svc in catalog: - if svc.get('__label'): - self.assertTrue(svc['__label'].startswith('Service:'), - msg=('Invalid beginning of service block: ' - '%s' % svc['__label'])) - # check that region and publicURL exists. One might also - # check for adminURL and internalURL. id seems to be optional - # and is missing in the catalog backend - self.assertIn('publicURL', svc) - self.assertIn('region', svc) - - def test_admin_endpoint_list(self): - out = self.keystone('endpoint-list') - endpoints = self.parser.listing(out) - self.assertTableStruct(endpoints, [ - 'id', 'region', 'publicurl', 'internalurl', - 'adminurl', 'service_id']) - - def test_admin_endpoint_service_match(self): - endpoints = self.parser.listing(self.keystone('endpoint-list')) - services = self.parser.listing(self.keystone('service-list')) - svc_by_id = {} - for svc in services: - svc_by_id[svc['id']] = svc - for endpoint in endpoints: - self.assertIn(endpoint['service_id'], svc_by_id) - - def test_admin_role_list(self): - roles = self.parser.listing(self.keystone('role-list')) - self.assertTableStruct(roles, ['id', 'name']) - - def test_admin_service_list(self): - services = self.parser.listing(self.keystone('service-list')) - self.assertTableStruct(services, ['id', 'name', 'type', 'description']) - - def test_admin_tenant_list(self): - tenants = self.parser.listing(self.keystone('tenant-list')) - self.assertTableStruct(tenants, ['id', 'name', 'enabled']) - - def test_admin_user_list(self): - users = self.parser.listing(self.keystone('user-list')) - self.assertTableStruct(users, [ - 'id', 'name', 'enabled', 'email']) - - def test_admin_user_role_list(self): - user_roles = self.parser.listing(self.keystone('user-role-list')) - self.assertTableStruct(user_roles, [ - 'id', 'name', 'user_id', 'tenant_id']) - - def test_admin_discover(self): - discovered = self.keystone('discover') - self.assertIn('Keystone found at http', discovered) - self.assertIn('supports version', discovered) - - def test_admin_help(self): - help_text = self.keystone('help') - lines = help_text.split('\n') - self.assertFirstLineStartsWith(lines, 'usage: keystone') - - commands = [] - cmds_start = lines.index('Positional arguments:') - cmds_end = lines.index('Optional arguments:') - command_pattern = re.compile('^ {4}([a-z0-9\-\_]+)') - for line in lines[cmds_start:cmds_end]: - match = command_pattern.match(line) - if match: - commands.append(match.group(1)) - commands = set(commands) - wanted_commands = set(('catalog', 'endpoint-list', 'help', - 'token-get', 'discover', 'bootstrap')) - self.assertFalse(wanted_commands - commands) - - def test_admin_bashcompletion(self): - self.keystone('bash-completion') - - def test_admin_ec2_credentials_list(self): - creds = self.keystone('ec2-credentials-list') - creds = self.parser.listing(creds) - self.assertTableStruct(creds, ['tenant', 'access', 'secret']) - - # Optional arguments: - - def test_admin_version(self): - self.keystone('', flags='--version') - - def test_admin_debug_list(self): - self.keystone('catalog', flags='--debug') - - def test_admin_timeout(self): - self.keystone('catalog', flags='--timeout %d' % 15) diff --git a/keystoneclient/tests/unit/generic/test_shell.py b/keystoneclient/tests/unit/generic/test_shell.py deleted file mode 100644 index ad457aa..0000000 --- a/keystoneclient/tests/unit/generic/test_shell.py +++ /dev/null @@ -1,129 +0,0 @@ -# Copyright 2014 OpenStack Foundation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import mock -from six import moves - -from keystoneclient.generic import shell -from keystoneclient.tests.unit import utils - - -class DoDiscoverTest(utils.TestCase): - """Unit tests for do_discover function.""" - foo_version = { - 'id': 'foo_id', - 'status': 'foo_status', - 'url': 'http://foo/url', - } - bar_version = { - 'id': 'bar_id', - 'status': 'bar_status', - 'url': 'http://bar/url', - } - foo_extension = { - 'foo': 'foo_extension', - 'message': 'extension_message', - 'bar': 'bar_extension', - } - stub_message = 'This is a stub message' - - def setUp(self): - super(DoDiscoverTest, self).setUp() - - self.client_mock = mock.Mock() - self.client_mock.discover.return_value = {} - - def _execute_discover(self): - """Call do_discover function and capture output - - :returns: captured output is returned - """ - with mock.patch('sys.stdout', - new_callable=moves.StringIO) as mock_stdout: - shell.do_discover(self.client_mock, args=None) - output = mock_stdout.getvalue() - return output - - def _check_version_print(self, output, version): - """Checks all api version's parameters are present in output.""" - self.assertIn(version['id'], output) - self.assertIn(version['status'], output) - self.assertIn(version['url'], output) - - def test_no_keystones(self): - # No servers configured for client, - # corresponding message should be printed - output = self._execute_discover() - self.assertIn('No Keystone-compatible endpoint found', output) - - def test_endpoint(self): - # Endpoint is configured for client, - # client's discover method should be called with that value - self.client_mock.endpoint = 'Some non-empty value' - shell.do_discover(self.client_mock, args=None) - self.client_mock.discover.assert_called_with(self.client_mock.endpoint) - - def test_auth_url(self): - # No endpoint provided for client, but there is an auth_url - # client's discover method should be called with auth_url value - self.client_mock.endpoint = False - self.client_mock.auth_url = 'Some non-empty value' - shell.do_discover(self.client_mock, args=None) - self.client_mock.discover.assert_called_with(self.client_mock.auth_url) - - def test_empty(self): - # No endpoint or auth_url is configured for client. - # client.discover() should be called without parameters - self.client_mock.endpoint = False - self.client_mock.auth_url = False - shell.do_discover(self.client_mock, args=None) - self.client_mock.discover.assert_called_with() - - def test_message(self): - # If client.discover() result contains message - it should be printed - self.client_mock.discover.return_value = {'message': self.stub_message} - output = self._execute_discover() - self.assertIn(self.stub_message, output) - - def test_versions(self): - # Every version in client.discover() result should be printed - # and client.discover_extension() should be called on its url - self.client_mock.discover.return_value = { - 'foo': self.foo_version, - 'bar': self.bar_version, - } - self.client_mock.discover_extensions.return_value = {} - output = self._execute_discover() - self._check_version_print(output, self.foo_version) - self._check_version_print(output, self.bar_version) - - discover_extension_calls = [ - mock.call(self.foo_version['url']), - mock.call(self.bar_version['url']), - ] - - self.client_mock.discover_extensions.assert_has_calls( - discover_extension_calls, - any_order=True) - - def test_extensions(self): - # Every extension's parameters should be printed - # Extension's message should be omitted - self.client_mock.discover.return_value = {'foo': self.foo_version} - self.client_mock.discover_extensions.return_value = self.foo_extension - output = self._execute_discover() - self.assertIn(self.foo_extension['foo'], output) - self.assertIn(self.foo_extension['bar'], output) - self.assertNotIn(self.foo_extension['message'], output) diff --git a/keystoneclient/tests/unit/test_shell.py b/keystoneclient/tests/unit/test_shell.py deleted file mode 100644 index 06e77be..0000000 --- a/keystoneclient/tests/unit/test_shell.py +++ /dev/null @@ -1,534 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import argparse -import json -import logging -import os -import sys -import uuid - -import fixtures -import mock -import six -import testtools -from testtools import matchers - -from keystoneclient import exceptions -from keystoneclient import session -from keystoneclient import shell as openstack_shell -from keystoneclient.tests.unit import utils -from keystoneclient.v2_0 import shell as shell_v2_0 - - -DEFAULT_USERNAME = 'username' -DEFAULT_PASSWORD = 'password' -DEFAULT_TENANT_ID = 'tenant_id' -DEFAULT_TENANT_NAME = 'tenant_name' -DEFAULT_AUTH_URL = 'http://127.0.0.1:5000/v2.0/' - - -# Make a fake shell object, a helping wrapper to call it -def shell(cmd): - openstack_shell.OpenStackIdentityShell().main(cmd.split()) - - -class NoExitArgumentParser(argparse.ArgumentParser): - def error(self, message): - raise exceptions.CommandError(message) - - -class ShellTest(utils.TestCase): - - FAKE_ENV = { - 'OS_USERNAME': DEFAULT_USERNAME, - 'OS_PASSWORD': DEFAULT_PASSWORD, - 'OS_TENANT_ID': DEFAULT_TENANT_ID, - 'OS_TENANT_NAME': DEFAULT_TENANT_NAME, - 'OS_AUTH_URL': DEFAULT_AUTH_URL, - } - - def _tolerant_shell(self, cmd): - t_shell = openstack_shell.OpenStackIdentityShell(NoExitArgumentParser) - t_shell.main(cmd.split()) - - # Patch os.environ to avoid required auth info. - def setUp(self): - - super(ShellTest, self).setUp() - for var in os.environ: - if var.startswith("OS_"): - self.useFixture(fixtures.EnvironmentVariable(var, "")) - - for var in self.FAKE_ENV: - self.useFixture(fixtures.EnvironmentVariable(var, - self.FAKE_ENV[var])) - - def test_help_unknown_command(self): - self.assertRaises(exceptions.CommandError, shell, 'help %s' - % uuid.uuid4().hex) - - def shell(self, argstr): - orig = sys.stdout - clean_env = {} - _old_env, os.environ = os.environ, clean_env.copy() - try: - sys.stdout = six.StringIO() - _shell = openstack_shell.OpenStackIdentityShell() - _shell.main(argstr.split()) - except SystemExit: - exc_type, exc_value, exc_traceback = sys.exc_info() - self.assertEqual(exc_value.code, 0) - finally: - out = sys.stdout.getvalue() - sys.stdout.close() - sys.stdout = orig - os.environ = _old_env - return out - - def test_help_no_args(self): - do_tenant_mock = mock.MagicMock() - with mock.patch('keystoneclient.shell.OpenStackIdentityShell.do_help', - do_tenant_mock): - self.shell('') - assert do_tenant_mock.called - - def test_help(self): - required = 'usage:' - help_text = self.shell('help') - self.assertThat(help_text, - matchers.MatchesRegex(required)) - - def test_help_command(self): - required = 'usage: keystone user-create' - help_text = self.shell('help user-create') - self.assertThat(help_text, - matchers.MatchesRegex(required)) - - def test_help_command_with_no_action_choices(self): - required = 'usage: keystone user-update' - help_text = self.shell('help user-update') - self.assertThat(help_text, - matchers.MatchesRegex(required)) - - def test_auth_no_credentials(self): - with testtools.ExpectedException( - exceptions.CommandError, 'Expecting'): - self.shell('user-list') - - def test_debug(self): - logging_mock = mock.MagicMock() - with mock.patch('logging.basicConfig', logging_mock): - self.assertRaises(exceptions.CommandError, - self.shell, '--debug user-list') - self.assertTrue(logging_mock.called) - self.assertEqual([(), {'level': logging.DEBUG}], - list(logging_mock.call_args)) - - def test_auth_password_authurl_no_username(self): - with testtools.ExpectedException( - exceptions.CommandError, - 'Expecting a username provided via either'): - self.shell('--os-password=%s --os-auth-url=%s user-list' - % (uuid.uuid4().hex, uuid.uuid4().hex)) - - def test_auth_username_password_no_authurl(self): - with testtools.ExpectedException( - exceptions.CommandError, 'Expecting an auth URL via either'): - self.shell('--os-password=%s --os-username=%s user-list' - % (uuid.uuid4().hex, uuid.uuid4().hex)) - - def test_token_no_endpoint(self): - with testtools.ExpectedException( - exceptions.CommandError, 'Expecting an endpoint provided'): - self.shell('--os-token=%s user-list' % uuid.uuid4().hex) - - def test_endpoint_no_token(self): - with testtools.ExpectedException( - exceptions.CommandError, 'Expecting a token provided'): - self.shell('--os-endpoint=http://10.0.0.1:5000/v2.0/ user-list') - - def test_shell_args(self): - do_tenant_mock = mock.MagicMock() - with mock.patch('keystoneclient.v2_0.shell.do_user_list', - do_tenant_mock): - shell('user-list') - assert do_tenant_mock.called - ((a, b), c) = do_tenant_mock.call_args - actual = (b.os_auth_url, b.os_password, b.os_tenant_id, - b.os_tenant_name, b.os_username, - b.os_identity_api_version) - expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, DEFAULT_TENANT_ID, - DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - - # Old_style options - shell('--os_auth_url http://0.0.0.0:5000/ --os_password xyzpdq ' - '--os_tenant_id 1234 --os_tenant_name fred ' - '--os_username barney ' - '--os_identity_api_version 2.0 user-list') - assert do_tenant_mock.called - ((a, b), c) = do_tenant_mock.call_args - actual = (b.os_auth_url, b.os_password, b.os_tenant_id, - b.os_tenant_name, b.os_username, - b.os_identity_api_version) - expect = ('http://0.0.0.0:5000/', 'xyzpdq', '1234', - 'fred', 'barney', '2.0') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - - # New-style options - shell('--os-auth-url http://1.1.1.1:5000/ --os-password xyzpdq ' - '--os-tenant-id 4321 --os-tenant-name wilma ' - '--os-username betty ' - '--os-identity-api-version 2.0 user-list') - assert do_tenant_mock.called - ((a, b), c) = do_tenant_mock.call_args - actual = (b.os_auth_url, b.os_password, b.os_tenant_id, - b.os_tenant_name, b.os_username, - b.os_identity_api_version) - expect = ('http://1.1.1.1:5000/', 'xyzpdq', '4321', - 'wilma', 'betty', '2.0') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - - # Test keyring options - shell('--os-auth-url http://1.1.1.1:5000/ --os-password xyzpdq ' - '--os-tenant-id 4321 --os-tenant-name wilma ' - '--os-username betty ' - '--os-identity-api-version 2.0 ' - '--os-cache ' - '--stale-duration 500 ' - '--force-new-token user-list') - assert do_tenant_mock.called - ((a, b), c) = do_tenant_mock.call_args - actual = (b.os_auth_url, b.os_password, b.os_tenant_id, - b.os_tenant_name, b.os_username, - b.os_identity_api_version, b.os_cache, - b.stale_duration, b.force_new_token) - expect = ('http://1.1.1.1:5000/', 'xyzpdq', '4321', - 'wilma', 'betty', '2.0', True, '500', True) - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - - # Test os-identity-api-version fall back to 2.0 - shell('--os-identity-api-version 3.0 user-list') - assert do_tenant_mock.called - self.assertTrue(b.os_identity_api_version, '2.0') - - def test_shell_user_create_args(self): - """Test user-create args.""" - do_uc_mock = mock.MagicMock() - # grab the decorators for do_user_create - uc_func = getattr(shell_v2_0, 'do_user_create') - do_uc_mock.arguments = getattr(uc_func, 'arguments', []) - with mock.patch('keystoneclient.v2_0.shell.do_user_create', - do_uc_mock): - - # Old_style options - # Test case with one --tenant_id args present: ec2 creds - shell('user-create --name=FOO ' - '--pass=secret --tenant_id=barrr --enabled=true') - assert do_uc_mock.called - ((a, b), c) = do_uc_mock.call_args - actual = (b.os_auth_url, b.os_password, b.os_tenant_id, - b.os_tenant_name, b.os_username, - b.os_identity_api_version) - expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, DEFAULT_TENANT_ID, - DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - actual = (b.tenant_id, b.name, b.passwd, b.enabled) - expect = ('barrr', 'FOO', 'secret', 'true') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - - # New-style options - # Test case with one --tenant args present: ec2 creds - shell('user-create --name=foo ' - '--pass=secret --tenant=BARRR --enabled=true') - assert do_uc_mock.called - ((a, b), c) = do_uc_mock.call_args - actual = (b.os_auth_url, b.os_password, b.os_tenant_id, - b.os_tenant_name, b.os_username, - b.os_identity_api_version) - expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, DEFAULT_TENANT_ID, - DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - actual = (b.tenant, b.name, b.passwd, b.enabled) - expect = ('BARRR', 'foo', 'secret', 'true') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - - # New-style options - # Test case with one --tenant-id args present: ec2 creds - shell('user-create --name=foo ' - '--pass=secret --tenant-id=BARRR --enabled=true') - assert do_uc_mock.called - ((a, b), c) = do_uc_mock.call_args - actual = (b.os_auth_url, b.os_password, b.os_tenant_id, - b.os_tenant_name, b.os_username, - b.os_identity_api_version) - expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, DEFAULT_TENANT_ID, - DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - actual = (b.tenant, b.name, b.passwd, b.enabled) - expect = ('BARRR', 'foo', 'secret', 'true') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - - # Old_style options - # Test case with --os_tenant_id and --tenant_id args present - shell('--os_tenant_id=os-tenant user-create --name=FOO ' - '--pass=secret --tenant_id=barrr --enabled=true') - assert do_uc_mock.called - ((a, b), c) = do_uc_mock.call_args - actual = (b.os_auth_url, b.os_password, b.os_tenant_id, - b.os_tenant_name, b.os_username, - b.os_identity_api_version) - expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, 'os-tenant', - DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - actual = (b.tenant_id, b.name, b.passwd, b.enabled) - expect = ('barrr', 'FOO', 'secret', 'true') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - - # New-style options - # Test case with --os-tenant-id and --tenant-id args present - shell('--os-tenant-id=ostenant user-create --name=foo ' - '--pass=secret --tenant-id=BARRR --enabled=true') - assert do_uc_mock.called - ((a, b), c) = do_uc_mock.call_args - actual = (b.os_auth_url, b.os_password, b.os_tenant_id, - b.os_tenant_name, b.os_username, - b.os_identity_api_version) - expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, 'ostenant', - DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - actual = (b.tenant, b.name, b.passwd, b.enabled) - expect = ('BARRR', 'foo', 'secret', 'true') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - - def test_do_tenant_create(self): - do_tenant_mock = mock.MagicMock() - with mock.patch('keystoneclient.v2_0.shell.do_tenant_create', - do_tenant_mock): - shell('tenant-create') - assert do_tenant_mock.called - # FIXME(dtroyer): how do you test the decorators? - # shell('tenant-create --tenant-name wilma ' - # '--description "fred\'s wife"') - # assert do_tenant_mock.called - - def test_do_tenant_list(self): - do_tenant_mock = mock.MagicMock() - with mock.patch('keystoneclient.v2_0.shell.do_tenant_list', - do_tenant_mock): - shell('tenant-list') - assert do_tenant_mock.called - - def test_shell_tenant_id_args(self): - """Test where tenant_id is passed twice. - - Test a corner case where --tenant_id appears on the - command-line twice. - """ - do_ec2_mock = mock.MagicMock() - # grab the decorators for do_ec2_create_credentials - ec2_func = getattr(shell_v2_0, 'do_ec2_credentials_create') - do_ec2_mock.arguments = getattr(ec2_func, 'arguments', []) - with mock.patch('keystoneclient.v2_0.shell.do_ec2_credentials_create', - do_ec2_mock): - - # Old_style options - # Test case with one --tenant_id args present: ec2 creds - shell('ec2-credentials-create ' - '--tenant_id=ec2-tenant --user_id=ec2-user') - assert do_ec2_mock.called - ((a, b), c) = do_ec2_mock.call_args - actual = (b.os_auth_url, b.os_password, b.os_tenant_id, - b.os_tenant_name, b.os_username, - b.os_identity_api_version) - expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, DEFAULT_TENANT_ID, - DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - actual = (b.tenant_id, b.user_id) - expect = ('ec2-tenant', 'ec2-user') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - - # New-style options - # Test case with one --tenant-id args present: ec2 creds - shell('ec2-credentials-create ' - '--tenant-id=dash-tenant --user-id=dash-user') - assert do_ec2_mock.called - ((a, b), c) = do_ec2_mock.call_args - actual = (b.os_auth_url, b.os_password, b.os_tenant_id, - b.os_tenant_name, b.os_username, - b.os_identity_api_version) - expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, DEFAULT_TENANT_ID, - DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - actual = (b.tenant_id, b.user_id) - expect = ('dash-tenant', 'dash-user') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - - # Old_style options - # Test case with two --tenant_id args present - shell('--os_tenant_id=os-tenant ec2-credentials-create ' - '--tenant_id=ec2-tenant --user_id=ec2-user') - assert do_ec2_mock.called - ((a, b), c) = do_ec2_mock.call_args - actual = (b.os_auth_url, b.os_password, b.os_tenant_id, - b.os_tenant_name, b.os_username, - b.os_identity_api_version) - expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, 'os-tenant', - DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - actual = (b.tenant_id, b.user_id) - expect = ('ec2-tenant', 'ec2-user') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - - # New-style options - # Test case with two --tenant-id args present - shell('--os-tenant-id=ostenant ec2-credentials-create ' - '--tenant-id=dash-tenant --user-id=dash-user') - assert do_ec2_mock.called - ((a, b), c) = do_ec2_mock.call_args - actual = (b.os_auth_url, b.os_password, b.os_tenant_id, - b.os_tenant_name, b.os_username, - b.os_identity_api_version) - expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, 'ostenant', - DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - actual = (b.tenant_id, b.user_id) - expect = ('dash-tenant', 'dash-user') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - - def test_do_ec2_get(self): - do_shell_mock = mock.MagicMock() - - with mock.patch('keystoneclient.v2_0.shell.do_ec2_credentials_create', - do_shell_mock): - shell('ec2-credentials-create') - assert do_shell_mock.called - - with mock.patch('keystoneclient.v2_0.shell.do_ec2_credentials_get', - do_shell_mock): - shell('ec2-credentials-get') - assert do_shell_mock.called - - with mock.patch('keystoneclient.v2_0.shell.do_ec2_credentials_list', - do_shell_mock): - shell('ec2-credentials-list') - assert do_shell_mock.called - - with mock.patch('keystoneclient.v2_0.shell.do_ec2_credentials_delete', - do_shell_mock): - shell('ec2-credentials-delete') - assert do_shell_mock.called - - def test_timeout_parse_invalid_type(self): - for f in ['foobar', 'xyz']: - cmd = '--timeout %s endpoint-create' % (f) - self.assertRaises(exceptions.CommandError, - self._tolerant_shell, cmd) - - def test_timeout_parse_invalid_number(self): - for f in [-1, 0]: - cmd = '--timeout %s endpoint-create' % (f) - self.assertRaises(exceptions.CommandError, - self._tolerant_shell, cmd) - - def test_do_timeout(self): - response_mock = mock.MagicMock() - response_mock.status_code = 200 - response_mock.text = json.dumps({ - 'endpoints': [], - }) - request_mock = mock.MagicMock(return_value=response_mock) - with mock.patch.object(session.requests, 'request', - request_mock): - shell(('--timeout 2 --os-token=blah --os-endpoint=blah' - ' --os-auth-url=blah.com endpoint-list')) - request_mock.assert_called_with(mock.ANY, mock.ANY, - timeout=2, - allow_redirects=False, - headers=mock.ANY, - verify=mock.ANY) - - def test_do_endpoints(self): - do_shell_mock = mock.MagicMock() - # grab the decorators for do_endpoint_create - shell_func = getattr(shell_v2_0, 'do_endpoint_create') - do_shell_mock.arguments = getattr(shell_func, 'arguments', []) - with mock.patch('keystoneclient.v2_0.shell.do_endpoint_create', - do_shell_mock): - - # Old_style options - # Test create args - shell('endpoint-create ' - '--service_id=2 --publicurl=http://example.com:1234/go ' - '--adminurl=http://example.com:9876/adm') - assert do_shell_mock.called - ((a, b), c) = do_shell_mock.call_args - actual = (b.os_auth_url, b.os_password, b.os_tenant_id, - b.os_tenant_name, b.os_username, - b.os_identity_api_version) - expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, DEFAULT_TENANT_ID, - DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - actual = (b.service, b.publicurl, b.adminurl) - expect = ('2', - 'http://example.com:1234/go', - 'http://example.com:9876/adm') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - - # New-style options - # Test create args - shell('endpoint-create ' - '--service-id=3 --publicurl=http://example.com:4321/go ' - '--adminurl=http://example.com:9876/adm') - assert do_shell_mock.called - ((a, b), c) = do_shell_mock.call_args - actual = (b.os_auth_url, b.os_password, b.os_tenant_id, - b.os_tenant_name, b.os_username, - b.os_identity_api_version) - expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, DEFAULT_TENANT_ID, - DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - actual = (b.service, b.publicurl, b.adminurl) - expect = ('3', - 'http://example.com:4321/go', - 'http://example.com:9876/adm') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - - # New-style options - # Test create args - shell('endpoint-create ' - '--service=3 --publicurl=http://example.com:4321/go ' - '--adminurl=http://example.com:9876/adm') - assert do_shell_mock.called - ((a, b), c) = do_shell_mock.call_args - actual = (b.os_auth_url, b.os_password, b.os_tenant_id, - b.os_tenant_name, b.os_username, - b.os_identity_api_version) - expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, DEFAULT_TENANT_ID, - DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - actual = (b.service, b.publicurl, b.adminurl) - expect = ('3', - 'http://example.com:4321/go', - 'http://example.com:9876/adm') - self.assertTrue(all([x == y for x, y in zip(actual, expect)])) - - def test_shell_keyboard_interrupt(self): - shell_mock = mock.MagicMock() - with mock.patch('keystoneclient.shell.OpenStackIdentityShell.main', - shell_mock): - try: - shell_mock.side_effect = KeyboardInterrupt() - openstack_shell.main() - except SystemExit as ex: - self.assertEqual(130, ex.code) diff --git a/keystoneclient/tests/unit/test_utils.py b/keystoneclient/tests/unit/test_utils.py index 9ae0ceb..da51e7f 100644 --- a/keystoneclient/tests/unit/test_utils.py +++ b/keystoneclient/tests/unit/test_utils.py @@ -10,8 +10,6 @@ # License for the specific language governing permissions and limitations # under the License. -import sys - import six import testresources from testtools import matchers @@ -103,39 +101,6 @@ class FakeObject(object): self.name = name -class PrintTestCase(test_utils.TestCase): - def setUp(self): - super(PrintTestCase, self).setUp() - self.old_stdout = sys.stdout - self.stdout = six.moves.cStringIO() - self.addCleanup(setattr, self, 'stdout', None) - sys.stdout = self.stdout - self.addCleanup(setattr, sys, 'stdout', self.old_stdout) - - def test_print_list_unicode(self): - name = six.u('\u540d\u5b57') - objs = [FakeObject(name)] - # NOTE(Jeffrey4l) If the text's encode is proper, this method will not - # raise UnicodeEncodeError exceptions - utils.print_list(objs, ['name']) - output = self.stdout.getvalue() - # In Python 2, output will be bytes, while in Python 3, it will not. - # Let's decode the value if needed. - if isinstance(output, six.binary_type): - output = output.decode('utf-8') - self.assertIn(name, output) - - def test_print_dict_unicode(self): - name = six.u('\u540d\u5b57') - utils.print_dict({'name': name}) - output = self.stdout.getvalue() - # In Python 2, output will be bytes, while in Python 3, it will not. - # Let's decode the value if needed. - if isinstance(output, six.binary_type): - output = output.decode('utf-8') - self.assertIn(name, output) - - class HashSignedTokenTestCase(test_utils.TestCase, testresources.ResourcedTestCase): """Unit tests for utils.hash_signed_token().""" diff --git a/keystoneclient/tests/unit/v2_0/test_shell.py b/keystoneclient/tests/unit/v2_0/test_shell.py deleted file mode 100644 index 6210cfb..0000000 --- a/keystoneclient/tests/unit/v2_0/test_shell.py +++ /dev/null @@ -1,460 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os -import sys - -import mock -from oslo_serialization import jsonutils -import six -from testtools import matchers - -from keystoneclient import fixture -from keystoneclient.tests.unit.v2_0 import utils - - -DEFAULT_USERNAME = 'username' -DEFAULT_PASSWORD = 'password' -DEFAULT_TENANT_ID = 'tenant_id' -DEFAULT_TENANT_NAME = 'tenant_name' -DEFAULT_AUTH_URL = 'http://127.0.0.1:5000/v2.0/' -DEFAULT_ADMIN_URL = 'http://127.0.0.1:35357/v2.0/' - - -class ShellTests(utils.TestCase): - - TEST_URL = DEFAULT_ADMIN_URL - - def setUp(self): - """Patch os.environ to avoid required auth info.""" - - super(ShellTests, self).setUp() - - self.addCleanup(setattr, os, 'environ', os.environ.copy()) - os.environ = { - 'OS_USERNAME': DEFAULT_USERNAME, - 'OS_PASSWORD': DEFAULT_PASSWORD, - 'OS_TENANT_ID': DEFAULT_TENANT_ID, - 'OS_TENANT_NAME': DEFAULT_TENANT_NAME, - 'OS_AUTH_URL': DEFAULT_AUTH_URL, - } - import keystoneclient.shell - self.shell = keystoneclient.shell.OpenStackIdentityShell() - - self.token = fixture.V2Token() - self.token.set_scope() - svc = self.token.add_service('identity') - svc.add_endpoint(public=DEFAULT_AUTH_URL, - admin=DEFAULT_ADMIN_URL) - - self.stub_auth(json=self.token, base_url=DEFAULT_AUTH_URL) - - def run_command(self, cmd): - orig = sys.stdout - try: - sys.stdout = six.StringIO() - if isinstance(cmd, list): - self.shell.main(cmd) - else: - self.shell.main(cmd.split()) - except SystemExit: - exc_type, exc_value, exc_traceback = sys.exc_info() - self.assertEqual(exc_value.code, 0) - finally: - out = sys.stdout.getvalue() - sys.stdout.close() - sys.stdout = orig - return out - - def assert_called(self, method, path, base_url=TEST_URL): - self.assertEqual(method, self.requests_mock.last_request.method) - self.assertEqual(base_url + path.lstrip('/'), - self.requests_mock.last_request.url) - - def test_user_list(self): - self.stub_url('GET', ['users'], json={'users': []}) - self.run_command('user-list') - self.assert_called('GET', '/users') - - def test_user_create(self): - self.stub_url('POST', ['users'], json={'user': {}}) - self.run_command('user-create --name new-user') - - self.assert_called('POST', '/users') - self.assertRequestBodyIs(json={'user': {'email': None, - 'password': None, - 'enabled': True, - 'name': 'new-user', - 'tenantId': None}}) - - @mock.patch('sys.stdin', autospec=True) - def test_user_create_password_prompt(self, mock_stdin): - self.stub_url('POST', ['users'], json={'user': {}}) - - with mock.patch('getpass.getpass') as mock_getpass: - del(os.environ['OS_PASSWORD']) - mock_stdin.isatty = lambda: True - mock_getpass.return_value = 'newpass' - self.run_command('user-create --name new-user --pass') - - self.assert_called('POST', '/users') - self.assertRequestBodyIs(json={'user': {'email': None, - 'password': 'newpass', - 'enabled': True, - 'name': 'new-user', - 'tenantId': None}}) - - def test_user_get(self): - self.stub_url('GET', ['users', '1'], - json={'user': {'id': '1'}}) - self.run_command('user-get 1') - self.assert_called('GET', '/users/1') - - def test_user_delete(self): - self.stub_url('GET', ['users', '1'], - json={'user': {'id': '1'}}) - self.stub_url('DELETE', ['users', '1']) - self.run_command('user-delete 1') - self.assert_called('DELETE', '/users/1') - - def test_user_password_update(self): - self.stub_url('GET', ['users', '1'], - json={'user': {'id': '1'}}) - self.stub_url('PUT', ['users', '1', 'OS-KSADM', 'password']) - self.run_command('user-password-update --pass newpass 1') - self.assert_called('PUT', '/users/1/OS-KSADM/password') - - def test_user_update(self): - self.stub_url('PUT', ['users', '1']) - self.stub_url('GET', ['users', '1'], - json={"user": {"tenantId": "1", - "enabled": "true", - "id": "1", - "name": "username"}}) - - self.run_command('user-update --name new-user1' - ' --email user@email.com --enabled true 1') - self.assert_called('PUT', '/users/1') - body = {'user': {'id': '1', 'email': 'user@email.com', - 'enabled': True, 'name': 'new-user1'}} - self.assertRequestBodyIs(json=body) - - required = 'User not updated, no arguments present.' - out = self.run_command('user-update 1') - self.assertThat(out, matchers.MatchesRegex(required)) - - self.run_command(['user-update', '--email', '', '1']) - self.assert_called('PUT', '/users/1') - self.assertRequestBodyIs(json={'user': {'id': '1', 'email': ''}}) - - def test_role_create(self): - self.stub_url('POST', ['OS-KSADM', 'roles'], json={'role': {}}) - self.run_command('role-create --name new-role') - self.assert_called('POST', '/OS-KSADM/roles') - self.assertRequestBodyIs(json={"role": {"name": "new-role"}}) - - def test_role_get(self): - self.stub_url('GET', ['OS-KSADM', 'roles', '1'], - json={'role': {'id': '1'}}) - self.run_command('role-get 1') - self.assert_called('GET', '/OS-KSADM/roles/1') - - def test_role_list(self): - self.stub_url('GET', ['OS-KSADM', 'roles'], json={'roles': []}) - self.run_command('role-list') - self.assert_called('GET', '/OS-KSADM/roles') - - def test_role_delete(self): - self.stub_url('GET', ['OS-KSADM', 'roles', '1'], - json={'role': {'id': '1'}}) - self.stub_url('DELETE', ['OS-KSADM', 'roles', '1']) - self.run_command('role-delete 1') - self.assert_called('DELETE', '/OS-KSADM/roles/1') - - def test_user_role_add(self): - self.stub_url('GET', ['users', '1'], - json={'user': {'id': '1'}}) - self.stub_url('GET', ['OS-KSADM', 'roles', '1'], - json={'role': {'id': '1'}}) - - self.stub_url('PUT', ['users', '1', 'roles', 'OS-KSADM', '1']) - self.run_command('user-role-add --user_id 1 --role_id 1') - self.assert_called('PUT', '/users/1/roles/OS-KSADM/1') - - def test_user_role_list(self): - self.stub_url('GET', ['tenants', self.token.tenant_id], - json={'tenant': {'id': self.token.tenant_id}}) - self.stub_url('GET', ['tenants', self.token.tenant_id, - 'users', self.token.user_id, 'roles'], - json={'roles': []}) - - url = '/tenants/%s/users/%s/roles' % (self.token.tenant_id, - self.token.user_id) - - self.run_command('user-role-list --user_id %s --tenant-id %s' % - (self.token.user_id, self.token.tenant_id)) - self.assert_called('GET', url) - - self.run_command('user-role-list --user_id %s' % self.token.user_id) - self.assert_called('GET', url) - - self.run_command('user-role-list') - self.assert_called('GET', url) - - def test_user_role_remove(self): - self.stub_url('GET', ['users', '1'], - json={'user': {'id': 1}}) - self.stub_url('GET', ['OS-KSADM', 'roles', '1'], - json={'role': {'id': 1}}) - self.stub_url('DELETE', - ['users', '1', 'roles', 'OS-KSADM', '1']) - - self.run_command('user-role-remove --user_id 1 --role_id 1') - self.assert_called('DELETE', '/users/1/roles/OS-KSADM/1') - - def test_tenant_create(self): - self.stub_url('POST', ['tenants'], json={'tenant': {}}) - self.run_command('tenant-create --name new-tenant') - self.assertRequestBodyIs(json={"tenant": {"enabled": True, - "name": "new-tenant", - "description": None}}) - - def test_tenant_get(self): - self.stub_url('GET', ['tenants', '2'], json={'tenant': {}}) - self.run_command('tenant-get 2') - self.assert_called('GET', '/tenants/2') - - def test_tenant_list(self): - self.stub_url('GET', ['tenants'], json={'tenants': []}) - self.run_command('tenant-list') - self.assert_called('GET', '/tenants') - - def test_tenant_update(self): - self.stub_url('GET', ['tenants', '1'], - json={'tenant': {'id': '1'}}) - self.stub_url('GET', ['tenants', '2'], - json={'tenant': {'id': '2'}}) - self.stub_url('POST', ['tenants', '2'], - json={'tenant': {'id': '2'}}) - self.run_command('tenant-update' - ' --name new-tenant1 --enabled false' - ' --description desc 2') - self.assert_called('POST', '/tenants/2') - self.assertRequestBodyIs(json={"tenant": {"enabled": False, - "id": "2", - "description": "desc", - "name": "new-tenant1"}}) - - required = 'Tenant not updated, no arguments present.' - out = self.run_command('tenant-update 1') - self.assertThat(out, matchers.MatchesRegex(required)) - - def test_tenant_delete(self): - self.stub_url('GET', ['tenants', '2'], - json={'tenant': {'id': '2'}}) - self.stub_url('DELETE', ['tenants', '2']) - self.run_command('tenant-delete 2') - self.assert_called('DELETE', '/tenants/2') - - def test_service_create_with_required_arguments_only(self): - self.stub_url('POST', ['OS-KSADM', 'services'], - json={'OS-KSADM:service': {}}) - self.run_command('service-create --type compute') - self.assert_called('POST', '/OS-KSADM/services') - json = {"OS-KSADM:service": {"type": "compute", - "name": None, - "description": None}} - self.assertRequestBodyIs(json=json) - - def test_service_create_with_all_arguments(self): - self.stub_url('POST', ['OS-KSADM', 'services'], - json={'OS-KSADM:service': {}}) - self.run_command('service-create --type compute ' - '--name service1 --description desc1') - self.assert_called('POST', '/OS-KSADM/services') - json = {"OS-KSADM:service": {"type": "compute", - "name": "service1", - "description": "desc1"}} - self.assertRequestBodyIs(json=json) - - def test_service_get(self): - self.stub_url('GET', ['OS-KSADM', 'services', '1'], - json={'OS-KSADM:service': {'id': '1'}}) - self.run_command('service-get 1') - self.assert_called('GET', '/OS-KSADM/services/1') - - def test_service_list(self): - self.stub_url('GET', ['OS-KSADM', 'services'], - json={'OS-KSADM:services': []}) - self.run_command('service-list') - self.assert_called('GET', '/OS-KSADM/services') - - def test_service_delete(self): - self.stub_url('GET', ['OS-KSADM', 'services', '1'], - json={'OS-KSADM:service': {'id': 1}}) - self.stub_url('DELETE', ['OS-KSADM', 'services', '1']) - self.run_command('service-delete 1') - self.assert_called('DELETE', '/OS-KSADM/services/1') - - def test_catalog(self): - self.run_command('catalog') - self.run_command('catalog --service compute') - - def test_ec2_credentials_create(self): - self.stub_url('POST', - ['users', self.token.user_id, 'credentials', 'OS-EC2'], - json={'credential': {}}) - - url = '/users/%s/credentials/OS-EC2' % self.token.user_id - self.run_command('ec2-credentials-create --tenant-id 1 ' - '--user-id %s' % self.token.user_id) - self.assert_called('POST', url) - self.assertRequestBodyIs(json={'tenant_id': '1'}) - - self.run_command('ec2-credentials-create --tenant-id 1') - self.assert_called('POST', url) - self.assertRequestBodyIs(json={'tenant_id': '1'}) - - self.run_command('ec2-credentials-create') - self.assert_called('POST', url) - self.assertRequestBodyIs(json={'tenant_id': self.token.tenant_id}) - - def test_ec2_credentials_delete(self): - self.stub_url('DELETE', - ['users', self.token.user_id, - 'credentials', 'OS-EC2', '2']) - self.run_command('ec2-credentials-delete --access 2 --user-id %s' % - self.token.user_id) - - url = '/users/%s/credentials/OS-EC2/2' % self.token.user_id - self.assert_called('DELETE', url) - - self.run_command('ec2-credentials-delete --access 2') - self.assert_called('DELETE', url) - - def test_ec2_credentials_list(self): - self.stub_url('GET', - ['users', self.token.user_id, 'credentials', 'OS-EC2'], - json={'credentials': []}) - self.run_command('ec2-credentials-list --user-id %s' - % self.token.user_id) - - url = '/users/%s/credentials/OS-EC2' % self.token.user_id - self.assert_called('GET', url) - - self.run_command('ec2-credentials-list') - self.assert_called('GET', url) - - def test_ec2_credentials_get(self): - self.stub_url('GET', - ['users', '1', 'credentials', 'OS-EC2', '2'], - json={'credential': {}}) - self.run_command('ec2-credentials-get --access 2 --user-id 1') - self.assert_called('GET', '/users/1/credentials/OS-EC2/2') - - def test_bootstrap(self): - user = {'user': {'id': '1'}} - role = {'role': {'id': '1'}} - tenant = {'tenant': {'id': '1'}} - - token = fixture.V2Token(user_id=1, tenant_id=1) - token.add_role(id=1) - svc = token.add_service('identity') - svc.add_endpoint(public=DEFAULT_AUTH_URL, - admin=DEFAULT_ADMIN_URL) - - self.stub_auth(json=token) - - self.stub_url('POST', ['OS-KSADM', 'roles'], json=role) - self.stub_url('GET', ['OS-KSADM', 'roles', '1'], json=role) - self.stub_url('POST', ['tenants'], json=tenant) - self.stub_url('GET', ['tenants', '1'], json=tenant) - self.stub_url('POST', ['users'], json=user) - self.stub_url('GET', ['users', '1'], json=user) - self.stub_url('PUT', - ['tenants', '1', 'users', '1', 'roles', 'OS-KSADM', '1'], - json=role) - - self.run_command('bootstrap --user-name new-user' - ' --pass 1 --role-name admin' - ' --tenant-name new-tenant') - - def called_anytime(method, path, json=None): - test_url = self.TEST_URL.strip('/') - for r in self.requests_mock.request_history: - if not r.method == method: - continue - if not r.url == test_url + path: - continue - - if json: - json_body = jsonutils.loads(r.body) - if not json_body == json: - continue - - return True - - raise AssertionError('URL never called') - - called_anytime('POST', '/users', {'user': {'email': None, - 'password': '1', - 'enabled': True, - 'name': 'new-user', - 'tenantId': None}}) - - called_anytime('POST', '/tenants', {"tenant": {"enabled": True, - "name": "new-tenant", - "description": None}}) - - called_anytime('POST', '/OS-KSADM/roles', - {"role": {"name": "admin"}}) - - called_anytime('PUT', '/tenants/1/users/1/roles/OS-KSADM/1') - - def test_bash_completion(self): - self.run_command('bash-completion') - - def test_help(self): - out = self.run_command('help') - required = 'usage: keystone' - self.assertThat(out, matchers.MatchesRegex(required)) - - def test_password_update(self): - self.stub_url('PATCH', - ['OS-KSCRUD', 'users', self.token.user_id], - base_url=DEFAULT_AUTH_URL) - self.run_command('password-update --current-password oldpass' - ' --new-password newpass') - self.assert_called('PATCH', - '/OS-KSCRUD/users/%s' % self.token.user_id, - base_url=DEFAULT_AUTH_URL) - self.assertRequestBodyIs(json={'user': {'original_password': 'oldpass', - 'password': 'newpass'}}) - - def test_endpoint_create(self): - self.stub_url('GET', ['OS-KSADM', 'services', '1'], - json={'OS-KSADM:service': {'id': '1'}}) - self.stub_url('POST', ['endpoints'], json={'endpoint': {}}) - self.run_command('endpoint-create --service-id 1 ' - '--publicurl=http://example.com:1234/go') - self.assert_called('POST', '/endpoints') - json = {'endpoint': {'adminurl': None, - 'service_id': '1', - 'region': 'regionOne', - 'internalurl': None, - 'publicurl': "http://example.com:1234/go"}} - self.assertRequestBodyIs(json=json) - - def test_endpoint_list(self): - self.stub_url('GET', ['endpoints'], json={'endpoints': []}) - self.run_command('endpoint-list') - self.assert_called('GET', '/endpoints') diff --git a/keystoneclient/utils.py b/keystoneclient/utils.py index 7f36d85..7d3fcef 100644 --- a/keystoneclient/utils.py +++ b/keystoneclient/utils.py @@ -15,12 +15,10 @@ import hashlib import logging import sys -from oslo_utils import encodeutils from oslo_utils import timeutils # NOTE(stevemar): do not remove positional. We need this to stay for a while # since versions of auth_token require it here. from positional import positional # noqa -import prettytable import six from keystoneclient import exceptions @@ -29,73 +27,6 @@ from keystoneclient import exceptions logger = logging.getLogger(__name__) -# Decorator for cli-args -def arg(*args, **kwargs): - def _decorator(func): - # Because of the semantics of decorator composition if we just append - # to the options list positional options will appear to be backwards. - func.__dict__.setdefault('arguments', []).insert(0, (args, kwargs)) - return func - return _decorator - - -def pretty_choice_list(l): - return ', '.join("'%s'" % i for i in l) - - -def print_list(objs, fields, formatters={}, order_by=None): - pt = prettytable.PrettyTable([f for f in fields], - caching=False, print_empty=False) - pt.aligns = ['l' for f in fields] - - for o in objs: - row = [] - for field in fields: - if field in formatters: - row.append(formatters[field](o)) - else: - field_name = field.lower().replace(' ', '_') - data = getattr(o, field_name, '') - if data is None: - data = '' - row.append(data) - pt.add_row(row) - - if order_by is None: - order_by = fields[0] - encoded = encodeutils.safe_encode(pt.get_string(sortby=order_by)) - if six.PY3: - encoded = encoded.decode() - print(encoded) - - -def _word_wrap(string, max_length=0): - """wrap long strings to be no longer than max_length.""" - if max_length <= 0: - return string - return '\n'.join([string[i:i + max_length] for i in - range(0, len(string), max_length)]) - - -def print_dict(d, wrap=0): - """pretty table prints dictionaries. - - Wrap values to max_length wrap if wrap>0 - """ - pt = prettytable.PrettyTable(['Property', 'Value'], - caching=False, print_empty=False) - pt.aligns = ['l', 'l'] - for (prop, value) in six.iteritems(d): - if value is None: - value = '' - value = _word_wrap(value, max_length=wrap) - pt.add_row([prop, value]) - encoded = encodeutils.safe_encode(pt.get_string(sortby='Property')) - if six.PY3: - encoded = encoded.decode() - print(encoded) - - def find_resource(manager, name_or_id): """Helper for the _find_* methods.""" diff --git a/keystoneclient/v2_0/shell.py b/keystoneclient/v2_0/shell.py deleted file mode 100755 index b2cb68c..0000000 --- a/keystoneclient/v2_0/shell.py +++ /dev/null @@ -1,547 +0,0 @@ -# Copyright 2010 Jacob Kaplan-Moss -# Copyright 2011 OpenStack Foundation -# Copyright 2011 Nebula, Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -""" -This module is deprecated as of the 1.7.0 release in favor of -python-openstackclient and may be removed in the 2.0.0 release. - -Bug fixes are welcome, but new features should be exposed to the CLI by -python-openstackclient after being added to the python-keystoneclient library. - -""" - -import argparse -import getpass -import sys - -from oslo_utils import strutils -import six - -from keystoneclient.i18n import _ -from keystoneclient import utils -from keystoneclient.v2_0 import client - - -CLIENT_CLASS = client.Client -ASK_FOR_PASSWORD = object() - - -def require_service_catalog(f): - msg = _('Configuration error: Client configured to run without a service ' - 'catalog. Run the client using --os-auth-url or OS_AUTH_URL, ' - 'instead of --os-endpoint or OS_SERVICE_ENDPOINT, for example.') - - def wrapped(kc, args): - if not kc.has_service_catalog(): - raise Exception(msg) - return f(kc, args) - - # Change __doc__ attribute back to origin function's __doc__ - wrapped.__doc__ = f.__doc__ - - return wrapped - - -@utils.arg('--tenant', '--tenant-id', metavar='<tenant>', - help='Tenant; lists all users if not specified.') -@utils.arg('--tenant_id', help=argparse.SUPPRESS) -def do_user_list(kc, args): - """List users.""" - if args.tenant: - tenant_id = utils.find_resource(kc.tenants, args.tenant).id - else: - tenant_id = None - users = kc.users.list(tenant_id=tenant_id) - utils.print_list(users, ['id', 'name', 'enabled', 'email'], - order_by='name') - - -@utils.arg('user', metavar='<user>', help='Name or ID of user to display.') -def do_user_get(kc, args): - """Display user details.""" - user = utils.find_resource(kc.users, args.user) - utils.print_dict(user._info) - - -@utils.arg('--name', metavar='<user-name>', required=True, - help='New user name (must be unique).') -@utils.arg('--tenant', '--tenant-id', metavar='<tenant>', - help='New user default tenant.') -@utils.arg('--tenant_id', help=argparse.SUPPRESS) -@utils.arg('--pass', metavar='<pass>', dest='passwd', nargs='?', - const=ASK_FOR_PASSWORD, help='New user password; ' - 'required for some auth backends.') -@utils.arg('--email', metavar='<email>', - help='New user email address.') -@utils.arg('--enabled', metavar='<true|false>', default=True, - help='Initial user enabled status. Default is true.') -def do_user_create(kc, args): - """Create new user.""" - if args.tenant: - tenant_id = utils.find_resource(kc.tenants, args.tenant).id - elif args.tenant_id: - tenant_id = args.tenant_id - else: - tenant_id = None - new_passwd = args.passwd - if args.passwd is ASK_FOR_PASSWORD: - new_passwd = utils.prompt_for_password() - user = kc.users.create(args.name, new_passwd, args.email, - tenant_id=tenant_id, - enabled=strutils.bool_from_string(args.enabled)) - utils.print_dict(user._info) - - -@utils.arg('--name', metavar='<user-name>', - help='Desired new user name.') -@utils.arg('--email', metavar='<email>', - help='Desired new email address.') -@utils.arg('--enabled', metavar='<true|false>', - help='Enable or disable user.') -@utils.arg('user', metavar='<user>', help='Name or ID of user to update.') -def do_user_update(kc, args): - """Update user's name, email, and enabled status.""" - kwargs = {} - if args.name: - kwargs['name'] = args.name - if args.email is not None: - kwargs['email'] = args.email - if args.enabled: - kwargs['enabled'] = strutils.bool_from_string(args.enabled) - - if not len(kwargs): - print(_("User not updated, no arguments present.")) - return - - user = utils.find_resource(kc.users, args.user) - try: - kc.users.update(user, **kwargs) - print(_('User has been updated.')) - except Exception as e: - print(_('Unable to update user: %s') % e) - - -@utils.arg('--pass', metavar='<password>', dest='passwd', required=False, - help='Desired new password.') -@utils.arg('user', metavar='<user>', - help='Name or ID of user to update password.') -def do_user_password_update(kc, args): - """Update user password.""" - user = utils.find_resource(kc.users, args.user) - new_passwd = args.passwd or utils.prompt_for_password() - if new_passwd is None: - msg = (_("\nPlease specify password using the --pass option " - "or using the prompt")) - sys.exit(msg) - kc.users.update_password(user, new_passwd) - - -@utils.arg('--current-password', metavar='<current-password>', - dest='currentpasswd', required=False, help='Current password, ' - 'Defaults to the password as set by --os-password or ' - 'env[OS_PASSWORD].') -@utils.arg('--new-password ', metavar='<new-password>', dest='newpasswd', - required=False, help='Desired new password.') -def do_password_update(kc, args): - """Update own password.""" - - # we are prompting for these passwords if they are not passed in - # this gives users the option not to have their password - # appear in bash history etc.. - currentpasswd = args.os_password - if args.currentpasswd is not None: - currentpasswd = args.currentpasswd - if currentpasswd is None: - currentpasswd = getpass.getpass(_('Current Password: ')) - - newpasswd = args.newpasswd - while newpasswd is None: - passwd1 = getpass.getpass(_('New Password: ')) - passwd2 = getpass.getpass(_('Repeat New Password: ')) - if passwd1 == passwd2: - newpasswd = passwd1 - - kc.users.update_own_password(currentpasswd, newpasswd) - - if args.os_password != newpasswd: - print(_("You should update the password you are using to authenticate " - "to match your new password")) - - -@utils.arg('user', metavar='<user>', help='Name or ID of user to delete.') -def do_user_delete(kc, args): - """Delete user.""" - user = utils.find_resource(kc.users, args.user) - kc.users.delete(user) - - -def do_tenant_list(kc, args): - """List all tenants.""" - tenants = kc.tenants.list() - utils.print_list(tenants, ['id', 'name', 'enabled'], order_by='name') - - -@utils.arg('tenant', metavar='<tenant>', - help='Name or ID of tenant to display.') -def do_tenant_get(kc, args): - """Display tenant details.""" - tenant = utils.find_resource(kc.tenants, args.tenant) - utils.print_dict(tenant._info) - - -@utils.arg('--name', metavar='<tenant-name>', required=True, - help='New tenant name (must be unique).') -@utils.arg('--description', metavar='<tenant-description>', default=None, - help='Description of new tenant. Default is none.') -@utils.arg('--enabled', metavar='<true|false>', default=True, - help='Initial tenant enabled status. Default is true.') -def do_tenant_create(kc, args): - """Create new tenant.""" - tenant = kc.tenants.create(args.name, - description=args.description, - enabled=strutils.bool_from_string(args.enabled)) - utils.print_dict(tenant._info) - - -@utils.arg('--name', metavar='<tenant_name>', - help='Desired new name of tenant.') -@utils.arg('--description', metavar='<tenant-description>', default=None, - help='Desired new description of tenant.') -@utils.arg('--enabled', metavar='<true|false>', - help='Enable or disable tenant.') -@utils.arg('tenant', metavar='<tenant>', - help='Name or ID of tenant to update.') -def do_tenant_update(kc, args): - """Update tenant name, description, enabled status.""" - tenant = utils.find_resource(kc.tenants, args.tenant) - kwargs = {} - if args.name: - kwargs.update({'name': args.name}) - if args.description is not None: - kwargs.update({'description': args.description}) - if args.enabled: - kwargs.update({'enabled': strutils.bool_from_string(args.enabled)}) - - if kwargs == {}: - print(_("Tenant not updated, no arguments present.")) - return - tenant.update(**kwargs) - - -@utils.arg('tenant', metavar='<tenant>', - help='Name or ID of tenant to delete.') -def do_tenant_delete(kc, args): - """Delete tenant.""" - tenant = utils.find_resource(kc.tenants, args.tenant) - kc.tenants.delete(tenant) - - -@utils.arg('--type', metavar='<type>', required=True, - help='Service type (one of: identity, compute, network, ' - 'image, object-store, or other service identifier string).') -@utils.arg('--name', metavar='<name>', - help='Name of new service (must be unique).') -@utils.arg('--description', metavar='<service-description>', - help='Description of service.') -def do_service_create(kc, args): - """Add service to Service Catalog.""" - service = kc.services.create(args.name, - args.type, - args.description) - utils.print_dict(service._info) - - -def do_service_list(kc, args): - """List all services in Service Catalog.""" - services = kc.services.list() - utils.print_list(services, ['id', 'name', 'type', 'description'], - order_by='name') - - -@utils.arg('service', metavar='<service>', - help='Name or ID of service to display.') -def do_service_get(kc, args): - """Display service from Service Catalog.""" - service = utils.find_resource(kc.services, args.service) - utils.print_dict(service._info) - - -@utils.arg('service', metavar='<service>', - help='Name or ID of service to delete.') -def do_service_delete(kc, args): - """Delete service from Service Catalog.""" - service = utils.find_resource(kc.services, args.service) - kc.services.delete(service.id) - - -def do_role_list(kc, args): - """List all roles.""" - roles = kc.roles.list() - utils.print_list(roles, ['id', 'name'], order_by='name') - - -@utils.arg('role', metavar='<role>', help='Name or ID of role to display.') -def do_role_get(kc, args): - """Display role details.""" - role = utils.find_resource(kc.roles, args.role) - utils.print_dict(role._info) - - -@utils.arg('--name', metavar='<role-name>', required=True, - help='Name of new role.') -def do_role_create(kc, args): - """Create new role.""" - role = kc.roles.create(args.name) - utils.print_dict(role._info) - - -@utils.arg('role', metavar='<role>', help='Name or ID of role to delete.') -def do_role_delete(kc, args): - """Delete role.""" - role = utils.find_resource(kc.roles, args.role) - kc.roles.delete(role) - - -@utils.arg('--user', '--user-id', '--user_id', metavar='<user>', - required=True, help='Name or ID of user.') -@utils.arg('--role', '--role-id', '--role_id', metavar='<role>', - required=True, help='Name or ID of role.') -@utils.arg('--tenant', '--tenant-id', metavar='<tenant>', - help='Name or ID of tenant.') -@utils.arg('--tenant_id', help=argparse.SUPPRESS) -def do_user_role_add(kc, args): - """Add role to user.""" - user = utils.find_resource(kc.users, args.user) - role = utils.find_resource(kc.roles, args.role) - if args.tenant: - tenant = utils.find_resource(kc.tenants, args.tenant) - elif args.tenant_id: - tenant = args.tenant_id - else: - tenant = None - kc.roles.add_user_role(user, role, tenant) - - -@utils.arg('--user', '--user-id', '--user_id', metavar='<user>', - required=True, help='Name or ID of user.') -@utils.arg('--role', '--role-id', '--role_id', metavar='<role>', - required=True, help='Name or ID of role.') -@utils.arg('--tenant', '--tenant-id', metavar='<tenant>', - help='Name or ID of tenant.') -@utils.arg('--tenant_id', help=argparse.SUPPRESS) -def do_user_role_remove(kc, args): - """Remove role from user.""" - user = utils.find_resource(kc.users, args.user) - role = utils.find_resource(kc.roles, args.role) - if args.tenant: - tenant = utils.find_resource(kc.tenants, args.tenant) - elif args.tenant_id: - tenant = args.tenant_id - else: - tenant = None - kc.roles.remove_user_role(user, role, tenant) - - -@utils.arg('--user', '--user-id', metavar='<user>', - help='List roles granted to specified user.') -@utils.arg('--user_id', help=argparse.SUPPRESS) -@utils.arg('--tenant', '--tenant-id', metavar='<tenant>', - help='List only roles granted on specified tenant.') -@utils.arg('--tenant_id', help=argparse.SUPPRESS) -def do_user_role_list(kc, args): - """List roles granted to a user.""" - if args.tenant: - tenant_id = utils.find_resource(kc.tenants, args.tenant).id - elif args.tenant_id: - tenant_id = args.tenant_id - else: - # use the authenticated tenant id as a default - tenant_id = kc.auth_tenant_id - - if args.user: - user_id = utils.find_resource(kc.users, args.user).id - elif args.user_id: - user_id = args.user_id - else: - # use the authenticated user id as a default - user_id = kc.auth_user_id - roles = kc.roles.roles_for_user(user=user_id, tenant=tenant_id) - - # this makes the command output a bit more intuitive - for role in roles: - role.user_id = user_id - role.tenant_id = tenant_id - - utils.print_list(roles, ['id', 'name', 'user_id', 'tenant_id'], - order_by='name') - - -@utils.arg('--user-id', metavar='<user-id>', - help='User ID for which to create credentials. If not specified, ' - 'the authenticated user will be used.') -@utils.arg('--user_id', help=argparse.SUPPRESS) -@utils.arg('--tenant-id', metavar='<tenant-id>', - help='Tenant ID for which to create credentials. If not ' - 'specified, the authenticated tenant ID will be used.') -@utils.arg('--tenant_id', help=argparse.SUPPRESS) -def do_ec2_credentials_create(kc, args): - """Create EC2-compatible credentials for user per tenant.""" - if not args.tenant_id: - # use the authenticated tenant id as a default - args.tenant_id = kc.auth_tenant_id - if not args.user_id: - # use the authenticated user id as a default - args.user_id = kc.auth_user_id - credentials = kc.ec2.create(args.user_id, args.tenant_id) - utils.print_dict(credentials._info) - - -@utils.arg('--user-id', metavar='<user-id>', help='User ID.') -@utils.arg('--user_id', help=argparse.SUPPRESS) -@utils.arg('--access', metavar='<access-key>', required=True, - help='Access Key.') -def do_ec2_credentials_get(kc, args): - """Display EC2-compatible credentials.""" - if not args.user_id: - # use the authenticated user id as a default - args.user_id = kc.auth_user_id - cred = kc.ec2.get(args.user_id, args.access) - if cred: - utils.print_dict(cred._info) - - -@utils.arg('--user-id', metavar='<user-id>', help='User ID.') -@utils.arg('--user_id', help=argparse.SUPPRESS) -def do_ec2_credentials_list(kc, args): - """List EC2-compatible credentials for a user.""" - if not args.user_id: - # use the authenticated user id as a default - args.user_id = kc.auth_user_id - credentials = kc.ec2.list(args.user_id) - for cred in credentials: - try: - cred.tenant = getattr(kc.tenants.get(cred.tenant_id), 'name') - except Exception: - # FIXME(dtroyer): Retrieving the tenant name fails for normal - # users; stuff in the tenant_id instead. - cred.tenant = cred.tenant_id - utils.print_list(credentials, ['tenant', 'access', 'secret']) - - -@utils.arg('--user-id', metavar='<user-id>', help='User ID.') -@utils.arg('--user_id', help=argparse.SUPPRESS) -@utils.arg('--access', metavar='<access-key>', required=True, - help='Access Key.') -def do_ec2_credentials_delete(kc, args): - """Delete EC2-compatible credentials.""" - if not args.user_id: - # use the authenticated user id as a default - args.user_id = kc.auth_user_id - try: - kc.ec2.delete(args.user_id, args.access) - print(_('Credential has been deleted.')) - except Exception as e: - print(_('Unable to delete credential: %s') % e) - - -@utils.arg('--service', metavar='<service-type>', default=None, - help='Service type to return.') -@require_service_catalog -def do_catalog(kc, args): - """List service catalog, possibly filtered by service.""" - endpoints = kc.service_catalog.get_endpoints(service_type=args.service) - for (service, service_endpoints) in six.iteritems(endpoints): - if len(service_endpoints) > 0: - print(_("Service: %s") % service) - for ep in service_endpoints: - utils.print_dict(ep) - - -@utils.arg('--service', metavar='<service-type>', required=True, - help='Service type to select.') -@utils.arg('--endpoint-type', metavar='<endpoint-type>', default='publicURL', - help='Endpoint type to select.') -@utils.arg('--endpoint_type', default='publicURL', - help=argparse.SUPPRESS) -@utils.arg('--attr', metavar='<service-attribute>', - help='Service attribute to match for selection.') -@utils.arg('--value', metavar='<value>', - help='Value of attribute to match.') -@require_service_catalog -def do_endpoint_get(kc, args): - """Find endpoint filtered by a specific attribute or service type.""" - kwargs = { - 'service_type': args.service, - 'endpoint_type': args.endpoint_type, - } - - if args.attr and args.value: - kwargs.update({'attr': args.attr, 'filter_value': args.value}) - elif args.attr or args.value: - print(_('Both --attr and --value required.')) - return - - url = kc.service_catalog.url_for(**kwargs) - utils.print_dict({'%s.%s' % (args.service, args.endpoint_type): url}) - - -def do_endpoint_list(kc, args): - """List configured service endpoints.""" - endpoints = kc.endpoints.list() - utils.print_list(endpoints, - ['id', 'region', 'publicurl', - 'internalurl', 'adminurl', 'service_id']) - - -@utils.arg('--region', metavar='<endpoint-region>', - help='Endpoint region.', default='regionOne') -@utils.arg('--service', '--service-id', '--service_id', - metavar='<service>', required=True, - help='Name or ID of service associated with endpoint.') -@utils.arg('--publicurl', metavar='<public-url>', required=True, - help='Public URL endpoint.') -@utils.arg('--adminurl', metavar='<admin-url>', - help='Admin URL endpoint.') -@utils.arg('--internalurl', metavar='<internal-url>', - help='Internal URL endpoint.') -def do_endpoint_create(kc, args): - """Create a new endpoint associated with a service.""" - service_id = utils.find_resource(kc.services, args.service).id - endpoint = kc.endpoints.create(args.region, - service_id, - args.publicurl, - args.adminurl, - args.internalurl) - utils.print_dict(endpoint._info) - - -@utils.arg('id', metavar='<endpoint-id>', help='ID of endpoint to delete.') -def do_endpoint_delete(kc, args): - """Delete a service endpoint.""" - try: - kc.endpoints.delete(args.id) - print(_('Endpoint has been deleted.')) - except Exception: - print(_('Unable to delete endpoint.')) - - -@utils.arg('--wrap', metavar='<integer>', default=0, - help='Wrap PKI tokens to a specified length, or 0 to disable.') -@require_service_catalog -def do_token_get(kc, args): - """Display the current user token.""" - utils.print_dict(kc.service_catalog.get_token(), - wrap=int(args.wrap)) diff --git a/releasenotes/notes/remove_cli-d2c4435ba6a09b79.yaml b/releasenotes/notes/remove_cli-d2c4435ba6a09b79.yaml new file mode 100644 index 0000000..c142809 --- /dev/null +++ b/releasenotes/notes/remove_cli-d2c4435ba6a09b79.yaml @@ -0,0 +1,7 @@ +--- +prelude: > + The ``keystone`` CLI has been removed. +other: + - The ``keystone`` CLI has been removed, using the ``openstack`` + CLI is recommended. This feature has been deprecated since the + Liberty release of Keystone. diff --git a/requirements.txt b/requirements.txt index d9bc0f6..7206e2f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,7 +12,6 @@ oslo.i18n>=2.1.0 # Apache-2.0 oslo.serialization>=1.10.0 # Apache-2.0 oslo.utils>=3.5.0 # Apache-2.0 positional>=1.0.1 # Apache-2.0 -PrettyTable<0.8,>=0.7 # BSD requests!=2.9.0,>=2.8.1 # Apache-2.0 six>=1.9.0 # MIT stevedore>=1.5.0 # Apache-2.0 @@ -23,9 +23,6 @@ packages = keystoneclient [entry_points] -console_scripts = - keystone = keystoneclient.shell:main - keystoneclient.auth.plugin = password = keystoneclient.auth.identity.generic:Password token = keystoneclient.auth.identity.generic:Token |