summaryrefslogtreecommitdiff
path: root/test/support
diff options
context:
space:
mode:
authorMatt Martz <matt@sivel.net>2020-09-22 10:33:59 -0500
committerGitHub <noreply@github.com>2020-09-22 11:33:59 -0400
commita99212464c1e5de0245a8429047fa2e05458f20b (patch)
tree380e86934c939b0605ed9b7bd5d43e5fb8ed6fc3 /test/support
parent00b22ab55ea1b514053ca8a9ba3928bc210ea3aa (diff)
downloadansible-a99212464c1e5de0245a8429047fa2e05458f20b.tar.gz
Remove incidental_consul tests (#71811)
* Add explicit intg tests for argspec functionality * ci_complete ci_coverage * Remove incidental_consul and incidental_setup_openssl * ci_complete ci_coverage
Diffstat (limited to 'test/support')
-rw-r--r--test/support/integration/plugins/modules/consul_session.py284
-rw-r--r--test/support/integration/plugins/modules/openssl_certificate.py2757
-rw-r--r--test/support/integration/plugins/modules/openssl_certificate_info.py864
-rw-r--r--test/support/integration/plugins/modules/openssl_csr.py1161
-rw-r--r--test/support/integration/plugins/modules/openssl_privatekey.py944
5 files changed, 0 insertions, 6010 deletions
diff --git a/test/support/integration/plugins/modules/consul_session.py b/test/support/integration/plugins/modules/consul_session.py
deleted file mode 100644
index 6802ebe64e..0000000000
--- a/test/support/integration/plugins/modules/consul_session.py
+++ /dev/null
@@ -1,284 +0,0 @@
-#!/usr/bin/python
-# -*- coding: utf-8 -*-
-
-# Copyright: (c) 2015, Steve Gargan <steve.gargan@gmail.com>
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-from __future__ import absolute_import, division, print_function
-__metaclass__ = type
-
-ANSIBLE_METADATA = {'metadata_version': '1.1',
- 'status': ['preview'],
- 'supported_by': 'community'}
-
-DOCUMENTATION = """
-module: consul_session
-short_description: Manipulate consul sessions
-description:
- - Allows the addition, modification and deletion of sessions in a consul
- cluster. These sessions can then be used in conjunction with key value pairs
- to implement distributed locks. In depth documentation for working with
- sessions can be found at http://www.consul.io/docs/internals/sessions.html
-requirements:
- - python-consul
- - requests
-version_added: "2.0"
-author:
-- Steve Gargan (@sgargan)
-options:
- id:
- description:
- - ID of the session, required when I(state) is either C(info) or
- C(remove).
- type: str
- state:
- description:
- - Whether the session should be present i.e. created if it doesn't
- exist, or absent, removed if present. If created, the I(id) for the
- session is returned in the output. If C(absent), I(id) is
- required to remove the session. Info for a single session, all the
- sessions for a node or all available sessions can be retrieved by
- specifying C(info), C(node) or C(list) for the I(state); for C(node)
- or C(info), the node I(name) or session I(id) is required as parameter.
- choices: [ absent, info, list, node, present ]
- type: str
- default: present
- name:
- description:
- - The name that should be associated with the session. Required when
- I(state=node) is used.
- type: str
- delay:
- description:
- - The optional lock delay that can be attached to the session when it
- is created. Locks for invalidated sessions ar blocked from being
- acquired until this delay has expired. Durations are in seconds.
- type: int
- default: 15
- node:
- description:
- - The name of the node that with which the session will be associated.
- by default this is the name of the agent.
- type: str
- datacenter:
- description:
- - The name of the datacenter in which the session exists or should be
- created.
- type: str
- checks:
- description:
- - Checks that will be used to verify the session health. If
- all the checks fail, the session will be invalidated and any locks
- associated with the session will be release and can be acquired once
- the associated lock delay has expired.
- type: list
- host:
- description:
- - The host of the consul agent defaults to localhost.
- type: str
- default: localhost
- port:
- description:
- - The port on which the consul agent is running.
- type: int
- default: 8500
- scheme:
- description:
- - The protocol scheme on which the consul agent is running.
- type: str
- default: http
- version_added: "2.1"
- validate_certs:
- description:
- - Whether to verify the TLS certificate of the consul agent.
- type: bool
- default: True
- version_added: "2.1"
- behavior:
- description:
- - The optional behavior that can be attached to the session when it
- is created. This controls the behavior when a session is invalidated.
- choices: [ delete, release ]
- type: str
- default: release
- version_added: "2.2"
-"""
-
-EXAMPLES = '''
-- name: register basic session with consul
- consul_session:
- name: session1
-
-- name: register a session with an existing check
- consul_session:
- name: session_with_check
- checks:
- - existing_check_name
-
-- name: register a session with lock_delay
- consul_session:
- name: session_with_delay
- delay: 20s
-
-- name: retrieve info about session by id
- consul_session:
- id: session_id
- state: info
-
-- name: retrieve active sessions
- consul_session:
- state: list
-'''
-
-try:
- import consul
- from requests.exceptions import ConnectionError
- python_consul_installed = True
-except ImportError:
- python_consul_installed = False
-
-from ansible.module_utils.basic import AnsibleModule
-
-
-def execute(module):
-
- state = module.params.get('state')
-
- if state in ['info', 'list', 'node']:
- lookup_sessions(module)
- elif state == 'present':
- update_session(module)
- else:
- remove_session(module)
-
-
-def lookup_sessions(module):
-
- datacenter = module.params.get('datacenter')
-
- state = module.params.get('state')
- consul_client = get_consul_api(module)
- try:
- if state == 'list':
- sessions_list = consul_client.session.list(dc=datacenter)
- # Ditch the index, this can be grabbed from the results
- if sessions_list and len(sessions_list) >= 2:
- sessions_list = sessions_list[1]
- module.exit_json(changed=True,
- sessions=sessions_list)
- elif state == 'node':
- node = module.params.get('node')
- sessions = consul_client.session.node(node, dc=datacenter)
- module.exit_json(changed=True,
- node=node,
- sessions=sessions)
- elif state == 'info':
- session_id = module.params.get('id')
-
- session_by_id = consul_client.session.info(session_id, dc=datacenter)
- module.exit_json(changed=True,
- session_id=session_id,
- sessions=session_by_id)
-
- except Exception as e:
- module.fail_json(msg="Could not retrieve session info %s" % e)
-
-
-def update_session(module):
-
- name = module.params.get('name')
- delay = module.params.get('delay')
- checks = module.params.get('checks')
- datacenter = module.params.get('datacenter')
- node = module.params.get('node')
- behavior = module.params.get('behavior')
-
- consul_client = get_consul_api(module)
-
- try:
- session = consul_client.session.create(
- name=name,
- behavior=behavior,
- node=node,
- lock_delay=delay,
- dc=datacenter,
- checks=checks
- )
- module.exit_json(changed=True,
- session_id=session,
- name=name,
- behavior=behavior,
- delay=delay,
- checks=checks,
- node=node)
- except Exception as e:
- module.fail_json(msg="Could not create/update session %s" % e)
-
-
-def remove_session(module):
- session_id = module.params.get('id')
-
- consul_client = get_consul_api(module)
-
- try:
- consul_client.session.destroy(session_id)
-
- module.exit_json(changed=True,
- session_id=session_id)
- except Exception as e:
- module.fail_json(msg="Could not remove session with id '%s' %s" % (
- session_id, e))
-
-
-def get_consul_api(module):
- return consul.Consul(host=module.params.get('host'),
- port=module.params.get('port'),
- scheme=module.params.get('scheme'),
- verify=module.params.get('validate_certs'))
-
-
-def test_dependencies(module):
- if not python_consul_installed:
- module.fail_json(msg="python-consul required for this module. "
- "see https://python-consul.readthedocs.io/en/latest/#installation")
-
-
-def main():
- argument_spec = dict(
- checks=dict(type='list'),
- delay=dict(type='int', default='15'),
- behavior=dict(type='str', default='release', choices=['release', 'delete']),
- host=dict(type='str', default='localhost'),
- port=dict(type='int', default=8500),
- scheme=dict(type='str', default='http'),
- validate_certs=dict(type='bool', default=True),
- id=dict(type='str'),
- name=dict(type='str'),
- node=dict(type='str'),
- state=dict(type='str', default='present', choices=['absent', 'info', 'list', 'node', 'present']),
- datacenter=dict(type='str'),
- )
-
- module = AnsibleModule(
- argument_spec=argument_spec,
- required_if=[
- ('state', 'node', ['name']),
- ('state', 'info', ['id']),
- ('state', 'remove', ['id']),
- ],
- supports_check_mode=False
- )
-
- test_dependencies(module)
-
- try:
- execute(module)
- except ConnectionError as e:
- module.fail_json(msg='Could not connect to consul agent at %s:%s, error was %s' % (
- module.params.get('host'), module.params.get('port'), e))
- except Exception as e:
- module.fail_json(msg=str(e))
-
-
-if __name__ == '__main__':
- main()
diff --git a/test/support/integration/plugins/modules/openssl_certificate.py b/test/support/integration/plugins/modules/openssl_certificate.py
deleted file mode 100644
index 28780bf22c..0000000000
--- a/test/support/integration/plugins/modules/openssl_certificate.py
+++ /dev/null
@@ -1,2757 +0,0 @@
-#!/usr/bin/python
-# -*- coding: utf-8 -*-
-
-# Copyright: (c) 2016-2017, Yanis Guenane <yanis+ansible@guenane.org>
-# Copyright: (c) 2017, Markus Teufelberger <mteufelberger+ansible@mgit.at>
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-from __future__ import absolute_import, division, print_function
-__metaclass__ = type
-
-ANSIBLE_METADATA = {'metadata_version': '1.1',
- 'status': ['preview'],
- 'supported_by': 'community'}
-
-DOCUMENTATION = r'''
----
-module: openssl_certificate
-version_added: "2.4"
-short_description: Generate and/or check OpenSSL certificates
-description:
- - This module allows one to (re)generate OpenSSL certificates.
- - It implements a notion of provider (ie. C(selfsigned), C(ownca), C(acme), C(assertonly), C(entrust))
- for your certificate.
- - The C(assertonly) provider is intended for use cases where one is only interested in
- checking properties of a supplied certificate. Please note that this provider has been
- deprecated in Ansible 2.9 and will be removed in Ansible 2.13. See the examples on how
- to emulate C(assertonly) usage with M(openssl_certificate_info), M(openssl_csr_info),
- M(openssl_privatekey_info) and M(assert). This also allows more flexible checks than
- the ones offered by the C(assertonly) provider.
- - The C(ownca) provider is intended for generating OpenSSL certificate signed with your own
- CA (Certificate Authority) certificate (self-signed certificate).
- - Many properties that can be specified in this module are for validation of an
- existing or newly generated certificate. The proper place to specify them, if you
- want to receive a certificate with these properties is a CSR (Certificate Signing Request).
- - "Please note that the module regenerates existing certificate if it doesn't match the module's
- options, or if it seems to be corrupt. If you are concerned that this could overwrite
- your existing certificate, consider using the I(backup) option."
- - It uses the pyOpenSSL or cryptography python library to interact with OpenSSL.
- - If both the cryptography and PyOpenSSL libraries are available (and meet the minimum version requirements)
- cryptography will be preferred as a backend over PyOpenSSL (unless the backend is forced with C(select_crypto_backend)).
- Please note that the PyOpenSSL backend was deprecated in Ansible 2.9 and will be removed in Ansible 2.13.
-requirements:
- - PyOpenSSL >= 0.15 or cryptography >= 1.6 (if using C(selfsigned) or C(assertonly) provider)
- - acme-tiny >= 4.0.0 (if using the C(acme) provider)
-author:
- - Yanis Guenane (@Spredzy)
- - Markus Teufelberger (@MarkusTeufelberger)
-options:
- state:
- description:
- - Whether the certificate should exist or not, taking action if the state is different from what is stated.
- type: str
- default: present
- choices: [ absent, present ]
-
- path:
- description:
- - Remote absolute path where the generated certificate file should be created or is already located.
- type: path
- required: true
-
- provider:
- description:
- - Name of the provider to use to generate/retrieve the OpenSSL certificate.
- - The C(assertonly) provider will not generate files and fail if the certificate file is missing.
- - The C(assertonly) provider has been deprecated in Ansible 2.9 and will be removed in Ansible 2.13.
- Please see the examples on how to emulate it with M(openssl_certificate_info), M(openssl_csr_info),
- M(openssl_privatekey_info) and M(assert).
- - "The C(entrust) provider was added for Ansible 2.9 and requires credentials for the
- L(Entrust Certificate Services,https://www.entrustdatacard.com/products/categories/ssl-certificates) (ECS) API."
- - Required if I(state) is C(present).
- type: str
- choices: [ acme, assertonly, entrust, ownca, selfsigned ]
-
- force:
- description:
- - Generate the certificate, even if it already exists.
- type: bool
- default: no
-
- csr_path:
- description:
- - Path to the Certificate Signing Request (CSR) used to generate this certificate.
- - This is not required in C(assertonly) mode.
- - This is mutually exclusive with I(csr_content).
- type: path
- csr_content:
- description:
- - Content of the Certificate Signing Request (CSR) used to generate this certificate.
- - This is not required in C(assertonly) mode.
- - This is mutually exclusive with I(csr_path).
- type: str
- version_added: "2.10"
-
- privatekey_path:
- description:
- - Path to the private key to use when signing the certificate.
- - This is mutually exclusive with I(privatekey_content).
- type: path
- privatekey_content:
- description:
- - Path to the private key to use when signing the certificate.
- - This is mutually exclusive with I(privatekey_path).
- type: str
- version_added: "2.10"
-
- privatekey_passphrase:
- description:
- - The passphrase for the I(privatekey_path) resp. I(privatekey_content).
- - This is required if the private key is password protected.
- type: str
-
- selfsigned_version:
- description:
- - Version of the C(selfsigned) certificate.
- - Nowadays it should almost always be C(3).
- - This is only used by the C(selfsigned) provider.
- type: int
- default: 3
- version_added: "2.5"
-
- selfsigned_digest:
- description:
- - Digest algorithm to be used when self-signing the certificate.
- - This is only used by the C(selfsigned) provider.
- type: str
- default: sha256
-
- selfsigned_not_before:
- description:
- - The point in time the certificate is valid from.
- - Time can be specified either as relative time or as absolute timestamp.
- - Time will always be interpreted as UTC.
- - Valid format is C([+-]timespec | ASN.1 TIME) where timespec can be an integer
- + C([w | d | h | m | s]) (e.g. C(+32w1d2h).
- - Note that if using relative time this module is NOT idempotent.
- - If this value is not specified, the certificate will start being valid from now.
- - This is only used by the C(selfsigned) provider.
- type: str
- default: +0s
- aliases: [ selfsigned_notBefore ]
-
- selfsigned_not_after:
- description:
- - The point in time at which the certificate stops being valid.
- - Time can be specified either as relative time or as absolute timestamp.
- - Time will always be interpreted as UTC.
- - Valid format is C([+-]timespec | ASN.1 TIME) where timespec can be an integer
- + C([w | d | h | m | s]) (e.g. C(+32w1d2h).
- - Note that if using relative time this module is NOT idempotent.
- - If this value is not specified, the certificate will stop being valid 10 years from now.
- - This is only used by the C(selfsigned) provider.
- type: str
- default: +3650d
- aliases: [ selfsigned_notAfter ]
-
- selfsigned_create_subject_key_identifier:
- description:
- - Whether to create the Subject Key Identifier (SKI) from the public key.
- - A value of C(create_if_not_provided) (default) only creates a SKI when the CSR does not
- provide one.
- - A value of C(always_create) always creates a SKI. If the CSR provides one, that one is
- ignored.
- - A value of C(never_create) never creates a SKI. If the CSR provides one, that one is used.
- - This is only used by the C(selfsigned) provider.
- - Note that this is only supported if the C(cryptography) backend is used!
- type: str
- choices: [create_if_not_provided, always_create, never_create]
- default: create_if_not_provided
- version_added: "2.9"
-
- ownca_path:
- description:
- - Remote absolute path of the CA (Certificate Authority) certificate.
- - This is only used by the C(ownca) provider.
- - This is mutually exclusive with I(ownca_content).
- type: path
- version_added: "2.7"
- ownca_content:
- description:
- - Content of the CA (Certificate Authority) certificate.
- - This is only used by the C(ownca) provider.
- - This is mutually exclusive with I(ownca_path).
- type: str
- version_added: "2.10"
-
- ownca_privatekey_path:
- description:
- - Path to the CA (Certificate Authority) private key to use when signing the certificate.
- - This is only used by the C(ownca) provider.
- - This is mutually exclusive with I(ownca_privatekey_content).
- type: path
- version_added: "2.7"
- ownca_privatekey_content:
- description:
- - Path to the CA (Certificate Authority) private key to use when signing the certificate.
- - This is only used by the C(ownca) provider.
- - This is mutually exclusive with I(ownca_privatekey_path).
- type: str
- version_added: "2.10"
-
- ownca_privatekey_passphrase:
- description:
- - The passphrase for the I(ownca_privatekey_path) resp. I(ownca_privatekey_content).
- - This is only used by the C(ownca) provider.
- type: str
- version_added: "2.7"
-
- ownca_digest:
- description:
- - The digest algorithm to be used for the C(ownca) certificate.
- - This is only used by the C(ownca) provider.
- type: str
- default: sha256
- version_added: "2.7"
-
- ownca_version:
- description:
- - The version of the C(ownca) certificate.
- - Nowadays it should almost always be C(3).
- - This is only used by the C(ownca) provider.
- type: int
- default: 3
- version_added: "2.7"
-
- ownca_not_before:
- description:
- - The point in time the certificate is valid from.
- - Time can be specified either as relative time or as absolute timestamp.
- - Time will always be interpreted as UTC.
- - Valid format is C([+-]timespec | ASN.1 TIME) where timespec can be an integer
- + C([w | d | h | m | s]) (e.g. C(+32w1d2h).
- - Note that if using relative time this module is NOT idempotent.
- - If this value is not specified, the certificate will start being valid from now.
- - This is only used by the C(ownca) provider.
- type: str
- default: +0s
- version_added: "2.7"
-
- ownca_not_after:
- description:
- - The point in time at which the certificate stops being valid.
- - Time can be specified either as relative time or as absolute timestamp.
- - Time will always be interpreted as UTC.
- - Valid format is C([+-]timespec | ASN.1 TIME) where timespec can be an integer
- + C([w | d | h | m | s]) (e.g. C(+32w1d2h).
- - Note that if using relative time this module is NOT idempotent.
- - If this value is not specified, the certificate will stop being valid 10 years from now.
- - This is only used by the C(ownca) provider.
- type: str
- default: +3650d
- version_added: "2.7"
-
- ownca_create_subject_key_identifier:
- description:
- - Whether to create the Subject Key Identifier (SKI) from the public key.
- - A value of C(create_if_not_provided) (default) only creates a SKI when the CSR does not
- provide one.
- - A value of C(always_create) always creates a SKI. If the CSR provides one, that one is
- ignored.
- - A value of C(never_create) never creates a SKI. If the CSR provides one, that one is used.
- - This is only used by the C(ownca) provider.
- - Note that this is only supported if the C(cryptography) backend is used!
- type: str
- choices: [create_if_not_provided, always_create, never_create]
- default: create_if_not_provided
- version_added: "2.9"
-
- ownca_create_authority_key_identifier:
- description:
- - Create a Authority Key Identifier from the CA's certificate. If the CSR provided
- a authority key identifier, it is ignored.
- - The Authority Key Identifier is generated from the CA certificate's Subject Key Identifier,
- if available. If it is not available, the CA certificate's public key will be used.
- - This is only used by the C(ownca) provider.
- - Note that this is only supported if the C(cryptography) backend is used!
- type: bool
- default: yes
- version_added: "2.9"
-
- acme_accountkey_path:
- description:
- - The path to the accountkey for the C(acme) provider.
- - This is only used by the C(acme) provider.
- type: path
-
- acme_challenge_path:
- description:
- - The path to the ACME challenge directory that is served on U(http://<HOST>:80/.well-known/acme-challenge/)
- - This is only used by the C(acme) provider.
- type: path
-
- acme_chain:
- description:
- - Include the intermediate certificate to the generated certificate
- - This is only used by the C(acme) provider.
- - Note that this is only available for older versions of C(acme-tiny).
- New versions include the chain automatically, and setting I(acme_chain) to C(yes) results in an error.
- type: bool
- default: no
- version_added: "2.5"
-
- acme_directory:
- description:
- - "The ACME directory to use. You can use any directory that supports the ACME protocol, such as Buypass or Let's Encrypt."
- - "Let's Encrypt recommends using their staging server while developing jobs. U(https://letsencrypt.org/docs/staging-environment/)."
- type: str
- default: https://acme-v02.api.letsencrypt.org/directory
- version_added: "2.10"
-
- signature_algorithms:
- description:
- - A list of algorithms that you would accept the certificate to be signed with
- (e.g. ['sha256WithRSAEncryption', 'sha512WithRSAEncryption']).
- - This is only used by the C(assertonly) provider.
- - This option is deprecated since Ansible 2.9 and will be removed with the C(assertonly) provider in Ansible 2.13.
- For alternatives, see the example on replacing C(assertonly).
- type: list
- elements: str
-
- issuer:
- description:
- - The key/value pairs that must be present in the issuer name field of the certificate.
- - If you need to specify more than one value with the same key, use a list as value.
- - This is only used by the C(assertonly) provider.
- - This option is deprecated since Ansible 2.9 and will be removed with the C(assertonly) provider in Ansible 2.13.
- For alternatives, see the example on replacing C(assertonly).
- type: dict
-
- issuer_strict:
- description:
- - If set to C(yes), the I(issuer) field must contain only these values.
- - This is only used by the C(assertonly) provider.
- - This option is deprecated since Ansible 2.9 and will be removed with the C(assertonly) provider in Ansible 2.13.
- For alternatives, see the example on replacing C(assertonly).
- type: bool
- default: no
- version_added: "2.5"
-
- subject:
- description:
- - The key/value pairs that must be present in the subject name field of the certificate.
- - If you need to specify more than one value with the same key, use a list as value.
- - This is only used by the C(assertonly) provider.
- - This option is deprecated since Ansible 2.9 and will be removed with the C(assertonly) provider in Ansible 2.13.
- For alternatives, see the example on replacing C(assertonly).
- type: dict
-
- subject_strict:
- description:
- - If set to C(yes), the I(subject) field must contain only these values.
- - This is only used by the C(assertonly) provider.
- - This option is deprecated since Ansible 2.9 and will be removed with the C(assertonly) provider in Ansible 2.13.
- For alternatives, see the example on replacing C(assertonly).
- type: bool
- default: no
- version_added: "2.5"
-
- has_expired:
- description:
- - Checks if the certificate is expired/not expired at the time the module is executed.
- - This is only used by the C(assertonly) provider.
- - This option is deprecated since Ansible 2.9 and will be removed with the C(assertonly) provider in Ansible 2.13.
- For alternatives, see the example on replacing C(assertonly).
- type: bool
- default: no
-
- version:
- description:
- - The version of the certificate.
- - Nowadays it should almost always be 3.
- - This is only used by the C(assertonly) provider.
- - This option is deprecated since Ansible 2.9 and will be removed with the C(assertonly) provider in Ansible 2.13.
- For alternatives, see the example on replacing C(assertonly).
- type: int
-
- valid_at:
- description:
- - The certificate must be valid at this point in time.
- - The timestamp is formatted as an ASN.1 TIME.
- - This is only used by the C(assertonly) provider.
- - This option is deprecated since Ansible 2.9 and will be removed with the C(assertonly) provider in Ansible 2.13.
- For alternatives, see the example on replacing C(assertonly).
- type: str
-
- invalid_at:
- description:
- - The certificate must be invalid at this point in time.
- - The timestamp is formatted as an ASN.1 TIME.
- - This is only used by the C(assertonly) provider.
- - This option is deprecated since Ansible 2.9 and will be removed with the C(assertonly) provider in Ansible 2.13.
- For alternatives, see the example on replacing C(assertonly).
- type: str
-
- not_before:
- description:
- - The certificate must start to become valid at this point in time.
- - The timestamp is formatted as an ASN.1 TIME.
- - This is only used by the C(assertonly) provider.
- - This option is deprecated since Ansible 2.9 and will be removed with the C(assertonly) provider in Ansible 2.13.
- For alternatives, see the example on replacing C(assertonly).
- type: str
- aliases: [ notBefore ]
-
- not_after:
- description:
- - The certificate must expire at this point in time.
- - The timestamp is formatted as an ASN.1 TIME.
- - This is only used by the C(assertonly) provider.
- - This option is deprecated since Ansible 2.9 and will be removed with the C(assertonly) provider in Ansible 2.13.
- For alternatives, see the example on replacing C(assertonly).
- type: str
- aliases: [ notAfter ]
-
- valid_in:
- description:
- - The certificate must still be valid at this relative time offset from now.
- - Valid format is C([+-]timespec | number_of_seconds) where timespec can be an integer
- + C([w | d | h | m | s]) (e.g. C(+32w1d2h).
- - Note that if using this parameter, this module is NOT idempotent.
- - This is only used by the C(assertonly) provider.
- - This option is deprecated since Ansible 2.9 and will be removed with the C(assertonly) provider in Ansible 2.13.
- For alternatives, see the example on replacing C(assertonly).
- type: str
-
- key_usage:
- description:
- - The I(key_usage) extension field must contain all these values.
- - This is only used by the C(assertonly) provider.
- - This option is deprecated since Ansible 2.9 and will be removed with the C(assertonly) provider in Ansible 2.13.
- For alternatives, see the example on replacing C(assertonly).
- type: list
- elements: str
- aliases: [ keyUsage ]
-
- key_usage_strict:
- description:
- - If set to C(yes), the I(key_usage) extension field must contain only these values.
- - This is only used by the C(assertonly) provider.
- - This option is deprecated since Ansible 2.9 and will be removed with the C(assertonly) provider in Ansible 2.13.
- For alternatives, see the example on replacing C(assertonly).
- type: bool
- default: no
- aliases: [ keyUsage_strict ]
-
- extended_key_usage:
- description:
- - The I(extended_key_usage) extension field must contain all these values.
- - This is only used by the C(assertonly) provider.
- - This option is deprecated since Ansible 2.9 and will be removed with the C(assertonly) provider in Ansible 2.13.
- For alternatives, see the example on replacing C(assertonly).
- type: list
- elements: str
- aliases: [ extendedKeyUsage ]
-
- extended_key_usage_strict:
- description:
- - If set to C(yes), the I(extended_key_usage) extension field must contain only these values.
- - This is only used by the C(assertonly) provider.
- - This option is deprecated since Ansible 2.9 and will be removed with the C(assertonly) provider in Ansible 2.13.
- For alternatives, see the example on replacing C(assertonly).
- type: bool
- default: no
- aliases: [ extendedKeyUsage_strict ]
-
- subject_alt_name:
- description:
- - The I(subject_alt_name) extension field must contain these values.
- - This is only used by the C(assertonly) provider.
- - This option is deprecated since Ansible 2.9 and will be removed with the C(assertonly) provider in Ansible 2.13.
- For alternatives, see the example on replacing C(assertonly).
- type: list
- elements: str
- aliases: [ subjectAltName ]
-
- subject_alt_name_strict:
- description:
- - If set to C(yes), the I(subject_alt_name) extension field must contain only these values.
- - This is only used by the C(assertonly) provider.
- - This option is deprecated since Ansible 2.9 and will be removed with the C(assertonly) provider in Ansible 2.13.
- For alternatives, see the example on replacing C(assertonly).
- type: bool
- default: no
- aliases: [ subjectAltName_strict ]
-
- select_crypto_backend:
- description:
- - Determines which crypto backend to use.
- - The default choice is C(auto), which tries to use C(cryptography) if available, and falls back to C(pyopenssl).
- - If set to C(pyopenssl), will try to use the L(pyOpenSSL,https://pypi.org/project/pyOpenSSL/) library.
- - If set to C(cryptography), will try to use the L(cryptography,https://cryptography.io/) library.
- - Please note that the C(pyopenssl) backend has been deprecated in Ansible 2.9, and will be removed in Ansible 2.13.
- From that point on, only the C(cryptography) backend will be available.
- type: str
- default: auto
- choices: [ auto, cryptography, pyopenssl ]
- version_added: "2.8"
-
- backup:
- description:
- - Create a backup file including a timestamp so you can get the original
- certificate back if you overwrote it with a new one by accident.
- - This is not used by the C(assertonly) provider.
- - This option is deprecated since Ansible 2.9 and will be removed with the C(assertonly) provider in Ansible 2.13.
- For alternatives, see the example on replacing C(assertonly).
- type: bool
- default: no
- version_added: "2.8"
-
- entrust_cert_type:
- description:
- - Specify the type of certificate requested.
- - This is only used by the C(entrust) provider.
- type: str
- default: STANDARD_SSL
- choices: [ 'STANDARD_SSL', 'ADVANTAGE_SSL', 'UC_SSL', 'EV_SSL', 'WILDCARD_SSL', 'PRIVATE_SSL', 'PD_SSL', 'CDS_ENT_LITE', 'CDS_ENT_PRO', 'SMIME_ENT' ]
- version_added: "2.9"
-
- entrust_requester_email:
- description:
- - The email of the requester of the certificate (for tracking purposes).
- - This is only used by the C(entrust) provider.
- - This is required if the provider is C(entrust).
- type: str
- version_added: "2.9"
-
- entrust_requester_name:
- description:
- - The name of the requester of the certificate (for tracking purposes).
- - This is only used by the C(entrust) provider.
- - This is required if the provider is C(entrust).
- type: str
- version_added: "2.9"
-
- entrust_requester_phone:
- description:
- - The phone number of the requester of the certificate (for tracking purposes).
- - This is only used by the C(entrust) provider.
- - This is required if the provider is C(entrust).
- type: str
- version_added: "2.9"
-
- entrust_api_user:
- description:
- - The username for authentication to the Entrust Certificate Services (ECS) API.
- - This is only used by the C(entrust) provider.
- - This is required if the provider is C(entrust).
- type: str
- version_added: "2.9"
-
- entrust_api_key:
- description:
- - The key (password) for authentication to the Entrust Certificate Services (ECS) API.
- - This is only used by the C(entrust) provider.
- - This is required if the provider is C(entrust).
- type: str
- version_added: "2.9"
-
- entrust_api_client_cert_path:
- description:
- - The path to the client certificate used to authenticate to the Entrust Certificate Services (ECS) API.
- - This is only used by the C(entrust) provider.
- - This is required if the provider is C(entrust).
- type: path
- version_added: "2.9"
-
- entrust_api_client_cert_key_path:
- description:
- - The path to the private key of the client certificate used to authenticate to the Entrust Certificate Services (ECS) API.
- - This is only used by the C(entrust) provider.
- - This is required if the provider is C(entrust).
- type: path
- version_added: "2.9"
-
- entrust_not_after:
- description:
- - The point in time at which the certificate stops being valid.
- - Time can be specified either as relative time or as an absolute timestamp.
- - A valid absolute time format is C(ASN.1 TIME) such as C(2019-06-18).
- - A valid relative time format is C([+-]timespec) where timespec can be an integer + C([w | d | h | m | s]), such as C(+365d) or C(+32w1d2h)).
- - Time will always be interpreted as UTC.
- - Note that only the date (day, month, year) is supported for specifying the expiry date of the issued certificate.
- - The full date-time is adjusted to EST (GMT -5:00) before issuance, which may result in a certificate with an expiration date one day
- earlier than expected if a relative time is used.
- - The minimum certificate lifetime is 90 days, and maximum is three years.
- - If this value is not specified, the certificate will stop being valid 365 days the date of issue.
- - This is only used by the C(entrust) provider.
- type: str
- default: +365d
- version_added: "2.9"
-
- entrust_api_specification_path:
- description:
- - The path to the specification file defining the Entrust Certificate Services (ECS) API configuration.
- - You can use this to keep a local copy of the specification to avoid downloading it every time the module is used.
- - This is only used by the C(entrust) provider.
- type: path
- default: https://cloud.entrust.net/EntrustCloud/documentation/cms-api-2.1.0.yaml
- version_added: "2.9"
-
- return_content:
- description:
- - If set to C(yes), will return the (current or generated) certificate's content as I(certificate).
- type: bool
- default: no
- version_added: "2.10"
-
-extends_documentation_fragment: files
-notes:
- - All ASN.1 TIME values should be specified following the YYYYMMDDHHMMSSZ pattern.
- - Date specified should be UTC. Minutes and seconds are mandatory.
- - For security reason, when you use C(ownca) provider, you should NOT run M(openssl_certificate) on
- a target machine, but on a dedicated CA machine. It is recommended not to store the CA private key
- on the target machine. Once signed, the certificate can be moved to the target machine.
-seealso:
-- module: openssl_csr
-- module: openssl_dhparam
-- module: openssl_pkcs12
-- module: openssl_privatekey
-- module: openssl_publickey
-'''
-
-EXAMPLES = r'''
-- name: Generate a Self Signed OpenSSL certificate
- openssl_certificate:
- path: /etc/ssl/crt/ansible.com.crt
- privatekey_path: /etc/ssl/private/ansible.com.pem
- csr_path: /etc/ssl/csr/ansible.com.csr
- provider: selfsigned
-
-- name: Generate an OpenSSL certificate signed with your own CA certificate
- openssl_certificate:
- path: /etc/ssl/crt/ansible.com.crt
- csr_path: /etc/ssl/csr/ansible.com.csr
- ownca_path: /etc/ssl/crt/ansible_CA.crt
- ownca_privatekey_path: /etc/ssl/private/ansible_CA.pem
- provider: ownca
-
-- name: Generate a Let's Encrypt Certificate
- openssl_certificate:
- path: /etc/ssl/crt/ansible.com.crt
- csr_path: /etc/ssl/csr/ansible.com.csr
- provider: acme
- acme_accountkey_path: /etc/ssl/private/ansible.com.pem
- acme_challenge_path: /etc/ssl/challenges/ansible.com/
-
-- name: Force (re-)generate a new Let's Encrypt Certificate
- openssl_certificate:
- path: /etc/ssl/crt/ansible.com.crt
- csr_path: /etc/ssl/csr/ansible.com.csr
- provider: acme
- acme_accountkey_path: /etc/ssl/private/ansible.com.pem
- acme_challenge_path: /etc/ssl/challenges/ansible.com/
- force: yes
-
-- name: Generate an Entrust certificate via the Entrust Certificate Services (ECS) API
- openssl_certificate:
- path: /etc/ssl/crt/ansible.com.crt
- csr_path: /etc/ssl/csr/ansible.com.csr
- provider: entrust
- entrust_requester_name: Jo Doe
- entrust_requester_email: jdoe@ansible.com
- entrust_requester_phone: 555-555-5555
- entrust_cert_type: STANDARD_SSL
- entrust_api_user: apiusername
- entrust_api_key: a^lv*32!cd9LnT
- entrust_api_client_cert_path: /etc/ssl/entrust/ecs-client.crt
- entrust_api_client_cert_key_path: /etc/ssl/entrust/ecs-key.crt
- entrust_api_specification_path: /etc/ssl/entrust/api-docs/cms-api-2.1.0.yaml
-
-# The following example shows one assertonly usage using all existing options for
-# assertonly, and shows how to emulate the behavior with the openssl_certificate_info,
-# openssl_csr_info, openssl_privatekey_info and assert modules:
-
-- openssl_certificate:
- provider: assertonly
- path: /etc/ssl/crt/ansible.com.crt
- csr_path: /etc/ssl/csr/ansible.com.csr
- privatekey_path: /etc/ssl/csr/ansible.com.key
- signature_algorithms:
- - sha256WithRSAEncryption
- - sha512WithRSAEncryption
- subject:
- commonName: ansible.com
- subject_strict: yes
- issuer:
- commonName: ansible.com
- issuer_strict: yes
- has_expired: no
- version: 3
- key_usage:
- - Data Encipherment
- key_usage_strict: yes
- extended_key_usage:
- - DVCS
- extended_key_usage_strict: yes
- subject_alt_name:
- - dns:ansible.com
- subject_alt_name_strict: yes
- not_before: 20190331202428Z
- not_after: 20190413202428Z
- valid_at: "+1d10h"
- invalid_at: 20200331202428Z
- valid_in: 10 # in ten seconds
-
-- openssl_certificate_info:
- path: /etc/ssl/crt/ansible.com.crt
- # for valid_at, invalid_at and valid_in
- valid_at:
- one_day_ten_hours: "+1d10h"
- fixed_timestamp: 20200331202428Z
- ten_seconds: "+10"
- register: result
-
-- openssl_csr_info:
- # Verifies that the CSR signature is valid; module will fail if not
- path: /etc/ssl/csr/ansible.com.csr
- register: result_csr
-
-- openssl_privatekey_info:
- path: /etc/ssl/csr/ansible.com.key
- register: result_privatekey
-
-- assert:
- that:
- # When private key is specified for assertonly, this will be checked:
- - result.public_key == result_privatekey.public_key
- # When CSR is specified for assertonly, this will be checked:
- - result.public_key == result_csr.public_key
- - result.subject_ordered == result_csr.subject_ordered
- - result.extensions_by_oid == result_csr.extensions_by_oid
- # signature_algorithms check
- - "result.signature_algorithm == 'sha256WithRSAEncryption' or result.signature_algorithm == 'sha512WithRSAEncryption'"
- # subject and subject_strict
- - "result.subject.commonName == 'ansible.com'"
- - "result.subject | length == 1" # the number must be the number of entries you check for
- # issuer and issuer_strict
- - "result.issuer.commonName == 'ansible.com'"
- - "result.issuer | length == 1" # the number must be the number of entries you check for
- # has_expired
- - not result.expired
- # version
- - result.version == 3
- # key_usage and key_usage_strict
- - "'Data Encipherment' in result.key_usage"
- - "result.key_usage | length == 1" # the number must be the number of entries you check for
- # extended_key_usage and extended_key_usage_strict
- - "'DVCS' in result.extended_key_usage"
- - "result.extended_key_usage | length == 1" # the number must be the number of entries you check for
- # subject_alt_name and subject_alt_name_strict
- - "'dns:ansible.com' in result.subject_alt_name"
- - "result.subject_alt_name | length == 1" # the number must be the number of entries you check for
- # not_before and not_after
- - "result.not_before == '20190331202428Z'"
- - "result.not_after == '20190413202428Z'"
- # valid_at, invalid_at and valid_in
- - "result.valid_at.one_day_ten_hours" # for valid_at
- - "not result.valid_at.fixed_timestamp" # for invalid_at
- - "result.valid_at.ten_seconds" # for valid_in
-
-# Examples for some checks one could use the assertonly provider for:
-# (Please note that assertonly has been deprecated!)
-
-# How to use the assertonly provider to implement and trigger your own custom certificate generation workflow:
-- name: Check if a certificate is currently still valid, ignoring failures
- openssl_certificate:
- path: /etc/ssl/crt/example.com.crt
- provider: assertonly
- has_expired: no
- ignore_errors: yes
- register: validity_check
-
-- name: Run custom task(s) to get a new, valid certificate in case the initial check failed
- command: superspecialSSL recreate /etc/ssl/crt/example.com.crt
- when: validity_check.failed
-
-- name: Check the new certificate again for validity with the same parameters, this time failing the play if it is still invalid
- openssl_certificate:
- path: /etc/ssl/crt/example.com.crt
- provider: assertonly
- has_expired: no
- when: validity_check.failed
-
-# Some other checks that assertonly could be used for:
-- name: Verify that an existing certificate was issued by the Let's Encrypt CA and is currently still valid
- openssl_certificate:
- path: /etc/ssl/crt/example.com.crt
- provider: assertonly
- issuer:
- O: Let's Encrypt
- has_expired: no
-
-- name: Ensure that a certificate uses a modern signature algorithm (no SHA1, MD5 or DSA)
- openssl_certificate:
- path: /etc/ssl/crt/example.com.crt
- provider: assertonly
- signature_algorithms:
- - sha224WithRSAEncryption
- - sha256WithRSAEncryption
- - sha384WithRSAEncryption
- - sha512WithRSAEncryption
- - sha224WithECDSAEncryption
- - sha256WithECDSAEncryption
- - sha384WithECDSAEncryption
- - sha512WithECDSAEncryption
-
-- name: Ensure that the existing certificate belongs to the specified private key
- openssl_certificate:
- path: /etc/ssl/crt/example.com.crt
- privatekey_path: /etc/ssl/private/example.com.pem
- provider: assertonly
-
-- name: Ensure that the existing certificate is still valid at the winter solstice 2017
- openssl_certificate:
- path: /etc/ssl/crt/example.com.crt
- provider: assertonly
- valid_at: 20171221162800Z
-
-- name: Ensure that the existing certificate is still valid 2 weeks (1209600 seconds) from now
- openssl_certificate:
- path: /etc/ssl/crt/example.com.crt
- provider: assertonly
- valid_in: 1209600
-
-- name: Ensure that the existing certificate is only used for digital signatures and encrypting other keys
- openssl_certificate:
- path: /etc/ssl/crt/example.com.crt
- provider: assertonly
- key_usage:
- - digitalSignature
- - keyEncipherment
- key_usage_strict: true
-
-- name: Ensure that the existing certificate can be used for client authentication
- openssl_certificate:
- path: /etc/ssl/crt/example.com.crt
- provider: assertonly
- extended_key_usage:
- - clientAuth
-
-- name: Ensure that the existing certificate can only be used for client authentication and time stamping
- openssl_certificate:
- path: /etc/ssl/crt/example.com.crt
- provider: assertonly
- extended_key_usage:
- - clientAuth
- - 1.3.6.1.5.5.7.3.8
- extended_key_usage_strict: true
-
-- name: Ensure that the existing certificate has a certain domain in its subjectAltName
- openssl_certificate:
- path: /etc/ssl/crt/example.com.crt
- provider: assertonly
- subject_alt_name:
- - www.example.com
- - test.example.com
-'''
-
-RETURN = r'''
-filename:
- description: Path to the generated certificate.
- returned: changed or success
- type: str
- sample: /etc/ssl/crt/www.ansible.com.crt
-backup_file:
- description: Name of backup file created.
- returned: changed and if I(backup) is C(yes)
- type: str
- sample: /path/to/www.ansible.com.crt.2019-03-09@11:22~
-certificate:
- description: The (current or generated) certificate's content.
- returned: if I(state) is C(present) and I(return_content) is C(yes)
- type: str
- version_added: "2.10"
-'''
-
-
-from random import randint
-import abc
-import datetime
-import time
-import os
-import tempfile
-import traceback
-from distutils.version import LooseVersion
-
-from ansible.module_utils import crypto as crypto_utils
-from ansible.module_utils.basic import AnsibleModule, missing_required_lib
-from ansible.module_utils._text import to_native, to_bytes, to_text
-from ansible.module_utils.compat import ipaddress as compat_ipaddress
-from ansible.module_utils.ecs.api import ECSClient, RestOperationException, SessionConfigurationException
-
-MINIMAL_CRYPTOGRAPHY_VERSION = '1.6'
-MINIMAL_PYOPENSSL_VERSION = '0.15'
-
-PYOPENSSL_IMP_ERR = None
-try:
- import OpenSSL
- from OpenSSL import crypto
- PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__)
-except ImportError:
- PYOPENSSL_IMP_ERR = traceback.format_exc()
- PYOPENSSL_FOUND = False
-else:
- PYOPENSSL_FOUND = True
-
-CRYPTOGRAPHY_IMP_ERR = None
-try:
- import cryptography
- from cryptography import x509
- from cryptography.hazmat.backends import default_backend
- from cryptography.hazmat.primitives.serialization import Encoding
- from cryptography.x509 import NameAttribute, Name
- from cryptography.x509.oid import NameOID
- CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
-except ImportError:
- CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
- CRYPTOGRAPHY_FOUND = False
-else:
- CRYPTOGRAPHY_FOUND = True
-
-
-class CertificateError(crypto_utils.OpenSSLObjectError):
- pass
-
-
-class Certificate(crypto_utils.OpenSSLObject):
-
- def __init__(self, module, backend):
- super(Certificate, self).__init__(
- module.params['path'],
- module.params['state'],
- module.params['force'],
- module.check_mode
- )
-
- self.provider = module.params['provider']
- self.privatekey_path = module.params['privatekey_path']
- self.privatekey_content = module.params['privatekey_content']
- if self.privatekey_content is not None:
- self.privatekey_content = self.privatekey_content.encode('utf-8')
- self.privatekey_passphrase = module.params['privatekey_passphrase']
- self.csr_path = module.params['csr_path']
- self.csr_content = module.params['csr_content']
- if self.csr_content is not None:
- self.csr_content = self.csr_content.encode('utf-8')
- self.cert = None
- self.privatekey = None
- self.csr = None
- self.backend = backend
- self.module = module
- self.return_content = module.params['return_content']
-
- # The following are default values which make sure check() works as
- # before if providers do not explicitly change these properties.
- self.create_subject_key_identifier = 'never_create'
- self.create_authority_key_identifier = False
-
- self.backup = module.params['backup']
- self.backup_file = None
-
- def _validate_privatekey(self):
- if self.backend == 'pyopenssl':
- ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_2_METHOD)
- ctx.use_privatekey(self.privatekey)
- ctx.use_certificate(self.cert)
- try:
- ctx.check_privatekey()
- return True
- except OpenSSL.SSL.Error:
- return False
- elif self.backend == 'cryptography':
- return crypto_utils.cryptography_compare_public_keys(self.cert.public_key(), self.privatekey.public_key())
-
- def _validate_csr(self):
- if self.backend == 'pyopenssl':
- # Verify that CSR is signed by certificate's private key
- try:
- self.csr.verify(self.cert.get_pubkey())
- except OpenSSL.crypto.Error:
- return False
- # Check subject
- if self.csr.get_subject() != self.cert.get_subject():
- return False
- # Check extensions
- csr_extensions = self.csr.get_extensions()
- cert_extension_count = self.cert.get_extension_count()
- if len(csr_extensions) != cert_extension_count:
- return False
- for extension_number in range(0, cert_extension_count):
- cert_extension = self.cert.get_extension(extension_number)
- csr_extension = filter(lambda extension: extension.get_short_name() == cert_extension.get_short_name(), csr_extensions)
- if cert_extension.get_data() != list(csr_extension)[0].get_data():
- return False
- return True
- elif self.backend == 'cryptography':
- # Verify that CSR is signed by certificate's private key
- if not self.csr.is_signature_valid:
- return False
- if not crypto_utils.cryptography_compare_public_keys(self.csr.public_key(), self.cert.public_key()):
- return False
- # Check subject
- if self.csr.subject != self.cert.subject:
- return False
- # Check extensions
- cert_exts = list(self.cert.extensions)
- csr_exts = list(self.csr.extensions)
- if self.create_subject_key_identifier != 'never_create':
- # Filter out SubjectKeyIdentifier extension before comparison
- cert_exts = list(filter(lambda x: not isinstance(x.value, x509.SubjectKeyIdentifier), cert_exts))
- csr_exts = list(filter(lambda x: not isinstance(x.value, x509.SubjectKeyIdentifier), csr_exts))
- if self.create_authority_key_identifier:
- # Filter out AuthorityKeyIdentifier extension before comparison
- cert_exts = list(filter(lambda x: not isinstance(x.value, x509.AuthorityKeyIdentifier), cert_exts))
- csr_exts = list(filter(lambda x: not isinstance(x.value, x509.AuthorityKeyIdentifier), csr_exts))
- if len(cert_exts) != len(csr_exts):
- return False
- for cert_ext in cert_exts:
- try:
- csr_ext = self.csr.extensions.get_extension_for_oid(cert_ext.oid)
- if cert_ext != csr_ext:
- return False
- except cryptography.x509.ExtensionNotFound as dummy:
- return False
- return True
-
- def remove(self, module):
- if self.backup:
- self.backup_file = module.backup_local(self.path)
- super(Certificate, self).remove(module)
-
- def check(self, module, perms_required=True):
- """Ensure the resource is in its desired state."""
-
- state_and_perms = super(Certificate, self).check(module, perms_required)
-
- if not state_and_perms:
- return False
-
- try:
- self.cert = crypto_utils.load_certificate(self.path, backend=self.backend)
- except Exception as dummy:
- return False
-
- if self.privatekey_path or self.privatekey_content:
- try:
- self.privatekey = crypto_utils.load_privatekey(
- path=self.privatekey_path,
- content=self.privatekey_content,
- passphrase=self.privatekey_passphrase,
- backend=self.backend
- )
- except crypto_utils.OpenSSLBadPassphraseError as exc:
- raise CertificateError(exc)
- if not self._validate_privatekey():
- return False
-
- if self.csr_path or self.csr_content:
- self.csr = crypto_utils.load_certificate_request(
- path=self.csr_path,
- content=self.csr_content,
- backend=self.backend
- )
- if not self._validate_csr():
- return False
-
- # Check SubjectKeyIdentifier
- if self.backend == 'cryptography' and self.create_subject_key_identifier != 'never_create':
- # Get hold of certificate's SKI
- try:
- ext = self.cert.extensions.get_extension_for_class(x509.SubjectKeyIdentifier)
- except cryptography.x509.ExtensionNotFound as dummy:
- return False
- # Get hold of CSR's SKI for 'create_if_not_provided'
- csr_ext = None
- if self.create_subject_key_identifier == 'create_if_not_provided':
- try:
- csr_ext = self.csr.extensions.get_extension_for_class(x509.SubjectKeyIdentifier)
- except cryptography.x509.ExtensionNotFound as dummy:
- pass
- if csr_ext is None:
- # If CSR had no SKI, or we chose to ignore it ('always_create'), compare with created SKI
- if ext.value.digest != x509.SubjectKeyIdentifier.from_public_key(self.cert.public_key()).digest:
- return False
- else:
- # If CSR had SKI and we didn't ignore it ('create_if_not_provided'), compare SKIs
- if ext.value.digest != csr_ext.value.digest:
- return False
-
- return True
-
-
-class CertificateAbsent(Certificate):
- def __init__(self, module):
- super(CertificateAbsent, self).__init__(module, 'cryptography') # backend doesn't matter
-
- def generate(self, module):
- pass
-
- def dump(self, check_mode=False):
- # Use only for absent
-
- result = {
- 'changed': self.changed,
- 'filename': self.path,
- 'privatekey': self.privatekey_path,
- 'csr': self.csr_path
- }
- if self.backup_file:
- result['backup_file'] = self.backup_file
- if self.return_content:
- result['certificate'] = None
-
- return result
-
-
-class SelfSignedCertificateCryptography(Certificate):
- """Generate the self-signed certificate, using the cryptography backend"""
- def __init__(self, module):
- super(SelfSignedCertificateCryptography, self).__init__(module, 'cryptography')
- self.create_subject_key_identifier = module.params['selfsigned_create_subject_key_identifier']
- self.notBefore = crypto_utils.get_relative_time_option(module.params['selfsigned_not_before'], 'selfsigned_not_before', backend=self.backend)
- self.notAfter = crypto_utils.get_relative_time_option(module.params['selfsigned_not_after'], 'selfsigned_not_after', backend=self.backend)
- self.digest = crypto_utils.select_message_digest(module.params['selfsigned_digest'])
- self.version = module.params['selfsigned_version']
- self.serial_number = x509.random_serial_number()
-
- if self.csr_content is None and not os.path.exists(self.csr_path):
- raise CertificateError(
- 'The certificate signing request file {0} does not exist'.format(self.csr_path)
- )
- if self.privatekey_content is None and not os.path.exists(self.privatekey_path):
- raise CertificateError(
- 'The private key file {0} does not exist'.format(self.privatekey_path)
- )
-
- self.csr = crypto_utils.load_certificate_request(
- path=self.csr_path,
- content=self.csr_content,
- backend=self.backend
- )
- self._module = module
-
- try:
- self.privatekey = crypto_utils.load_privatekey(
- path=self.privatekey_path,
- content=self.privatekey_content,
- passphrase=self.privatekey_passphrase,
- backend=self.backend
- )
- except crypto_utils.OpenSSLBadPassphraseError as exc:
- module.fail_json(msg=to_native(exc))
-
- if crypto_utils.cryptography_key_needs_digest_for_signing(self.privatekey):
- if self.digest is None:
- raise CertificateError(
- 'The digest %s is not supported with the cryptography backend' % module.params['selfsigned_digest']
- )
- else:
- self.digest = None
-
- def generate(self, module):
- if self.privatekey_content is None and not os.path.exists(self.privatekey_path):
- raise CertificateError(
- 'The private key %s does not exist' % self.privatekey_path
- )
- if self.csr_content is None and not os.path.exists(self.csr_path):
- raise CertificateError(
- 'The certificate signing request file %s does not exist' % self.csr_path
- )
- if not self.check(module, perms_required=False) or self.force:
- try:
- cert_builder = x509.CertificateBuilder()
- cert_builder = cert_builder.subject_name(self.csr.subject)
- cert_builder = cert_builder.issuer_name(self.csr.subject)
- cert_builder = cert_builder.serial_number(self.serial_number)
- cert_builder = cert_builder.not_valid_before(self.notBefore)
- cert_builder = cert_builder.not_valid_after(self.notAfter)
- cert_builder = cert_builder.public_key(self.privatekey.public_key())
- has_ski = False
- for extension in self.csr.extensions:
- if isinstance(extension.value, x509.SubjectKeyIdentifier):
- if self.create_subject_key_identifier == 'always_create':
- continue
- has_ski = True
- cert_builder = cert_builder.add_extension(extension.value, critical=extension.critical)
- if not has_ski and self.create_subject_key_identifier != 'never_create':
- cert_builder = cert_builder.add_extension(
- x509.SubjectKeyIdentifier.from_public_key(self.privatekey.public_key()),
- critical=False
- )
- except ValueError as e:
- raise CertificateError(str(e))
-
- try:
- certificate = cert_builder.sign(
- private_key=self.privatekey, algorithm=self.digest,
- backend=default_backend()
- )
- except TypeError as e:
- if str(e) == 'Algorithm must be a registered hash algorithm.' and self.digest is None:
- module.fail_json(msg='Signing with Ed25519 and Ed448 keys requires cryptography 2.8 or newer.')
- raise
-
- self.cert = certificate
-
- if self.backup:
- self.backup_file = module.backup_local(self.path)
- crypto_utils.write_file(module, certificate.public_bytes(Encoding.PEM))
- self.changed = True
- else:
- self.cert = crypto_utils.load_certificate(self.path, backend=self.backend)
-
- file_args = module.load_file_common_arguments(module.params)
- if module.set_fs_attributes_if_different(file_args, False):
- self.changed = True
-
- def dump(self, check_mode=False):
-
- result = {
- 'changed': self.changed,
- 'filename': self.path,
- 'privatekey': self.privatekey_path,
- 'csr': self.csr_path
- }
- if self.backup_file:
- result['backup_file'] = self.backup_file
- if self.return_content:
- content = crypto_utils.load_file_if_exists(self.path, ignore_errors=True)
- result['certificate'] = content.decode('utf-8') if content else None
-
- if check_mode:
- result.update({
- 'notBefore': self.notBefore.strftime("%Y%m%d%H%M%SZ"),
- 'notAfter': self.notAfter.strftime("%Y%m%d%H%M%SZ"),
- 'serial_number': self.serial_number,
- })
- else:
- result.update({
- 'notBefore': self.cert.not_valid_before.strftime("%Y%m%d%H%M%SZ"),
- 'notAfter': self.cert.not_valid_after.strftime("%Y%m%d%H%M%SZ"),
- 'serial_number': self.cert.serial_number,
- })
-
- return result
-
-
-class SelfSignedCertificate(Certificate):
- """Generate the self-signed certificate."""
-
- def __init__(self, module):
- super(SelfSignedCertificate, self).__init__(module, 'pyopenssl')
- if module.params['selfsigned_create_subject_key_identifier'] != 'create_if_not_provided':
- module.fail_json(msg='selfsigned_create_subject_key_identifier cannot be used with the pyOpenSSL backend!')
- self.notBefore = crypto_utils.get_relative_time_option(module.params['selfsigned_not_before'], 'selfsigned_not_before', backend=self.backend)
- self.notAfter = crypto_utils.get_relative_time_option(module.params['selfsigned_not_after'], 'selfsigned_not_after', backend=self.backend)
- self.digest = module.params['selfsigned_digest']
- self.version = module.params['selfsigned_version']
- self.serial_number = randint(1000, 99999)
-
- if self.csr_content is None and not os.path.exists(self.csr_path):
- raise CertificateError(
- 'The certificate signing request file {0} does not exist'.format(self.csr_path)
- )
- if self.privatekey_content is None and not os.path.exists(self.privatekey_path):
- raise CertificateError(
- 'The private key file {0} does not exist'.format(self.privatekey_path)
- )
-
- self.csr = crypto_utils.load_certificate_request(
- path=self.csr_path,
- content=self.csr_content,
- )
- try:
- self.privatekey = crypto_utils.load_privatekey(
- path=self.privatekey_path,
- content=self.privatekey_content,
- passphrase=self.privatekey_passphrase,
- )
- except crypto_utils.OpenSSLBadPassphraseError as exc:
- module.fail_json(msg=str(exc))
-
- def generate(self, module):
-
- if self.privatekey_content is None and not os.path.exists(self.privatekey_path):
- raise CertificateError(
- 'The private key %s does not exist' % self.privatekey_path
- )
-
- if self.csr_content is None and not os.path.exists(self.csr_path):
- raise CertificateError(
- 'The certificate signing request file %s does not exist' % self.csr_path
- )
-
- if not self.check(module, perms_required=False) or self.force:
- cert = crypto.X509()
- cert.set_serial_number(self.serial_number)
- cert.set_notBefore(to_bytes(self.notBefore))
- cert.set_notAfter(to_bytes(self.notAfter))
- cert.set_subject(self.csr.get_subject())
- cert.set_issuer(self.csr.get_subject())
- cert.set_version(self.version - 1)
- cert.set_pubkey(self.csr.get_pubkey())
- cert.add_extensions(self.csr.get_extensions())
-
- cert.sign(self.privatekey, self.digest)
- self.cert = cert
-
- if self.backup:
- self.backup_file = module.backup_local(self.path)
- crypto_utils.write_file(module, crypto.dump_certificate(crypto.FILETYPE_PEM, self.cert))
- self.changed = True
-
- file_args = module.load_file_common_arguments(module.params)
- if module.set_fs_attributes_if_different(file_args, False):
- self.changed = True
-
- def dump(self, check_mode=False):
-
- result = {
- 'changed': self.changed,
- 'filename': self.path,
- 'privatekey': self.privatekey_path,
- 'csr': self.csr_path
- }
- if self.backup_file:
- result['backup_file'] = self.backup_file
- if self.return_content:
- content = crypto_utils.load_file_if_exists(self.path, ignore_errors=True)
- result['certificate'] = content.decode('utf-8') if content else None
-
- if check_mode:
- result.update({
- 'notBefore': self.notBefore,
- 'notAfter': self.notAfter,
- 'serial_number': self.serial_number,
- })
- else:
- result.update({
- 'notBefore': self.cert.get_notBefore(),
- 'notAfter': self.cert.get_notAfter(),
- 'serial_number': self.cert.get_serial_number(),
- })
-
- return result
-
-
-class OwnCACertificateCryptography(Certificate):
- """Generate the own CA certificate. Using the cryptography backend"""
- def __init__(self, module):
- super(OwnCACertificateCryptography, self).__init__(module, 'cryptography')
- self.create_subject_key_identifier = module.params['ownca_create_subject_key_identifier']
- self.create_authority_key_identifier = module.params['ownca_create_authority_key_identifier']
- self.notBefore = crypto_utils.get_relative_time_option(module.params['ownca_not_before'], 'ownca_not_before', backend=self.backend)
- self.notAfter = crypto_utils.get_relative_time_option(module.params['ownca_not_after'], 'ownca_not_after', backend=self.backend)
- self.digest = crypto_utils.select_message_digest(module.params['ownca_digest'])
- self.version = module.params['ownca_version']
- self.serial_number = x509.random_serial_number()
- self.ca_cert_path = module.params['ownca_path']
- self.ca_cert_content = module.params['ownca_content']
- if self.ca_cert_content is not None:
- self.ca_cert_content = self.ca_cert_content.encode('utf-8')
- self.ca_privatekey_path = module.params['ownca_privatekey_path']
- self.ca_privatekey_content = module.params['ownca_privatekey_content']
- if self.ca_privatekey_content is not None:
- self.ca_privatekey_content = self.ca_privatekey_content.encode('utf-8')
- self.ca_privatekey_passphrase = module.params['ownca_privatekey_passphrase']
-
- if self.csr_content is None and not os.path.exists(self.csr_path):
- raise CertificateError(
- 'The certificate signing request file {0} does not exist'.format(self.csr_path)
- )
- if self.ca_cert_content is None and not os.path.exists(self.ca_cert_path):
- raise CertificateError(
- 'The CA certificate file {0} does not exist'.format(self.ca_cert_path)
- )
- if self.ca_privatekey_content is None and not os.path.exists(self.ca_privatekey_path):
- raise CertificateError(
- 'The CA private key file {0} does not exist'.format(self.ca_privatekey_path)
- )
-
- self.csr = crypto_utils.load_certificate_request(
- path=self.csr_path,
- content=self.csr_content,
- backend=self.backend
- )
- self.ca_cert = crypto_utils.load_certificate(
- path=self.ca_cert_path,
- content=self.ca_cert_content,
- backend=self.backend
- )
- try:
- self.ca_private_key = crypto_utils.load_privatekey(
- path=self.ca_privatekey_path,
- content=self.ca_privatekey_content,
- passphrase=self.ca_privatekey_passphrase,
- backend=self.backend
- )
- except crypto_utils.OpenSSLBadPassphraseError as exc:
- module.fail_json(msg=str(exc))
-
- if crypto_utils.cryptography_key_needs_digest_for_signing(self.ca_private_key):
- if self.digest is None:
- raise CertificateError(
- 'The digest %s is not supported with the cryptography backend' % module.params['ownca_digest']
- )
- else:
- self.digest = None
-
- def generate(self, module):
-
- if self.ca_cert_content is None and not os.path.exists(self.ca_cert_path):
- raise CertificateError(
- 'The CA certificate %s does not exist' % self.ca_cert_path
- )
-
- if self.ca_privatekey_content is None and not os.path.exists(self.ca_privatekey_path):
- raise CertificateError(
- 'The CA private key %s does not exist' % self.ca_privatekey_path
- )
-
- if self.csr_content is None and not os.path.exists(self.csr_path):
- raise CertificateError(
- 'The certificate signing request file %s does not exist' % self.csr_path
- )
-
- if not self.check(module, perms_required=False) or self.force:
- cert_builder = x509.CertificateBuilder()
- cert_builder = cert_builder.subject_name(self.csr.subject)
- cert_builder = cert_builder.issuer_name(self.ca_cert.subject)
- cert_builder = cert_builder.serial_number(self.serial_number)
- cert_builder = cert_builder.not_valid_before(self.notBefore)
- cert_builder = cert_builder.not_valid_after(self.notAfter)
- cert_builder = cert_builder.public_key(self.csr.public_key())
- has_ski = False
- for extension in self.csr.extensions:
- if isinstance(extension.value, x509.SubjectKeyIdentifier):
- if self.create_subject_key_identifier == 'always_create':
- continue
- has_ski = True
- if self.create_authority_key_identifier and isinstance(extension.value, x509.AuthorityKeyIdentifier):
- continue
- cert_builder = cert_builder.add_extension(extension.value, critical=extension.critical)
- if not has_ski and self.create_subject_key_identifier != 'never_create':
- cert_builder = cert_builder.add_extension(
- x509.SubjectKeyIdentifier.from_public_key(self.csr.public_key()),
- critical=False
- )
- if self.create_authority_key_identifier:
- try:
- ext = self.ca_cert.extensions.get_extension_for_class(x509.SubjectKeyIdentifier)
- cert_builder = cert_builder.add_extension(
- x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(ext.value)
- if CRYPTOGRAPHY_VERSION >= LooseVersion('2.7') else
- x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(ext),
- critical=False
- )
- except cryptography.x509.ExtensionNotFound:
- cert_builder = cert_builder.add_extension(
- x509.AuthorityKeyIdentifier.from_issuer_public_key(self.ca_cert.public_key()),
- critical=False
- )
-
- try:
- certificate = cert_builder.sign(
- private_key=self.ca_private_key, algorithm=self.digest,
- backend=default_backend()
- )
- except TypeError as e:
- if str(e) == 'Algorithm must be a registered hash algorithm.' and self.digest is None:
- module.fail_json(msg='Signing with Ed25519 and Ed448 keys requires cryptography 2.8 or newer.')
- raise
-
- self.cert = certificate
-
- if self.backup:
- self.backup_file = module.backup_local(self.path)
- crypto_utils.write_file(module, certificate.public_bytes(Encoding.PEM))
- self.changed = True
- else:
- self.cert = crypto_utils.load_certificate(self.path, backend=self.backend)
-
- file_args = module.load_file_common_arguments(module.params)
- if module.set_fs_attributes_if_different(file_args, False):
- self.changed = True
-
- def check(self, module, perms_required=True):
- """Ensure the resource is in its desired state."""
-
- if not super(OwnCACertificateCryptography, self).check(module, perms_required):
- return False
-
- # Check AuthorityKeyIdentifier
- if self.create_authority_key_identifier:
- try:
- ext = self.ca_cert.extensions.get_extension_for_class(x509.SubjectKeyIdentifier)
- expected_ext = (
- x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(ext.value)
- if CRYPTOGRAPHY_VERSION >= LooseVersion('2.7') else
- x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(ext)
- )
- except cryptography.x509.ExtensionNotFound:
- expected_ext = x509.AuthorityKeyIdentifier.from_issuer_public_key(self.ca_cert.public_key())
- try:
- ext = self.cert.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier)
- if ext.value != expected_ext:
- return False
- except cryptography.x509.ExtensionNotFound as dummy:
- return False
-
- return True
-
- def dump(self, check_mode=False):
-
- result = {
- 'changed': self.changed,
- 'filename': self.path,
- 'privatekey': self.privatekey_path,
- 'csr': self.csr_path,
- 'ca_cert': self.ca_cert_path,
- 'ca_privatekey': self.ca_privatekey_path
- }
- if self.backup_file:
- result['backup_file'] = self.backup_file
- if self.return_content:
- content = crypto_utils.load_file_if_exists(self.path, ignore_errors=True)
- result['certificate'] = content.decode('utf-8') if content else None
-
- if check_mode:
- result.update({
- 'notBefore': self.notBefore.strftime("%Y%m%d%H%M%SZ"),
- 'notAfter': self.notAfter.strftime("%Y%m%d%H%M%SZ"),
- 'serial_number': self.serial_number,
- })
- else:
- result.update({
- 'notBefore': self.cert.not_valid_before.strftime("%Y%m%d%H%M%SZ"),
- 'notAfter': self.cert.not_valid_after.strftime("%Y%m%d%H%M%SZ"),
- 'serial_number': self.cert.serial_number,
- })
-
- return result
-
-
-class OwnCACertificate(Certificate):
- """Generate the own CA certificate."""
-
- def __init__(self, module):
- super(OwnCACertificate, self).__init__(module, 'pyopenssl')
- self.notBefore = crypto_utils.get_relative_time_option(module.params['ownca_not_before'], 'ownca_not_before', backend=self.backend)
- self.notAfter = crypto_utils.get_relative_time_option(module.params['ownca_not_after'], 'ownca_not_after', backend=self.backend)
- self.digest = module.params['ownca_digest']
- self.version = module.params['ownca_version']
- self.serial_number = randint(1000, 99999)
- if module.params['ownca_create_subject_key_identifier'] != 'create_if_not_provided':
- module.fail_json(msg='ownca_create_subject_key_identifier cannot be used with the pyOpenSSL backend!')
- if module.params['ownca_create_authority_key_identifier']:
- module.warn('ownca_create_authority_key_identifier is ignored by the pyOpenSSL backend!')
- self.ca_cert_path = module.params['ownca_path']
- self.ca_cert_content = module.params['ownca_content']
- if self.ca_cert_content is not None:
- self.ca_cert_content = self.ca_cert_content.encode('utf-8')
- self.ca_privatekey_path = module.params['ownca_privatekey_path']
- self.ca_privatekey_content = module.params['ownca_privatekey_content']
- if self.ca_privatekey_content is not None:
- self.ca_privatekey_content = self.ca_privatekey_content.encode('utf-8')
- self.ca_privatekey_passphrase = module.params['ownca_privatekey_passphrase']
-
- if self.csr_content is None and not os.path.exists(self.csr_path):
- raise CertificateError(
- 'The certificate signing request file {0} does not exist'.format(self.csr_path)
- )
- if self.ca_cert_content is None and not os.path.exists(self.ca_cert_path):
- raise CertificateError(
- 'The CA certificate file {0} does not exist'.format(self.ca_cert_path)
- )
- if self.ca_privatekey_content is None and not os.path.exists(self.ca_privatekey_path):
- raise CertificateError(
- 'The CA private key file {0} does not exist'.format(self.ca_privatekey_path)
- )
-
- self.csr = crypto_utils.load_certificate_request(
- path=self.csr_path,
- content=self.csr_content,
- )
- self.ca_cert = crypto_utils.load_certificate(
- path=self.ca_cert_path,
- content=self.ca_cert_content,
- )
- try:
- self.ca_privatekey = crypto_utils.load_privatekey(
- path=self.ca_privatekey_path,
- content=self.ca_privatekey_content,
- passphrase=self.ca_privatekey_passphrase
- )
- except crypto_utils.OpenSSLBadPassphraseError as exc:
- module.fail_json(msg=str(exc))
-
- def generate(self, module):
-
- if self.ca_cert_content is None and not os.path.exists(self.ca_cert_path):
- raise CertificateError(
- 'The CA certificate %s does not exist' % self.ca_cert_path
- )
-
- if self.ca_privatekey_content is None and not os.path.exists(self.ca_privatekey_path):
- raise CertificateError(
- 'The CA private key %s does not exist' % self.ca_privatekey_path
- )
-
- if self.csr_content is None and not os.path.exists(self.csr_path):
- raise CertificateError(
- 'The certificate signing request file %s does not exist' % self.csr_path
- )
-
- if not self.check(module, perms_required=False) or self.force:
- cert = crypto.X509()
- cert.set_serial_number(self.serial_number)
- cert.set_notBefore(to_bytes(self.notBefore))
- cert.set_notAfter(to_bytes(self.notAfter))
- cert.set_subject(self.csr.get_subject())
- cert.set_issuer(self.ca_cert.get_subject())
- cert.set_version(self.version - 1)
- cert.set_pubkey(self.csr.get_pubkey())
- cert.add_extensions(self.csr.get_extensions())
-
- cert.sign(self.ca_privatekey, self.digest)
- self.cert = cert
-
- if self.backup:
- self.backup_file = module.backup_local(self.path)
- crypto_utils.write_file(module, crypto.dump_certificate(crypto.FILETYPE_PEM, self.cert))
- self.changed = True
-
- file_args = module.load_file_common_arguments(module.params)
- if module.set_fs_attributes_if_different(file_args, False):
- self.changed = True
-
- def dump(self, check_mode=False):
-
- result = {
- 'changed': self.changed,
- 'filename': self.path,
- 'privatekey': self.privatekey_path,
- 'csr': self.csr_path,
- 'ca_cert': self.ca_cert_path,
- 'ca_privatekey': self.ca_privatekey_path
- }
- if self.backup_file:
- result['backup_file'] = self.backup_file
- if self.return_content:
- content = crypto_utils.load_file_if_exists(self.path, ignore_errors=True)
- result['certificate'] = content.decode('utf-8') if content else None
-
- if check_mode:
- result.update({
- 'notBefore': self.notBefore,
- 'notAfter': self.notAfter,
- 'serial_number': self.serial_number,
- })
- else:
- result.update({
- 'notBefore': self.cert.get_notBefore(),
- 'notAfter': self.cert.get_notAfter(),
- 'serial_number': self.cert.get_serial_number(),
- })
-
- return result
-
-
-def compare_sets(subset, superset, equality=False):
- if equality:
- return set(subset) == set(superset)
- else:
- return all(x in superset for x in subset)
-
-
-def compare_dicts(subset, superset, equality=False):
- if equality:
- return subset == superset
- else:
- return all(superset.get(x) == v for x, v in subset.items())
-
-
-NO_EXTENSION = 'no extension'
-
-
-class AssertOnlyCertificateBase(Certificate):
-
- def __init__(self, module, backend):
- super(AssertOnlyCertificateBase, self).__init__(module, backend)
-
- self.signature_algorithms = module.params['signature_algorithms']
- if module.params['subject']:
- self.subject = crypto_utils.parse_name_field(module.params['subject'])
- else:
- self.subject = []
- self.subject_strict = module.params['subject_strict']
- if module.params['issuer']:
- self.issuer = crypto_utils.parse_name_field(module.params['issuer'])
- else:
- self.issuer = []
- self.issuer_strict = module.params['issuer_strict']
- self.has_expired = module.params['has_expired']
- self.version = module.params['version']
- self.key_usage = module.params['key_usage']
- self.key_usage_strict = module.params['key_usage_strict']
- self.extended_key_usage = module.params['extended_key_usage']
- self.extended_key_usage_strict = module.params['extended_key_usage_strict']
- self.subject_alt_name = module.params['subject_alt_name']
- self.subject_alt_name_strict = module.params['subject_alt_name_strict']
- self.not_before = module.params['not_before']
- self.not_after = module.params['not_after']
- self.valid_at = module.params['valid_at']
- self.invalid_at = module.params['invalid_at']
- self.valid_in = module.params['valid_in']
- if self.valid_in and not self.valid_in.startswith("+") and not self.valid_in.startswith("-"):
- try:
- int(self.valid_in)
- except ValueError:
- module.fail_json(msg='The supplied value for "valid_in" (%s) is not an integer or a valid timespec' % self.valid_in)
- self.valid_in = "+" + self.valid_in + "s"
-
- # Load objects
- self.cert = crypto_utils.load_certificate(self.path, backend=self.backend)
- if self.privatekey_path is not None or self.privatekey_content is not None:
- try:
- self.privatekey = crypto_utils.load_privatekey(
- path=self.privatekey_path,
- content=self.privatekey_content,
- passphrase=self.privatekey_passphrase,
- backend=self.backend
- )
- except crypto_utils.OpenSSLBadPassphraseError as exc:
- raise CertificateError(exc)
- if self.csr_path is not None or self.csr_content is not None:
- self.csr = crypto_utils.load_certificate_request(
- path=self.csr_path,
- content=self.csr_content,
- backend=self.backend
- )
-
- @abc.abstractmethod
- def _validate_privatekey(self):
- pass
-
- @abc.abstractmethod
- def _validate_csr_signature(self):
- pass
-
- @abc.abstractmethod
- def _validate_csr_subject(self):
- pass
-
- @abc.abstractmethod
- def _validate_csr_extensions(self):
- pass
-
- @abc.abstractmethod
- def _validate_signature_algorithms(self):
- pass
-
- @abc.abstractmethod
- def _validate_subject(self):
- pass
-
- @abc.abstractmethod
- def _validate_issuer(self):
- pass
-
- @abc.abstractmethod
- def _validate_has_expired(self):
- pass
-
- @abc.abstractmethod
- def _validate_version(self):
- pass
-
- @abc.abstractmethod
- def _validate_key_usage(self):
- pass
-
- @abc.abstractmethod
- def _validate_extended_key_usage(self):
- pass
-
- @abc.abstractmethod
- def _validate_subject_alt_name(self):
- pass
-
- @abc.abstractmethod
- def _validate_not_before(self):
- pass
-
- @abc.abstractmethod
- def _validate_not_after(self):
- pass
-
- @abc.abstractmethod
- def _validate_valid_at(self):
- pass
-
- @abc.abstractmethod
- def _validate_invalid_at(self):
- pass
-
- @abc.abstractmethod
- def _validate_valid_in(self):
- pass
-
- def assertonly(self, module):
- messages = []
- if self.privatekey_path is not None or self.privatekey_content is not None:
- if not self._validate_privatekey():
- messages.append(
- 'Certificate %s and private key %s do not match' %
- (self.path, self.privatekey_path or '(provided in module options)')
- )
-
- if self.csr_path is not None or self.csr_content is not None:
- if not self._validate_csr_signature():
- messages.append(
- 'Certificate %s and CSR %s do not match: private key mismatch' %
- (self.path, self.csr_path or '(provided in module options)')
- )
- if not self._validate_csr_subject():
- messages.append(
- 'Certificate %s and CSR %s do not match: subject mismatch' %
- (self.path, self.csr_path or '(provided in module options)')
- )
- if not self._validate_csr_extensions():
- messages.append(
- 'Certificate %s and CSR %s do not match: extensions mismatch' %
- (self.path, self.csr_path or '(provided in module options)')
- )
-
- if self.signature_algorithms is not None:
- wrong_alg = self._validate_signature_algorithms()
- if wrong_alg:
- messages.append(
- 'Invalid signature algorithm (got %s, expected one of %s)' %
- (wrong_alg, self.signature_algorithms)
- )
-
- if self.subject is not None:
- failure = self._validate_subject()
- if failure:
- dummy, cert_subject = failure
- messages.append(
- 'Invalid subject component (got %s, expected all of %s to be present)' %
- (cert_subject, self.subject)
- )
-
- if self.issuer is not None:
- failure = self._validate_issuer()
- if failure:
- dummy, cert_issuer = failure
- messages.append(
- 'Invalid issuer component (got %s, expected all of %s to be present)' % (cert_issuer, self.issuer)
- )
-
- if self.has_expired is not None:
- cert_expired = self._validate_has_expired()
- if cert_expired != self.has_expired:
- messages.append(
- 'Certificate expiration check failed (certificate expiration is %s, expected %s)' %
- (cert_expired, self.has_expired)
- )
-
- if self.version is not None:
- cert_version = self._validate_version()
- if cert_version != self.version:
- messages.append(
- 'Invalid certificate version number (got %s, expected %s)' %
- (cert_version, self.version)
- )
-
- if self.key_usage is not None:
- failure = self._validate_key_usage()
- if failure == NO_EXTENSION:
- messages.append('Found no keyUsage extension')
- elif failure:
- dummy, cert_key_usage = failure
- messages.append(
- 'Invalid keyUsage components (got %s, expected all of %s to be present)' %
- (cert_key_usage, self.key_usage)
- )
-
- if self.extended_key_usage is not None:
- failure = self._validate_extended_key_usage()
- if failure == NO_EXTENSION:
- messages.append('Found no extendedKeyUsage extension')
- elif failure:
- dummy, ext_cert_key_usage = failure
- messages.append(
- 'Invalid extendedKeyUsage component (got %s, expected all of %s to be present)' % (ext_cert_key_usage, self.extended_key_usage)
- )
-
- if self.subject_alt_name is not None:
- failure = self._validate_subject_alt_name()
- if failure == NO_EXTENSION:
- messages.append('Found no subjectAltName extension')
- elif failure:
- dummy, cert_san = failure
- messages.append(
- 'Invalid subjectAltName component (got %s, expected all of %s to be present)' %
- (cert_san, self.subject_alt_name)
- )
-
- if self.not_before is not None:
- cert_not_valid_before = self._validate_not_before()
- if cert_not_valid_before != crypto_utils.get_relative_time_option(self.not_before, 'not_before', backend=self.backend):
- messages.append(
- 'Invalid not_before component (got %s, expected %s to be present)' %
- (cert_not_valid_before, self.not_before)
- )
-
- if self.not_after is not None:
- cert_not_valid_after = self._validate_not_after()
- if cert_not_valid_after != crypto_utils.get_relative_time_option(self.not_after, 'not_after', backend=self.backend):
- messages.append(
- 'Invalid not_after component (got %s, expected %s to be present)' %
- (cert_not_valid_after, self.not_after)
- )
-
- if self.valid_at is not None:
- not_before, valid_at, not_after = self._validate_valid_at()
- if not (not_before <= valid_at <= not_after):
- messages.append(
- 'Certificate is not valid for the specified date (%s) - not_before: %s - not_after: %s' %
- (self.valid_at, not_before, not_after)
- )
-
- if self.invalid_at is not None:
- not_before, invalid_at, not_after = self._validate_invalid_at()
- if not_before <= invalid_at <= not_after:
- messages.append(
- 'Certificate is not invalid for the specified date (%s) - not_before: %s - not_after: %s' %
- (self.invalid_at, not_before, not_after)
- )
-
- if self.valid_in is not None:
- not_before, valid_in, not_after = self._validate_valid_in()
- if not not_before <= valid_in <= not_after:
- messages.append(
- 'Certificate is not valid in %s from now (that would be %s) - not_before: %s - not_after: %s' %
- (self.valid_in, valid_in, not_before, not_after)
- )
- return messages
-
- def generate(self, module):
- """Don't generate anything - only assert"""
- messages = self.assertonly(module)
- if messages:
- module.fail_json(msg=' | '.join(messages))
-
- def check(self, module, perms_required=False):
- """Ensure the resource is in its desired state."""
- messages = self.assertonly(module)
- return len(messages) == 0
-
- def dump(self, check_mode=False):
- result = {
- 'changed': self.changed,
- 'filename': self.path,
- 'privatekey': self.privatekey_path,
- 'csr': self.csr_path,
- }
- if self.return_content:
- content = crypto_utils.load_file_if_exists(self.path, ignore_errors=True)
- result['certificate'] = content.decode('utf-8') if content else None
- return result
-
-
-class AssertOnlyCertificateCryptography(AssertOnlyCertificateBase):
- """Validate the supplied cert, using the cryptography backend"""
- def __init__(self, module):
- super(AssertOnlyCertificateCryptography, self).__init__(module, 'cryptography')
-
- def _validate_privatekey(self):
- return crypto_utils.cryptography_compare_public_keys(self.cert.public_key(), self.privatekey.public_key())
-
- def _validate_csr_signature(self):
- if not self.csr.is_signature_valid:
- return False
- return crypto_utils.cryptography_compare_public_keys(self.csr.public_key(), self.cert.public_key())
-
- def _validate_csr_subject(self):
- return self.csr.subject == self.cert.subject
-
- def _validate_csr_extensions(self):
- cert_exts = self.cert.extensions
- csr_exts = self.csr.extensions
- if len(cert_exts) != len(csr_exts):
- return False
- for cert_ext in cert_exts:
- try:
- csr_ext = csr_exts.get_extension_for_oid(cert_ext.oid)
- if cert_ext != csr_ext:
- return False
- except cryptography.x509.ExtensionNotFound as dummy:
- return False
- return True
-
- def _validate_signature_algorithms(self):
- if self.cert.signature_algorithm_oid._name not in self.signature_algorithms:
- return self.cert.signature_algorithm_oid._name
-
- def _validate_subject(self):
- expected_subject = Name([NameAttribute(oid=crypto_utils.cryptography_name_to_oid(sub[0]), value=to_text(sub[1]))
- for sub in self.subject])
- cert_subject = self.cert.subject
- if not compare_sets(expected_subject, cert_subject, self.subject_strict):
- return expected_subject, cert_subject
-
- def _validate_issuer(self):
- expected_issuer = Name([NameAttribute(oid=crypto_utils.cryptography_name_to_oid(iss[0]), value=to_text(iss[1]))
- for iss in self.issuer])
- cert_issuer = self.cert.issuer
- if not compare_sets(expected_issuer, cert_issuer, self.issuer_strict):
- return self.issuer, cert_issuer
-
- def _validate_has_expired(self):
- cert_not_after = self.cert.not_valid_after
- cert_expired = cert_not_after < datetime.datetime.utcnow()
- return cert_expired
-
- def _validate_version(self):
- if self.cert.version == x509.Version.v1:
- return 1
- if self.cert.version == x509.Version.v3:
- return 3
- return "unknown"
-
- def _validate_key_usage(self):
- try:
- current_key_usage = self.cert.extensions.get_extension_for_class(x509.KeyUsage).value
- test_key_usage = dict(
- digital_signature=current_key_usage.digital_signature,
- content_commitment=current_key_usage.content_commitment,
- key_encipherment=current_key_usage.key_encipherment,
- data_encipherment=current_key_usage.data_encipherment,
- key_agreement=current_key_usage.key_agreement,
- key_cert_sign=current_key_usage.key_cert_sign,
- crl_sign=current_key_usage.crl_sign,
- encipher_only=False,
- decipher_only=False
- )
- if test_key_usage['key_agreement']:
- test_key_usage.update(dict(
- encipher_only=current_key_usage.encipher_only,
- decipher_only=current_key_usage.decipher_only
- ))
-
- key_usages = crypto_utils.cryptography_parse_key_usage_params(self.key_usage)
- if not compare_dicts(key_usages, test_key_usage, self.key_usage_strict):
- return self.key_usage, [k for k, v in test_key_usage.items() if v is True]
-
- except cryptography.x509.ExtensionNotFound:
- # This is only bad if the user specified a non-empty list
- if self.key_usage:
- return NO_EXTENSION
-
- def _validate_extended_key_usage(self):
- try:
- current_ext_keyusage = self.cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage).value
- usages = [crypto_utils.cryptography_name_to_oid(usage) for usage in self.extended_key_usage]
- expected_ext_keyusage = x509.ExtendedKeyUsage(usages)
- if not compare_sets(expected_ext_keyusage, current_ext_keyusage, self.extended_key_usage_strict):
- return [eku.value for eku in expected_ext_keyusage], [eku.value for eku in current_ext_keyusage]
-
- except cryptography.x509.ExtensionNotFound:
- # This is only bad if the user specified a non-empty list
- if self.extended_key_usage:
- return NO_EXTENSION
-
- def _validate_subject_alt_name(self):
- try:
- current_san = self.cert.extensions.get_extension_for_class(x509.SubjectAlternativeName).value
- expected_san = [crypto_utils.cryptography_get_name(san) for san in self.subject_alt_name]
- if not compare_sets(expected_san, current_san, self.subject_alt_name_strict):
- return self.subject_alt_name, current_san
- except cryptography.x509.ExtensionNotFound:
- # This is only bad if the user specified a non-empty list
- if self.subject_alt_name:
- return NO_EXTENSION
-
- def _validate_not_before(self):
- return self.cert.not_valid_before
-
- def _validate_not_after(self):
- return self.cert.not_valid_after
-
- def _validate_valid_at(self):
- rt = crypto_utils.get_relative_time_option(self.valid_at, 'valid_at', backend=self.backend)
- return self.cert.not_valid_before, rt, self.cert.not_valid_after
-
- def _validate_invalid_at(self):
- rt = crypto_utils.get_relative_time_option(self.invalid_at, 'invalid_at', backend=self.backend)
- return self.cert.not_valid_before, rt, self.cert.not_valid_after
-
- def _validate_valid_in(self):
- valid_in_date = crypto_utils.get_relative_time_option(self.valid_in, "valid_in", backend=self.backend)
- return self.cert.not_valid_before, valid_in_date, self.cert.not_valid_after
-
-
-class AssertOnlyCertificate(AssertOnlyCertificateBase):
- """validate the supplied certificate."""
-
- def __init__(self, module):
- super(AssertOnlyCertificate, self).__init__(module, 'pyopenssl')
-
- # Ensure inputs are properly sanitized before comparison.
- for param in ['signature_algorithms', 'key_usage', 'extended_key_usage',
- 'subject_alt_name', 'subject', 'issuer', 'not_before',
- 'not_after', 'valid_at', 'invalid_at']:
- attr = getattr(self, param)
- if isinstance(attr, list) and attr:
- if isinstance(attr[0], str):
- setattr(self, param, [to_bytes(item) for item in attr])
- elif isinstance(attr[0], tuple):
- setattr(self, param, [(to_bytes(item[0]), to_bytes(item[1])) for item in attr])
- elif isinstance(attr, tuple):
- setattr(self, param, dict((to_bytes(k), to_bytes(v)) for (k, v) in attr.items()))
- elif isinstance(attr, dict):
- setattr(self, param, dict((to_bytes(k), to_bytes(v)) for (k, v) in attr.items()))
- elif isinstance(attr, str):
- setattr(self, param, to_bytes(attr))
-
- def _validate_privatekey(self):
- ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_2_METHOD)
- ctx.use_privatekey(self.privatekey)
- ctx.use_certificate(self.cert)
- try:
- ctx.check_privatekey()
- return True
- except OpenSSL.SSL.Error:
- return False
-
- def _validate_csr_signature(self):
- try:
- self.csr.verify(self.cert.get_pubkey())
- except OpenSSL.crypto.Error:
- return False
-
- def _validate_csr_subject(self):
- if self.csr.get_subject() != self.cert.get_subject():
- return False
-
- def _validate_csr_extensions(self):
- csr_extensions = self.csr.get_extensions()
- cert_extension_count = self.cert.get_extension_count()
- if len(csr_extensions) != cert_extension_count:
- return False
- for extension_number in range(0, cert_extension_count):
- cert_extension = self.cert.get_extension(extension_number)
- csr_extension = filter(lambda extension: extension.get_short_name() == cert_extension.get_short_name(), csr_extensions)
- if cert_extension.get_data() != list(csr_extension)[0].get_data():
- return False
- return True
-
- def _validate_signature_algorithms(self):
- if self.cert.get_signature_algorithm() not in self.signature_algorithms:
- return self.cert.get_signature_algorithm()
-
- def _validate_subject(self):
- expected_subject = [(OpenSSL._util.lib.OBJ_txt2nid(sub[0]), sub[1]) for sub in self.subject]
- cert_subject = self.cert.get_subject().get_components()
- current_subject = [(OpenSSL._util.lib.OBJ_txt2nid(sub[0]), sub[1]) for sub in cert_subject]
- if not compare_sets(expected_subject, current_subject, self.subject_strict):
- return expected_subject, current_subject
-
- def _validate_issuer(self):
- expected_issuer = [(OpenSSL._util.lib.OBJ_txt2nid(iss[0]), iss[1]) for iss in self.issuer]
- cert_issuer = self.cert.get_issuer().get_components()
- current_issuer = [(OpenSSL._util.lib.OBJ_txt2nid(iss[0]), iss[1]) for iss in cert_issuer]
- if not compare_sets(expected_issuer, current_issuer, self.issuer_strict):
- return self.issuer, cert_issuer
-
- def _validate_has_expired(self):
- # The following 3 lines are the same as the current PyOpenSSL code for cert.has_expired().
- # Older version of PyOpenSSL have a buggy implementation,
- # to avoid issues with those we added the code from a more recent release here.
-
- time_string = to_native(self.cert.get_notAfter())
- not_after = datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ")
- cert_expired = not_after < datetime.datetime.utcnow()
- return cert_expired
-
- def _validate_version(self):
- # Version numbers in certs are off by one:
- # v1: 0, v2: 1, v3: 2 ...
- return self.cert.get_version() + 1
-
- def _validate_key_usage(self):
- found = False
- for extension_idx in range(0, self.cert.get_extension_count()):
- extension = self.cert.get_extension(extension_idx)
- if extension.get_short_name() == b'keyUsage':
- found = True
- expected_extension = crypto.X509Extension(b"keyUsage", False, b', '.join(self.key_usage))
- key_usage = [usage.strip() for usage in to_text(expected_extension, errors='surrogate_or_strict').split(',')]
- current_ku = [usage.strip() for usage in to_text(extension, errors='surrogate_or_strict').split(',')]
- if not compare_sets(key_usage, current_ku, self.key_usage_strict):
- return self.key_usage, str(extension).split(', ')
- if not found:
- # This is only bad if the user specified a non-empty list
- if self.key_usage:
- return NO_EXTENSION
-
- def _validate_extended_key_usage(self):
- found = False
- for extension_idx in range(0, self.cert.get_extension_count()):
- extension = self.cert.get_extension(extension_idx)
- if extension.get_short_name() == b'extendedKeyUsage':
- found = True
- extKeyUsage = [OpenSSL._util.lib.OBJ_txt2nid(keyUsage) for keyUsage in self.extended_key_usage]
- current_xku = [OpenSSL._util.lib.OBJ_txt2nid(usage.strip()) for usage in
- to_bytes(extension, errors='surrogate_or_strict').split(b',')]
- if not compare_sets(extKeyUsage, current_xku, self.extended_key_usage_strict):
- return self.extended_key_usage, str(extension).split(', ')
- if not found:
- # This is only bad if the user specified a non-empty list
- if self.extended_key_usage:
- return NO_EXTENSION
-
- def _normalize_san(self, san):
- # Apparently OpenSSL returns 'IP address' not 'IP' as specifier when converting the subjectAltName to string
- # although it won't accept this specifier when generating the CSR. (https://github.com/openssl/openssl/issues/4004)
- if san.startswith('IP Address:'):
- san = 'IP:' + san[len('IP Address:'):]
- if san.startswith('IP:'):
- ip = compat_ipaddress.ip_address(san[3:])
- san = 'IP:{0}'.format(ip.compressed)
- return san
-
- def _validate_subject_alt_name(self):
- found = False
- for extension_idx in range(0, self.cert.get_extension_count()):
- extension = self.cert.get_extension(extension_idx)
- if extension.get_short_name() == b'subjectAltName':
- found = True
- l_altnames = [self._normalize_san(altname.strip()) for altname in
- to_text(extension, errors='surrogate_or_strict').split(', ')]
- sans = [self._normalize_san(to_text(san, errors='surrogate_or_strict')) for san in self.subject_alt_name]
- if not compare_sets(sans, l_altnames, self.subject_alt_name_strict):
- return self.subject_alt_name, l_altnames
- if not found:
- # This is only bad if the user specified a non-empty list
- if self.subject_alt_name:
- return NO_EXTENSION
-
- def _validate_not_before(self):
- return self.cert.get_notBefore()
-
- def _validate_not_after(self):
- return self.cert.get_notAfter()
-
- def _validate_valid_at(self):
- rt = crypto_utils.get_relative_time_option(self.valid_at, "valid_at", backend=self.backend)
- rt = to_bytes(rt, errors='surrogate_or_strict')
- return self.cert.get_notBefore(), rt, self.cert.get_notAfter()
-
- def _validate_invalid_at(self):
- rt = crypto_utils.get_relative_time_option(self.invalid_at, "invalid_at", backend=self.backend)
- rt = to_bytes(rt, errors='surrogate_or_strict')
- return self.cert.get_notBefore(), rt, self.cert.get_notAfter()
-
- def _validate_valid_in(self):
- valid_in_asn1 = crypto_utils.get_relative_time_option(self.valid_in, "valid_in", backend=self.backend)
- valid_in_date = to_bytes(valid_in_asn1, errors='surrogate_or_strict')
- return self.cert.get_notBefore(), valid_in_date, self.cert.get_notAfter()
-
-
-class EntrustCertificate(Certificate):
- """Retrieve a certificate using Entrust (ECS)."""
-
- def __init__(self, module, backend):
- super(EntrustCertificate, self).__init__(module, backend)
- self.trackingId = None
- self.notAfter = crypto_utils.get_relative_time_option(module.params['entrust_not_after'], 'entrust_not_after', backend=self.backend)
-
- if self.csr_content is None or not os.path.exists(self.csr_path):
- raise CertificateError(
- 'The certificate signing request file {0} does not exist'.format(self.csr_path)
- )
-
- self.csr = crypto_utils.load_certificate_request(
- path=self.csr_path,
- content=self.csr_content,
- backend=self.backend,
- )
-
- # ECS API defaults to using the validated organization tied to the account.
- # We want to always force behavior of trying to use the organization provided in the CSR.
- # To that end we need to parse out the organization from the CSR.
- self.csr_org = None
- if self.backend == 'pyopenssl':
- csr_subject = self.csr.get_subject()
- csr_subject_components = csr_subject.get_components()
- for k, v in csr_subject_components:
- if k.upper() == 'O':
- # Entrust does not support multiple validated organizations in a single certificate
- if self.csr_org is not None:
- module.fail_json(msg=("Entrust provider does not currently support multiple validated organizations. Multiple organizations found in "
- "Subject DN: '{0}'. ".format(csr_subject)))
- else:
- self.csr_org = v
- elif self.backend == 'cryptography':
- csr_subject_orgs = self.csr.subject.get_attributes_for_oid(NameOID.ORGANIZATION_NAME)
- if len(csr_subject_orgs) == 1:
- self.csr_org = csr_subject_orgs[0].value
- elif len(csr_subject_orgs) > 1:
- module.fail_json(msg=("Entrust provider does not currently support multiple validated organizations. Multiple organizations found in "
- "Subject DN: '{0}'. ".format(self.csr.subject)))
- # If no organization in the CSR, explicitly tell ECS that it should be blank in issued cert, not defaulted to
- # organization tied to the account.
- if self.csr_org is None:
- self.csr_org = ''
-
- try:
- self.ecs_client = ECSClient(
- entrust_api_user=module.params.get('entrust_api_user'),
- entrust_api_key=module.params.get('entrust_api_key'),
- entrust_api_cert=module.params.get('entrust_api_client_cert_path'),
- entrust_api_cert_key=module.params.get('entrust_api_client_cert_key_path'),
- entrust_api_specification_path=module.params.get('entrust_api_specification_path')
- )
- except SessionConfigurationException as e:
- module.fail_json(msg='Failed to initialize Entrust Provider: {0}'.format(to_native(e.message)))
-
- def generate(self, module):
-
- if not self.check(module, perms_required=False) or self.force:
- # Read the CSR that was generated for us
- body = {}
- if self.csr_content is not None:
- body['csr'] = self.csr_content
- else:
- with open(self.csr_path, 'r') as csr_file:
- body['csr'] = csr_file.read()
-
- body['certType'] = module.params['entrust_cert_type']
-
- # Handle expiration (30 days if not specified)
- expiry = self.notAfter
- if not expiry:
- gmt_now = datetime.datetime.fromtimestamp(time.mktime(time.gmtime()))
- expiry = gmt_now + datetime.timedelta(days=365)
-
- expiry_iso3339 = expiry.strftime("%Y-%m-%dT%H:%M:%S.00Z")
- body['certExpiryDate'] = expiry_iso3339
- body['org'] = self.csr_org
- body['tracking'] = {
- 'requesterName': module.params['entrust_requester_name'],
- 'requesterEmail': module.params['entrust_requester_email'],
- 'requesterPhone': module.params['entrust_requester_phone'],
- }
-
- try:
- result = self.ecs_client.NewCertRequest(Body=body)
- self.trackingId = result.get('trackingId')
- except RestOperationException as e:
- module.fail_json(msg='Failed to request new certificate from Entrust Certificate Services (ECS): {0}'.format(to_native(e.message)))
-
- if self.backup:
- self.backup_file = module.backup_local(self.path)
- crypto_utils.write_file(module, to_bytes(result.get('endEntityCert')))
- self.cert = crypto_utils.load_certificate(self.path, backend=self.backend)
- self.changed = True
-
- def check(self, module, perms_required=True):
- """Ensure the resource is in its desired state."""
-
- parent_check = super(EntrustCertificate, self).check(module, perms_required)
-
- try:
- cert_details = self._get_cert_details()
- except RestOperationException as e:
- module.fail_json(msg='Failed to get status of existing certificate from Entrust Certificate Services (ECS): {0}.'.format(to_native(e.message)))
-
- # Always issue a new certificate if the certificate is expired, suspended or revoked
- status = cert_details.get('status', False)
- if status == 'EXPIRED' or status == 'SUSPENDED' or status == 'REVOKED':
- return False
-
- # If the requested cert type was specified and it is for a different certificate type than the initial certificate, a new one is needed
- if module.params['entrust_cert_type'] and cert_details.get('certType') and module.params['entrust_cert_type'] != cert_details.get('certType'):
- return False
-
- return parent_check
-
- def _get_cert_details(self):
- cert_details = {}
- if self.cert:
- serial_number = None
- expiry = None
- if self.backend == 'pyopenssl':
- serial_number = "{0:X}".format(self.cert.get_serial_number())
- time_string = to_native(self.cert.get_notAfter())
- expiry = datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ")
- elif self.backend == 'cryptography':
- serial_number = "{0:X}".format(self.cert.serial_number)
- expiry = self.cert.not_valid_after
-
- # get some information about the expiry of this certificate
- expiry_iso3339 = expiry.strftime("%Y-%m-%dT%H:%M:%S.00Z")
- cert_details['expiresAfter'] = expiry_iso3339
-
- # If a trackingId is not already defined (from the result of a generate)
- # use the serial number to identify the tracking Id
- if self.trackingId is None and serial_number is not None:
- cert_results = self.ecs_client.GetCertificates(serialNumber=serial_number).get('certificates', {})
-
- # Finding 0 or more than 1 result is a very unlikely use case, it simply means we cannot perform additional checks
- # on the 'state' as returned by Entrust Certificate Services (ECS). The general certificate validity is
- # still checked as it is in the rest of the module.
- if len(cert_results) == 1:
- self.trackingId = cert_results[0].get('trackingId')
-
- if self.trackingId is not None:
- cert_details.update(self.ecs_client.GetCertificate(trackingId=self.trackingId))
-
- return cert_details
-
- def dump(self, check_mode=False):
-
- result = {
- 'changed': self.changed,
- 'filename': self.path,
- 'privatekey': self.privatekey_path,
- 'csr': self.csr_path,
- }
-
- if self.backup_file:
- result['backup_file'] = self.backup_file
- if self.return_content:
- content = crypto_utils.load_file_if_exists(self.path, ignore_errors=True)
- result['certificate'] = content.decode('utf-8') if content else None
-
- result.update(self._get_cert_details())
-
- return result
-
-
-class AcmeCertificate(Certificate):
- """Retrieve a certificate using the ACME protocol."""
-
- # Since there's no real use of the backend,
- # other than the 'self.check' function, we just pass the backend to the constructor
-
- def __init__(self, module, backend):
- super(AcmeCertificate, self).__init__(module, backend)
- self.accountkey_path = module.params['acme_accountkey_path']
- self.challenge_path = module.params['acme_challenge_path']
- self.use_chain = module.params['acme_chain']
- self.acme_directory = module.params['acme_directory']
-
- def generate(self, module):
-
- if self.csr_content is None and not os.path.exists(self.csr_path):
- raise CertificateError(
- 'The certificate signing request file %s does not exist' % self.csr_path
- )
-
- if not os.path.exists(self.accountkey_path):
- raise CertificateError(
- 'The account key %s does not exist' % self.accountkey_path
- )
-
- if not os.path.exists(self.challenge_path):
- raise CertificateError(
- 'The challenge path %s does not exist' % self.challenge_path
- )
-
- if not self.check(module, perms_required=False) or self.force:
- acme_tiny_path = self.module.get_bin_path('acme-tiny', required=True)
- command = [acme_tiny_path]
- if self.use_chain:
- command.append('--chain')
- command.extend(['--account-key', self.accountkey_path])
- if self.csr_content is not None:
- # We need to temporarily write the CSR to disk
- fd, tmpsrc = tempfile.mkstemp()
- module.add_cleanup_file(tmpsrc) # Ansible will delete the file on exit
- f = os.fdopen(fd, 'wb')
- try:
- f.write(self.csr_content)
- except Exception as err:
- try:
- f.close()
- except Exception as dummy:
- pass
- module.fail_json(
- msg="failed to create temporary CSR file: %s" % to_native(err),
- exception=traceback.format_exc()
- )
- f.close()
- command.extend(['--csr', tmpsrc])
- else:
- command.extend(['--csr', self.csr_path])
- command.extend(['--acme-dir', self.challenge_path])
- command.extend(['--directory-url', self.acme_directory])
-
- try:
- crt = module.run_command(command, check_rc=True)[1]
- if self.backup:
- self.backup_file = module.backup_local(self.path)
- crypto_utils.write_file(module, to_bytes(crt))
- self.changed = True
- except OSError as exc:
- raise CertificateError(exc)
-
- file_args = module.load_file_common_arguments(module.params)
- if module.set_fs_attributes_if_different(file_args, False):
- self.changed = True
-
- def dump(self, check_mode=False):
-
- result = {
- 'changed': self.changed,
- 'filename': self.path,
- 'privatekey': self.privatekey_path,
- 'accountkey': self.accountkey_path,
- 'csr': self.csr_path,
- }
- if self.backup_file:
- result['backup_file'] = self.backup_file
- if self.return_content:
- content = crypto_utils.load_file_if_exists(self.path, ignore_errors=True)
- result['certificate'] = content.decode('utf-8') if content else None
-
- return result
-
-
-def main():
- module = AnsibleModule(
- argument_spec=dict(
- state=dict(type='str', default='present', choices=['present', 'absent']),
- path=dict(type='path', required=True),
- provider=dict(type='str', choices=['acme', 'assertonly', 'entrust', 'ownca', 'selfsigned']),
- force=dict(type='bool', default=False,),
- csr_path=dict(type='path'),
- csr_content=dict(type='str'),
- backup=dict(type='bool', default=False),
- select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']),
- return_content=dict(type='bool', default=False),
-
- # General properties of a certificate
- privatekey_path=dict(type='path'),
- privatekey_content=dict(type='str'),
- privatekey_passphrase=dict(type='str', no_log=True),
-
- # provider: assertonly
- signature_algorithms=dict(type='list', elements='str', removed_in_version='2.13'),
- subject=dict(type='dict', removed_in_version='2.13'),
- subject_strict=dict(type='bool', default=False, removed_in_version='2.13'),
- issuer=dict(type='dict', removed_in_version='2.13'),
- issuer_strict=dict(type='bool', default=False, removed_in_version='2.13'),
- has_expired=dict(type='bool', default=False, removed_in_version='2.13'),
- version=dict(type='int', removed_in_version='2.13'),
- key_usage=dict(type='list', elements='str', aliases=['keyUsage'], removed_in_version='2.13'),
- key_usage_strict=dict(type='bool', default=False, aliases=['keyUsage_strict'], removed_in_version='2.13'),
- extended_key_usage=dict(type='list', elements='str', aliases=['extendedKeyUsage'], removed_in_version='2.13'),
- extended_key_usage_strict=dict(type='bool', default=False, aliases=['extendedKeyUsage_strict'], removed_in_version='2.13'),
- subject_alt_name=dict(type='list', elements='str', aliases=['subjectAltName'], removed_in_version='2.13'),
- subject_alt_name_strict=dict(type='bool', default=False, aliases=['subjectAltName_strict'], removed_in_version='2.13'),
- not_before=dict(type='str', aliases=['notBefore'], removed_in_version='2.13'),
- not_after=dict(type='str', aliases=['notAfter'], removed_in_version='2.13'),
- valid_at=dict(type='str', removed_in_version='2.13'),
- invalid_at=dict(type='str', removed_in_version='2.13'),
- valid_in=dict(type='str', removed_in_version='2.13'),
-
- # provider: selfsigned
- selfsigned_version=dict(type='int', default=3),
- selfsigned_digest=dict(type='str', default='sha256'),
- selfsigned_not_before=dict(type='str', default='+0s', aliases=['selfsigned_notBefore']),
- selfsigned_not_after=dict(type='str', default='+3650d', aliases=['selfsigned_notAfter']),
- selfsigned_create_subject_key_identifier=dict(
- type='str',
- default='create_if_not_provided',
- choices=['create_if_not_provided', 'always_create', 'never_create']
- ),
-
- # provider: ownca
- ownca_path=dict(type='path'),
- ownca_content=dict(type='str'),
- ownca_privatekey_path=dict(type='path'),
- ownca_privatekey_content=dict(type='str'),
- ownca_privatekey_passphrase=dict(type='str', no_log=True),
- ownca_digest=dict(type='str', default='sha256'),
- ownca_version=dict(type='int', default=3),
- ownca_not_before=dict(type='str', default='+0s'),
- ownca_not_after=dict(type='str', default='+3650d'),
- ownca_create_subject_key_identifier=dict(
- type='str',
- default='create_if_not_provided',
- choices=['create_if_not_provided', 'always_create', 'never_create']
- ),
- ownca_create_authority_key_identifier=dict(type='bool', default=True),
-
- # provider: acme
- acme_accountkey_path=dict(type='path'),
- acme_challenge_path=dict(type='path'),
- acme_chain=dict(type='bool', default=False),
- acme_directory=dict(type='str', default="https://acme-v02.api.letsencrypt.org/directory"),
-
- # provider: entrust
- entrust_cert_type=dict(type='str', default='STANDARD_SSL',
- choices=['STANDARD_SSL', 'ADVANTAGE_SSL', 'UC_SSL', 'EV_SSL', 'WILDCARD_SSL',
- 'PRIVATE_SSL', 'PD_SSL', 'CDS_ENT_LITE', 'CDS_ENT_PRO', 'SMIME_ENT']),
- entrust_requester_email=dict(type='str'),
- entrust_requester_name=dict(type='str'),
- entrust_requester_phone=dict(type='str'),
- entrust_api_user=dict(type='str'),
- entrust_api_key=dict(type='str', no_log=True),
- entrust_api_client_cert_path=dict(type='path'),
- entrust_api_client_cert_key_path=dict(type='path', no_log=True),
- entrust_api_specification_path=dict(type='path', default='https://cloud.entrust.net/EntrustCloud/documentation/cms-api-2.1.0.yaml'),
- entrust_not_after=dict(type='str', default='+365d'),
- ),
- supports_check_mode=True,
- add_file_common_args=True,
- required_if=[
- ['state', 'present', ['provider']],
- ['provider', 'entrust', ['entrust_requester_email', 'entrust_requester_name', 'entrust_requester_phone',
- 'entrust_api_user', 'entrust_api_key', 'entrust_api_client_cert_path',
- 'entrust_api_client_cert_key_path']],
- ],
- mutually_exclusive=[
- ['csr_path', 'csr_content'],
- ['privatekey_path', 'privatekey_content'],
- ['ownca_path', 'ownca_content'],
- ['ownca_privatekey_path', 'ownca_privatekey_content'],
- ],
- )
-
- try:
- if module.params['state'] == 'absent':
- certificate = CertificateAbsent(module)
-
- else:
- if module.params['provider'] != 'assertonly' and module.params['csr_path'] is None and module.params['csr_content'] is None:
- module.fail_json(msg='csr_path or csr_content is required when provider is not assertonly')
-
- base_dir = os.path.dirname(module.params['path']) or '.'
- if not os.path.isdir(base_dir):
- module.fail_json(
- name=base_dir,
- msg='The directory %s does not exist or the file is not a directory' % base_dir
- )
-
- provider = module.params['provider']
- if provider == 'assertonly':
- module.deprecate("The 'assertonly' provider is deprecated; please see the examples of "
- "the 'openssl_certificate' module on how to replace it with other modules",
- version='2.13', collection_name='ansible.builtin')
- elif provider == 'selfsigned':
- if module.params['privatekey_path'] is None and module.params['privatekey_content'] is None:
- module.fail_json(msg='One of privatekey_path and privatekey_content must be specified for the selfsigned provider.')
- elif provider == 'acme':
- if module.params['acme_accountkey_path'] is None:
- module.fail_json(msg='The acme_accountkey_path option must be specified for the acme provider.')
- if module.params['acme_challenge_path'] is None:
- module.fail_json(msg='The acme_challenge_path option must be specified for the acme provider.')
- elif provider == 'ownca':
- if module.params['ownca_path'] is None and module.params['ownca_content'] is None:
- module.fail_json(msg='One of ownca_path and ownca_content must be specified for the ownca provider.')
- if module.params['ownca_privatekey_path'] is None and module.params['ownca_privatekey_content'] is None:
- module.fail_json(msg='One of ownca_privatekey_path and ownca_privatekey_content must be specified for the ownca provider.')
-
- backend = module.params['select_crypto_backend']
- if backend == 'auto':
- # Detect what backend we can use
- can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
- can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION)
-
- # If cryptography is available we'll use it
- if can_use_cryptography:
- backend = 'cryptography'
- elif can_use_pyopenssl:
- backend = 'pyopenssl'
-
- if module.params['selfsigned_version'] == 2 or module.params['ownca_version'] == 2:
- module.warn('crypto backend forced to pyopenssl. The cryptography library does not support v2 certificates')
- backend = 'pyopenssl'
-
- # Fail if no backend has been found
- if backend == 'auto':
- module.fail_json(msg=("Can't detect any of the required Python libraries "
- "cryptography (>= {0}) or PyOpenSSL (>= {1})").format(
- MINIMAL_CRYPTOGRAPHY_VERSION,
- MINIMAL_PYOPENSSL_VERSION))
-
- if backend == 'pyopenssl':
- if not PYOPENSSL_FOUND:
- module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)),
- exception=PYOPENSSL_IMP_ERR)
- if module.params['provider'] in ['selfsigned', 'ownca', 'assertonly']:
- try:
- getattr(crypto.X509Req, 'get_extensions')
- except AttributeError:
- module.fail_json(msg='You need to have PyOpenSSL>=0.15')
-
- module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated',
- version='2.13', collection_name='ansible.builtin')
- if provider == 'selfsigned':
- certificate = SelfSignedCertificate(module)
- elif provider == 'acme':
- certificate = AcmeCertificate(module, 'pyopenssl')
- elif provider == 'ownca':
- certificate = OwnCACertificate(module)
- elif provider == 'entrust':
- certificate = EntrustCertificate(module, 'pyopenssl')
- else:
- certificate = AssertOnlyCertificate(module)
- elif backend == 'cryptography':
- if not CRYPTOGRAPHY_FOUND:
- module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)),
- exception=CRYPTOGRAPHY_IMP_ERR)
- if module.params['selfsigned_version'] == 2 or module.params['ownca_version'] == 2:
- module.fail_json(msg='The cryptography backend does not support v2 certificates, '
- 'use select_crypto_backend=pyopenssl for v2 certificates')
- if provider == 'selfsigned':
- certificate = SelfSignedCertificateCryptography(module)
- elif provider == 'acme':
- certificate = AcmeCertificate(module, 'cryptography')
- elif provider == 'ownca':
- certificate = OwnCACertificateCryptography(module)
- elif provider == 'entrust':
- certificate = EntrustCertificate(module, 'cryptography')
- else:
- certificate = AssertOnlyCertificateCryptography(module)
-
- if module.params['state'] == 'present':
- if module.check_mode:
- result = certificate.dump(check_mode=True)
- result['changed'] = module.params['force'] or not certificate.check(module)
- module.exit_json(**result)
-
- certificate.generate(module)
- else:
- if module.check_mode:
- result = certificate.dump(check_mode=True)
- result['changed'] = os.path.exists(module.params['path'])
- module.exit_json(**result)
-
- certificate.remove(module)
-
- result = certificate.dump()
- module.exit_json(**result)
- except crypto_utils.OpenSSLObjectError as exc:
- module.fail_json(msg=to_native(exc))
-
-
-if __name__ == "__main__":
- main()
diff --git a/test/support/integration/plugins/modules/openssl_certificate_info.py b/test/support/integration/plugins/modules/openssl_certificate_info.py
deleted file mode 100644
index 27e65153ea..0000000000
--- a/test/support/integration/plugins/modules/openssl_certificate_info.py
+++ /dev/null
@@ -1,864 +0,0 @@
-#!/usr/bin/python
-# -*- coding: utf-8 -*-
-
-# Copyright: (c) 2016-2017, Yanis Guenane <yanis+ansible@guenane.org>
-# Copyright: (c) 2017, Markus Teufelberger <mteufelberger+ansible@mgit.at>
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-from __future__ import absolute_import, division, print_function
-__metaclass__ = type
-
-ANSIBLE_METADATA = {'metadata_version': '1.1',
- 'status': ['preview'],
- 'supported_by': 'community'}
-
-DOCUMENTATION = r'''
----
-module: openssl_certificate_info
-version_added: '2.8'
-short_description: Provide information of OpenSSL X.509 certificates
-description:
- - This module allows one to query information on OpenSSL certificates.
- - It uses the pyOpenSSL or cryptography python library to interact with OpenSSL. If both the
- cryptography and PyOpenSSL libraries are available (and meet the minimum version requirements)
- cryptography will be preferred as a backend over PyOpenSSL (unless the backend is forced with
- C(select_crypto_backend)). Please note that the PyOpenSSL backend was deprecated in Ansible 2.9
- and will be removed in Ansible 2.13.
-requirements:
- - PyOpenSSL >= 0.15 or cryptography >= 1.6
-author:
- - Felix Fontein (@felixfontein)
- - Yanis Guenane (@Spredzy)
- - Markus Teufelberger (@MarkusTeufelberger)
-options:
- path:
- description:
- - Remote absolute path where the certificate file is loaded from.
- - Either I(path) or I(content) must be specified, but not both.
- type: path
- content:
- description:
- - Content of the X.509 certificate in PEM format.
- - Either I(path) or I(content) must be specified, but not both.
- type: str
- version_added: "2.10"
- valid_at:
- description:
- - A dict of names mapping to time specifications. Every time specified here
- will be checked whether the certificate is valid at this point. See the
- C(valid_at) return value for informations on the result.
- - Time can be specified either as relative time or as absolute timestamp.
- - Time will always be interpreted as UTC.
- - Valid format is C([+-]timespec | ASN.1 TIME) where timespec can be an integer
- + C([w | d | h | m | s]) (e.g. C(+32w1d2h), and ASN.1 TIME (i.e. pattern C(YYYYMMDDHHMMSSZ)).
- Note that all timestamps will be treated as being in UTC.
- type: dict
- select_crypto_backend:
- description:
- - Determines which crypto backend to use.
- - The default choice is C(auto), which tries to use C(cryptography) if available, and falls back to C(pyopenssl).
- - If set to C(pyopenssl), will try to use the L(pyOpenSSL,https://pypi.org/project/pyOpenSSL/) library.
- - If set to C(cryptography), will try to use the L(cryptography,https://cryptography.io/) library.
- - Please note that the C(pyopenssl) backend has been deprecated in Ansible 2.9, and will be removed in Ansible 2.13.
- From that point on, only the C(cryptography) backend will be available.
- type: str
- default: auto
- choices: [ auto, cryptography, pyopenssl ]
-
-notes:
- - All timestamp values are provided in ASN.1 TIME format, i.e. following the C(YYYYMMDDHHMMSSZ) pattern.
- They are all in UTC.
-seealso:
-- module: openssl_certificate
-'''
-
-EXAMPLES = r'''
-- name: Generate a Self Signed OpenSSL certificate
- openssl_certificate:
- path: /etc/ssl/crt/ansible.com.crt
- privatekey_path: /etc/ssl/private/ansible.com.pem
- csr_path: /etc/ssl/csr/ansible.com.csr
- provider: selfsigned
-
-
-# Get information on the certificate
-
-- name: Get information on generated certificate
- openssl_certificate_info:
- path: /etc/ssl/crt/ansible.com.crt
- register: result
-
-- name: Dump information
- debug:
- var: result
-
-
-# Check whether the certificate is valid or not valid at certain times, fail
-# if this is not the case. The first task (openssl_certificate_info) collects
-# the information, and the second task (assert) validates the result and
-# makes the playbook fail in case something is not as expected.
-
-- name: Test whether that certificate is valid tomorrow and/or in three weeks
- openssl_certificate_info:
- path: /etc/ssl/crt/ansible.com.crt
- valid_at:
- point_1: "+1d"
- point_2: "+3w"
- register: result
-
-- name: Validate that certificate is valid tomorrow, but not in three weeks
- assert:
- that:
- - result.valid_at.point_1 # valid in one day
- - not result.valid_at.point_2 # not valid in three weeks
-'''
-
-RETURN = r'''
-expired:
- description: Whether the certificate is expired (i.e. C(notAfter) is in the past)
- returned: success
- type: bool
-basic_constraints:
- description: Entries in the C(basic_constraints) extension, or C(none) if extension is not present.
- returned: success
- type: list
- elements: str
- sample: "[CA:TRUE, pathlen:1]"
-basic_constraints_critical:
- description: Whether the C(basic_constraints) extension is critical.
- returned: success
- type: bool
-extended_key_usage:
- description: Entries in the C(extended_key_usage) extension, or C(none) if extension is not present.
- returned: success
- type: list
- elements: str
- sample: "[Biometric Info, DVCS, Time Stamping]"
-extended_key_usage_critical:
- description: Whether the C(extended_key_usage) extension is critical.
- returned: success
- type: bool
-extensions_by_oid:
- description: Returns a dictionary for every extension OID
- returned: success
- type: dict
- contains:
- critical:
- description: Whether the extension is critical.
- returned: success
- type: bool
- value:
- description: The Base64 encoded value (in DER format) of the extension
- returned: success
- type: str
- sample: "MAMCAQU="
- sample: '{"1.3.6.1.5.5.7.1.24": { "critical": false, "value": "MAMCAQU="}}'
-key_usage:
- description: Entries in the C(key_usage) extension, or C(none) if extension is not present.
- returned: success
- type: str
- sample: "[Key Agreement, Data Encipherment]"
-key_usage_critical:
- description: Whether the C(key_usage) extension is critical.
- returned: success
- type: bool
-subject_alt_name:
- description: Entries in the C(subject_alt_name) extension, or C(none) if extension is not present.
- returned: success
- type: list
- elements: str
- sample: "[DNS:www.ansible.com, IP:1.2.3.4]"
-subject_alt_name_critical:
- description: Whether the C(subject_alt_name) extension is critical.
- returned: success
- type: bool
-ocsp_must_staple:
- description: C(yes) if the OCSP Must Staple extension is present, C(none) otherwise.
- returned: success
- type: bool
-ocsp_must_staple_critical:
- description: Whether the C(ocsp_must_staple) extension is critical.
- returned: success
- type: bool
-issuer:
- description:
- - The certificate's issuer.
- - Note that for repeated values, only the last one will be returned.
- returned: success
- type: dict
- sample: '{"organizationName": "Ansible", "commonName": "ca.example.com"}'
-issuer_ordered:
- description: The certificate's issuer as an ordered list of tuples.
- returned: success
- type: list
- elements: list
- sample: '[["organizationName", "Ansible"], ["commonName": "ca.example.com"]]'
- version_added: "2.9"
-subject:
- description:
- - The certificate's subject as a dictionary.
- - Note that for repeated values, only the last one will be returned.
- returned: success
- type: dict
- sample: '{"commonName": "www.example.com", "emailAddress": "test@example.com"}'
-subject_ordered:
- description: The certificate's subject as an ordered list of tuples.
- returned: success
- type: list
- elements: list
- sample: '[["commonName", "www.example.com"], ["emailAddress": "test@example.com"]]'
- version_added: "2.9"
-not_after:
- description: C(notAfter) date as ASN.1 TIME
- returned: success
- type: str
- sample: 20190413202428Z
-not_before:
- description: C(notBefore) date as ASN.1 TIME
- returned: success
- type: str
- sample: 20190331202428Z
-public_key:
- description: Certificate's public key in PEM format
- returned: success
- type: str
- sample: "-----BEGIN PUBLIC KEY-----\nMIICIjANBgkqhkiG9w0BAQEFAAOCAg8A..."
-public_key_fingerprints:
- description:
- - Fingerprints of certificate's public key.
- - For every hash algorithm available, the fingerprint is computed.
- returned: success
- type: dict
- sample: "{'sha256': 'd4:b3:aa:6d:c8:04:ce:4e:ba:f6:29:4d:92:a3:94:b0:c2:ff:bd:bf:33:63:11:43:34:0f:51:b0:95:09:2f:63',
- 'sha512': 'f7:07:4a:f0:b0:f0:e6:8b:95:5f:f9:e6:61:0a:32:68:f1..."
-signature_algorithm:
- description: The signature algorithm used to sign the certificate.
- returned: success
- type: str
- sample: sha256WithRSAEncryption
-serial_number:
- description: The certificate's serial number.
- returned: success
- type: int
- sample: 1234
-version:
- description: The certificate version.
- returned: success
- type: int
- sample: 3
-valid_at:
- description: For every time stamp provided in the I(valid_at) option, a
- boolean whether the certificate is valid at that point in time
- or not.
- returned: success
- type: dict
-subject_key_identifier:
- description:
- - The certificate's subject key identifier.
- - The identifier is returned in hexadecimal, with C(:) used to separate bytes.
- - Is C(none) if the C(SubjectKeyIdentifier) extension is not present.
- returned: success and if the pyOpenSSL backend is I(not) used
- type: str
- sample: '00:11:22:33:44:55:66:77:88:99:aa:bb:cc:dd:ee:ff:00:11:22:33'
- version_added: "2.9"
-authority_key_identifier:
- description:
- - The certificate's authority key identifier.
- - The identifier is returned in hexadecimal, with C(:) used to separate bytes.
- - Is C(none) if the C(AuthorityKeyIdentifier) extension is not present.
- returned: success and if the pyOpenSSL backend is I(not) used
- type: str
- sample: '00:11:22:33:44:55:66:77:88:99:aa:bb:cc:dd:ee:ff:00:11:22:33'
- version_added: "2.9"
-authority_cert_issuer:
- description:
- - The certificate's authority cert issuer as a list of general names.
- - Is C(none) if the C(AuthorityKeyIdentifier) extension is not present.
- returned: success and if the pyOpenSSL backend is I(not) used
- type: list
- elements: str
- sample: "[DNS:www.ansible.com, IP:1.2.3.4]"
- version_added: "2.9"
-authority_cert_serial_number:
- description:
- - The certificate's authority cert serial number.
- - Is C(none) if the C(AuthorityKeyIdentifier) extension is not present.
- returned: success and if the pyOpenSSL backend is I(not) used
- type: int
- sample: '12345'
- version_added: "2.9"
-ocsp_uri:
- description: The OCSP responder URI, if included in the certificate. Will be
- C(none) if no OCSP responder URI is included.
- returned: success
- type: str
- version_added: "2.9"
-'''
-
-
-import abc
-import binascii
-import datetime
-import os
-import re
-import traceback
-from distutils.version import LooseVersion
-
-from ansible.module_utils import crypto as crypto_utils
-from ansible.module_utils.basic import AnsibleModule, missing_required_lib
-from ansible.module_utils.six import string_types
-from ansible.module_utils._text import to_native, to_text, to_bytes
-from ansible.module_utils.compat import ipaddress as compat_ipaddress
-
-MINIMAL_CRYPTOGRAPHY_VERSION = '1.6'
-MINIMAL_PYOPENSSL_VERSION = '0.15'
-
-PYOPENSSL_IMP_ERR = None
-try:
- import OpenSSL
- from OpenSSL import crypto
- PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__)
- if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000:
- # OpenSSL 1.1.0 or newer
- OPENSSL_MUST_STAPLE_NAME = b"tlsfeature"
- OPENSSL_MUST_STAPLE_VALUE = b"status_request"
- else:
- # OpenSSL 1.0.x or older
- OPENSSL_MUST_STAPLE_NAME = b"1.3.6.1.5.5.7.1.24"
- OPENSSL_MUST_STAPLE_VALUE = b"DER:30:03:02:01:05"
-except ImportError:
- PYOPENSSL_IMP_ERR = traceback.format_exc()
- PYOPENSSL_FOUND = False
-else:
- PYOPENSSL_FOUND = True
-
-CRYPTOGRAPHY_IMP_ERR = None
-try:
- import cryptography
- from cryptography import x509
- from cryptography.hazmat.primitives import serialization
- CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
-except ImportError:
- CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
- CRYPTOGRAPHY_FOUND = False
-else:
- CRYPTOGRAPHY_FOUND = True
-
-
-TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ"
-
-
-class CertificateInfo(crypto_utils.OpenSSLObject):
- def __init__(self, module, backend):
- super(CertificateInfo, self).__init__(
- module.params['path'] or '',
- 'present',
- False,
- module.check_mode,
- )
- self.backend = backend
- self.module = module
- self.content = module.params['content']
- if self.content is not None:
- self.content = self.content.encode('utf-8')
-
- self.valid_at = module.params['valid_at']
- if self.valid_at:
- for k, v in self.valid_at.items():
- if not isinstance(v, string_types):
- self.module.fail_json(
- msg='The value for valid_at.{0} must be of type string (got {1})'.format(k, type(v))
- )
- self.valid_at[k] = crypto_utils.get_relative_time_option(v, 'valid_at.{0}'.format(k))
-
- def generate(self):
- # Empty method because crypto_utils.OpenSSLObject wants this
- pass
-
- def dump(self):
- # Empty method because crypto_utils.OpenSSLObject wants this
- pass
-
- @abc.abstractmethod
- def _get_signature_algorithm(self):
- pass
-
- @abc.abstractmethod
- def _get_subject_ordered(self):
- pass
-
- @abc.abstractmethod
- def _get_issuer_ordered(self):
- pass
-
- @abc.abstractmethod
- def _get_version(self):
- pass
-
- @abc.abstractmethod
- def _get_key_usage(self):
- pass
-
- @abc.abstractmethod
- def _get_extended_key_usage(self):
- pass
-
- @abc.abstractmethod
- def _get_basic_constraints(self):
- pass
-
- @abc.abstractmethod
- def _get_ocsp_must_staple(self):
- pass
-
- @abc.abstractmethod
- def _get_subject_alt_name(self):
- pass
-
- @abc.abstractmethod
- def _get_not_before(self):
- pass
-
- @abc.abstractmethod
- def _get_not_after(self):
- pass
-
- @abc.abstractmethod
- def _get_public_key(self, binary):
- pass
-
- @abc.abstractmethod
- def _get_subject_key_identifier(self):
- pass
-
- @abc.abstractmethod
- def _get_authority_key_identifier(self):
- pass
-
- @abc.abstractmethod
- def _get_serial_number(self):
- pass
-
- @abc.abstractmethod
- def _get_all_extensions(self):
- pass
-
- @abc.abstractmethod
- def _get_ocsp_uri(self):
- pass
-
- def get_info(self):
- result = dict()
- self.cert = crypto_utils.load_certificate(self.path, content=self.content, backend=self.backend)
-
- result['signature_algorithm'] = self._get_signature_algorithm()
- subject = self._get_subject_ordered()
- issuer = self._get_issuer_ordered()
- result['subject'] = dict()
- for k, v in subject:
- result['subject'][k] = v
- result['subject_ordered'] = subject
- result['issuer'] = dict()
- for k, v in issuer:
- result['issuer'][k] = v
- result['issuer_ordered'] = issuer
- result['version'] = self._get_version()
- result['key_usage'], result['key_usage_critical'] = self._get_key_usage()
- result['extended_key_usage'], result['extended_key_usage_critical'] = self._get_extended_key_usage()
- result['basic_constraints'], result['basic_constraints_critical'] = self._get_basic_constraints()
- result['ocsp_must_staple'], result['ocsp_must_staple_critical'] = self._get_ocsp_must_staple()
- result['subject_alt_name'], result['subject_alt_name_critical'] = self._get_subject_alt_name()
-
- not_before = self._get_not_before()
- not_after = self._get_not_after()
- result['not_before'] = not_before.strftime(TIMESTAMP_FORMAT)
- result['not_after'] = not_after.strftime(TIMESTAMP_FORMAT)
- result['expired'] = not_after < datetime.datetime.utcnow()
-
- result['valid_at'] = dict()
- if self.valid_at:
- for k, v in self.valid_at.items():
- result['valid_at'][k] = not_before <= v <= not_after
-
- result['public_key'] = self._get_public_key(binary=False)
- pk = self._get_public_key(binary=True)
- result['public_key_fingerprints'] = crypto_utils.get_fingerprint_of_bytes(pk) if pk is not None else dict()
-
- if self.backend != 'pyopenssl':
- ski = self._get_subject_key_identifier()
- if ski is not None:
- ski = to_native(binascii.hexlify(ski))
- ski = ':'.join([ski[i:i + 2] for i in range(0, len(ski), 2)])
- result['subject_key_identifier'] = ski
-
- aki, aci, acsn = self._get_authority_key_identifier()
- if aki is not None:
- aki = to_native(binascii.hexlify(aki))
- aki = ':'.join([aki[i:i + 2] for i in range(0, len(aki), 2)])
- result['authority_key_identifier'] = aki
- result['authority_cert_issuer'] = aci
- result['authority_cert_serial_number'] = acsn
-
- result['serial_number'] = self._get_serial_number()
- result['extensions_by_oid'] = self._get_all_extensions()
- result['ocsp_uri'] = self._get_ocsp_uri()
-
- return result
-
-
-class CertificateInfoCryptography(CertificateInfo):
- """Validate the supplied cert, using the cryptography backend"""
- def __init__(self, module):
- super(CertificateInfoCryptography, self).__init__(module, 'cryptography')
-
- def _get_signature_algorithm(self):
- return crypto_utils.cryptography_oid_to_name(self.cert.signature_algorithm_oid)
-
- def _get_subject_ordered(self):
- result = []
- for attribute in self.cert.subject:
- result.append([crypto_utils.cryptography_oid_to_name(attribute.oid), attribute.value])
- return result
-
- def _get_issuer_ordered(self):
- result = []
- for attribute in self.cert.issuer:
- result.append([crypto_utils.cryptography_oid_to_name(attribute.oid), attribute.value])
- return result
-
- def _get_version(self):
- if self.cert.version == x509.Version.v1:
- return 1
- if self.cert.version == x509.Version.v3:
- return 3
- return "unknown"
-
- def _get_key_usage(self):
- try:
- current_key_ext = self.cert.extensions.get_extension_for_class(x509.KeyUsage)
- current_key_usage = current_key_ext.value
- key_usage = dict(
- digital_signature=current_key_usage.digital_signature,
- content_commitment=current_key_usage.content_commitment,
- key_encipherment=current_key_usage.key_encipherment,
- data_encipherment=current_key_usage.data_encipherment,
- key_agreement=current_key_usage.key_agreement,
- key_cert_sign=current_key_usage.key_cert_sign,
- crl_sign=current_key_usage.crl_sign,
- encipher_only=False,
- decipher_only=False,
- )
- if key_usage['key_agreement']:
- key_usage.update(dict(
- encipher_only=current_key_usage.encipher_only,
- decipher_only=current_key_usage.decipher_only
- ))
-
- key_usage_names = dict(
- digital_signature='Digital Signature',
- content_commitment='Non Repudiation',
- key_encipherment='Key Encipherment',
- data_encipherment='Data Encipherment',
- key_agreement='Key Agreement',
- key_cert_sign='Certificate Sign',
- crl_sign='CRL Sign',
- encipher_only='Encipher Only',
- decipher_only='Decipher Only',
- )
- return sorted([
- key_usage_names[name] for name, value in key_usage.items() if value
- ]), current_key_ext.critical
- except cryptography.x509.ExtensionNotFound:
- return None, False
-
- def _get_extended_key_usage(self):
- try:
- ext_keyusage_ext = self.cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage)
- return sorted([
- crypto_utils.cryptography_oid_to_name(eku) for eku in ext_keyusage_ext.value
- ]), ext_keyusage_ext.critical
- except cryptography.x509.ExtensionNotFound:
- return None, False
-
- def _get_basic_constraints(self):
- try:
- ext_keyusage_ext = self.cert.extensions.get_extension_for_class(x509.BasicConstraints)
- result = []
- result.append('CA:{0}'.format('TRUE' if ext_keyusage_ext.value.ca else 'FALSE'))
- if ext_keyusage_ext.value.path_length is not None:
- result.append('pathlen:{0}'.format(ext_keyusage_ext.value.path_length))
- return sorted(result), ext_keyusage_ext.critical
- except cryptography.x509.ExtensionNotFound:
- return None, False
-
- def _get_ocsp_must_staple(self):
- try:
- try:
- # This only works with cryptography >= 2.1
- tlsfeature_ext = self.cert.extensions.get_extension_for_class(x509.TLSFeature)
- value = cryptography.x509.TLSFeatureType.status_request in tlsfeature_ext.value
- except AttributeError as dummy:
- # Fallback for cryptography < 2.1
- oid = x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.24")
- tlsfeature_ext = self.cert.extensions.get_extension_for_oid(oid)
- value = tlsfeature_ext.value.value == b"\x30\x03\x02\x01\x05"
- return value, tlsfeature_ext.critical
- except cryptography.x509.ExtensionNotFound:
- return None, False
-
- def _get_subject_alt_name(self):
- try:
- san_ext = self.cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
- result = [crypto_utils.cryptography_decode_name(san) for san in san_ext.value]
- return result, san_ext.critical
- except cryptography.x509.ExtensionNotFound:
- return None, False
-
- def _get_not_before(self):
- return self.cert.not_valid_before
-
- def _get_not_after(self):
- return self.cert.not_valid_after
-
- def _get_public_key(self, binary):
- return self.cert.public_key().public_bytes(
- serialization.Encoding.DER if binary else serialization.Encoding.PEM,
- serialization.PublicFormat.SubjectPublicKeyInfo
- )
-
- def _get_subject_key_identifier(self):
- try:
- ext = self.cert.extensions.get_extension_for_class(x509.SubjectKeyIdentifier)
- return ext.value.digest
- except cryptography.x509.ExtensionNotFound:
- return None
-
- def _get_authority_key_identifier(self):
- try:
- ext = self.cert.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier)
- issuer = None
- if ext.value.authority_cert_issuer is not None:
- issuer = [crypto_utils.cryptography_decode_name(san) for san in ext.value.authority_cert_issuer]
- return ext.value.key_identifier, issuer, ext.value.authority_cert_serial_number
- except cryptography.x509.ExtensionNotFound:
- return None, None, None
-
- def _get_serial_number(self):
- return self.cert.serial_number
-
- def _get_all_extensions(self):
- return crypto_utils.cryptography_get_extensions_from_cert(self.cert)
-
- def _get_ocsp_uri(self):
- try:
- ext = self.cert.extensions.get_extension_for_class(x509.AuthorityInformationAccess)
- for desc in ext.value:
- if desc.access_method == x509.oid.AuthorityInformationAccessOID.OCSP:
- if isinstance(desc.access_location, x509.UniformResourceIdentifier):
- return desc.access_location.value
- except x509.ExtensionNotFound as dummy:
- pass
- return None
-
-
-class CertificateInfoPyOpenSSL(CertificateInfo):
- """validate the supplied certificate."""
-
- def __init__(self, module):
- super(CertificateInfoPyOpenSSL, self).__init__(module, 'pyopenssl')
-
- def _get_signature_algorithm(self):
- return to_text(self.cert.get_signature_algorithm())
-
- def __get_name(self, name):
- result = []
- for sub in name.get_components():
- result.append([crypto_utils.pyopenssl_normalize_name(sub[0]), to_text(sub[1])])
- return result
-
- def _get_subject_ordered(self):
- return self.__get_name(self.cert.get_subject())
-
- def _get_issuer_ordered(self):
- return self.__get_name(self.cert.get_issuer())
-
- def _get_version(self):
- # Version numbers in certs are off by one:
- # v1: 0, v2: 1, v3: 2 ...
- return self.cert.get_version() + 1
-
- def _get_extension(self, short_name):
- for extension_idx in range(0, self.cert.get_extension_count()):
- extension = self.cert.get_extension(extension_idx)
- if extension.get_short_name() == short_name:
- result = [
- crypto_utils.pyopenssl_normalize_name(usage.strip()) for usage in to_text(extension, errors='surrogate_or_strict').split(',')
- ]
- return sorted(result), bool(extension.get_critical())
- return None, False
-
- def _get_key_usage(self):
- return self._get_extension(b'keyUsage')
-
- def _get_extended_key_usage(self):
- return self._get_extension(b'extendedKeyUsage')
-
- def _get_basic_constraints(self):
- return self._get_extension(b'basicConstraints')
-
- def _get_ocsp_must_staple(self):
- extensions = [self.cert.get_extension(i) for i in range(0, self.cert.get_extension_count())]
- oms_ext = [
- ext for ext in extensions
- if to_bytes(ext.get_short_name()) == OPENSSL_MUST_STAPLE_NAME and to_bytes(ext) == OPENSSL_MUST_STAPLE_VALUE
- ]
- if OpenSSL.SSL.OPENSSL_VERSION_NUMBER < 0x10100000:
- # Older versions of libssl don't know about OCSP Must Staple
- oms_ext.extend([ext for ext in extensions if ext.get_short_name() == b'UNDEF' and ext.get_data() == b'\x30\x03\x02\x01\x05'])
- if oms_ext:
- return True, bool(oms_ext[0].get_critical())
- else:
- return None, False
-
- def _normalize_san(self, san):
- if san.startswith('IP Address:'):
- san = 'IP:' + san[len('IP Address:'):]
- if san.startswith('IP:'):
- ip = compat_ipaddress.ip_address(san[3:])
- san = 'IP:{0}'.format(ip.compressed)
- return san
-
- def _get_subject_alt_name(self):
- for extension_idx in range(0, self.cert.get_extension_count()):
- extension = self.cert.get_extension(extension_idx)
- if extension.get_short_name() == b'subjectAltName':
- result = [self._normalize_san(altname.strip()) for altname in
- to_text(extension, errors='surrogate_or_strict').split(', ')]
- return result, bool(extension.get_critical())
- return None, False
-
- def _get_not_before(self):
- time_string = to_native(self.cert.get_notBefore())
- return datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ")
-
- def _get_not_after(self):
- time_string = to_native(self.cert.get_notAfter())
- return datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ")
-
- def _get_public_key(self, binary):
- try:
- return crypto.dump_publickey(
- crypto.FILETYPE_ASN1 if binary else crypto.FILETYPE_PEM,
- self.cert.get_pubkey()
- )
- except AttributeError:
- try:
- # pyOpenSSL < 16.0:
- bio = crypto._new_mem_buf()
- if binary:
- rc = crypto._lib.i2d_PUBKEY_bio(bio, self.cert.get_pubkey()._pkey)
- else:
- rc = crypto._lib.PEM_write_bio_PUBKEY(bio, self.cert.get_pubkey()._pkey)
- if rc != 1:
- crypto._raise_current_error()
- return crypto._bio_to_string(bio)
- except AttributeError:
- self.module.warn('Your pyOpenSSL version does not support dumping public keys. '
- 'Please upgrade to version 16.0 or newer, or use the cryptography backend.')
-
- def _get_subject_key_identifier(self):
- # Won't be implemented
- return None
-
- def _get_authority_key_identifier(self):
- # Won't be implemented
- return None, None, None
-
- def _get_serial_number(self):
- return self.cert.get_serial_number()
-
- def _get_all_extensions(self):
- return crypto_utils.pyopenssl_get_extensions_from_cert(self.cert)
-
- def _get_ocsp_uri(self):
- for i in range(self.cert.get_extension_count()):
- ext = self.cert.get_extension(i)
- if ext.get_short_name() == b'authorityInfoAccess':
- v = str(ext)
- m = re.search('^OCSP - URI:(.*)$', v, flags=re.MULTILINE)
- if m:
- return m.group(1)
- return None
-
-
-def main():
- module = AnsibleModule(
- argument_spec=dict(
- path=dict(type='path'),
- content=dict(type='str'),
- valid_at=dict(type='dict'),
- select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']),
- ),
- required_one_of=(
- ['path', 'content'],
- ),
- mutually_exclusive=(
- ['path', 'content'],
- ),
- supports_check_mode=True,
- )
-
- try:
- if module.params['path'] is not None:
- base_dir = os.path.dirname(module.params['path']) or '.'
- if not os.path.isdir(base_dir):
- module.fail_json(
- name=base_dir,
- msg='The directory %s does not exist or the file is not a directory' % base_dir
- )
-
- backend = module.params['select_crypto_backend']
- if backend == 'auto':
- # Detect what backend we can use
- can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
- can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION)
-
- # If cryptography is available we'll use it
- if can_use_cryptography:
- backend = 'cryptography'
- elif can_use_pyopenssl:
- backend = 'pyopenssl'
-
- # Fail if no backend has been found
- if backend == 'auto':
- module.fail_json(msg=("Can't detect any of the required Python libraries "
- "cryptography (>= {0}) or PyOpenSSL (>= {1})").format(
- MINIMAL_CRYPTOGRAPHY_VERSION,
- MINIMAL_PYOPENSSL_VERSION))
-
- if backend == 'pyopenssl':
- if not PYOPENSSL_FOUND:
- module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)),
- exception=PYOPENSSL_IMP_ERR)
- try:
- getattr(crypto.X509Req, 'get_extensions')
- except AttributeError:
- module.fail_json(msg='You need to have PyOpenSSL>=0.15')
-
- module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated',
- version='2.13', collection_name='ansible.builtin')
- certificate = CertificateInfoPyOpenSSL(module)
- elif backend == 'cryptography':
- if not CRYPTOGRAPHY_FOUND:
- module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)),
- exception=CRYPTOGRAPHY_IMP_ERR)
- certificate = CertificateInfoCryptography(module)
-
- result = certificate.get_info()
- module.exit_json(**result)
- except crypto_utils.OpenSSLObjectError as exc:
- module.fail_json(msg=to_native(exc))
-
-
-if __name__ == "__main__":
- main()
diff --git a/test/support/integration/plugins/modules/openssl_csr.py b/test/support/integration/plugins/modules/openssl_csr.py
deleted file mode 100644
index 2d831f35bf..0000000000
--- a/test/support/integration/plugins/modules/openssl_csr.py
+++ /dev/null
@@ -1,1161 +0,0 @@
-#!/usr/bin/python
-# -*- coding: utf-8 -*-
-
-# Copyrigt: (c) 2017, Yanis Guenane <yanis+ansible@guenane.org>
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-from __future__ import absolute_import, division, print_function
-__metaclass__ = type
-
-ANSIBLE_METADATA = {'metadata_version': '1.1',
- 'status': ['preview'],
- 'supported_by': 'community'}
-
-DOCUMENTATION = r'''
----
-module: openssl_csr
-version_added: '2.4'
-short_description: Generate OpenSSL Certificate Signing Request (CSR)
-description:
- - This module allows one to (re)generate OpenSSL certificate signing requests.
- - It uses the pyOpenSSL python library to interact with openssl. This module supports
- the subjectAltName, keyUsage, extendedKeyUsage, basicConstraints and OCSP Must Staple
- extensions.
- - "Please note that the module regenerates existing CSR if it doesn't match the module's
- options, or if it seems to be corrupt. If you are concerned that this could overwrite
- your existing CSR, consider using the I(backup) option."
- - The module can use the cryptography Python library, or the pyOpenSSL Python
- library. By default, it tries to detect which one is available. This can be
- overridden with the I(select_crypto_backend) option. Please note that the
- PyOpenSSL backend was deprecated in Ansible 2.9 and will be removed in Ansible 2.13."
-requirements:
- - Either cryptography >= 1.3
- - Or pyOpenSSL >= 0.15
-author:
-- Yanis Guenane (@Spredzy)
-options:
- state:
- description:
- - Whether the certificate signing request should exist or not, taking action if the state is different from what is stated.
- type: str
- default: present
- choices: [ absent, present ]
- digest:
- description:
- - The digest used when signing the certificate signing request with the private key.
- type: str
- default: sha256
- privatekey_path:
- description:
- - The path to the private key to use when signing the certificate signing request.
- - Either I(privatekey_path) or I(privatekey_content) must be specified if I(state) is C(present), but not both.
- type: path
- privatekey_content:
- description:
- - The content of the private key to use when signing the certificate signing request.
- - Either I(privatekey_path) or I(privatekey_content) must be specified if I(state) is C(present), but not both.
- type: str
- version_added: "2.10"
- privatekey_passphrase:
- description:
- - The passphrase for the private key.
- - This is required if the private key is password protected.
- type: str
- version:
- description:
- - The version of the certificate signing request.
- - "The only allowed value according to L(RFC 2986,https://tools.ietf.org/html/rfc2986#section-4.1)
- is 1."
- - This option will no longer accept unsupported values from Ansible 2.14 on.
- type: int
- default: 1
- force:
- description:
- - Should the certificate signing request be forced regenerated by this ansible module.
- type: bool
- default: no
- path:
- description:
- - The name of the file into which the generated OpenSSL certificate signing request will be written.
- type: path
- required: true
- subject:
- description:
- - Key/value pairs that will be present in the subject name field of the certificate signing request.
- - If you need to specify more than one value with the same key, use a list as value.
- type: dict
- version_added: '2.5'
- country_name:
- description:
- - The countryName field of the certificate signing request subject.
- type: str
- aliases: [ C, countryName ]
- state_or_province_name:
- description:
- - The stateOrProvinceName field of the certificate signing request subject.
- type: str
- aliases: [ ST, stateOrProvinceName ]
- locality_name:
- description:
- - The localityName field of the certificate signing request subject.
- type: str
- aliases: [ L, localityName ]
- organization_name:
- description:
- - The organizationName field of the certificate signing request subject.
- type: str
- aliases: [ O, organizationName ]
- organizational_unit_name:
- description:
- - The organizationalUnitName field of the certificate signing request subject.
- type: str
- aliases: [ OU, organizationalUnitName ]
- common_name:
- description:
- - The commonName field of the certificate signing request subject.
- type: str
- aliases: [ CN, commonName ]
- email_address:
- description:
- - The emailAddress field of the certificate signing request subject.
- type: str
- aliases: [ E, emailAddress ]
- subject_alt_name:
- description:
- - SAN extension to attach to the certificate signing request.
- - This can either be a 'comma separated string' or a YAML list.
- - Values must be prefixed by their options. (i.e., C(email), C(URI), C(DNS), C(RID), C(IP), C(dirName),
- C(otherName) and the ones specific to your CA)
- - Note that if no SAN is specified, but a common name, the common
- name will be added as a SAN except if C(useCommonNameForSAN) is
- set to I(false).
- - More at U(https://tools.ietf.org/html/rfc5280#section-4.2.1.6).
- type: list
- elements: str
- aliases: [ subjectAltName ]
- subject_alt_name_critical:
- description:
- - Should the subjectAltName extension be considered as critical.
- type: bool
- aliases: [ subjectAltName_critical ]
- use_common_name_for_san:
- description:
- - If set to C(yes), the module will fill the common name in for
- C(subject_alt_name) with C(DNS:) prefix if no SAN is specified.
- type: bool
- default: yes
- version_added: '2.8'
- aliases: [ useCommonNameForSAN ]
- key_usage:
- description:
- - This defines the purpose (e.g. encipherment, signature, certificate signing)
- of the key contained in the certificate.
- type: list
- elements: str
- aliases: [ keyUsage ]
- key_usage_critical:
- description:
- - Should the keyUsage extension be considered as critical.
- type: bool
- aliases: [ keyUsage_critical ]
- extended_key_usage:
- description:
- - Additional restrictions (e.g. client authentication, server authentication)
- on the allowed purposes for which the public key may be used.
- type: list
- elements: str
- aliases: [ extKeyUsage, extendedKeyUsage ]
- extended_key_usage_critical:
- description:
- - Should the extkeyUsage extension be considered as critical.
- type: bool
- aliases: [ extKeyUsage_critical, extendedKeyUsage_critical ]
- basic_constraints:
- description:
- - Indicates basic constraints, such as if the certificate is a CA.
- type: list
- elements: str
- version_added: '2.5'
- aliases: [ basicConstraints ]
- basic_constraints_critical:
- description:
- - Should the basicConstraints extension be considered as critical.
- type: bool
- version_added: '2.5'
- aliases: [ basicConstraints_critical ]
- ocsp_must_staple:
- description:
- - Indicates that the certificate should contain the OCSP Must Staple
- extension (U(https://tools.ietf.org/html/rfc7633)).
- type: bool
- version_added: '2.5'
- aliases: [ ocspMustStaple ]
- ocsp_must_staple_critical:
- description:
- - Should the OCSP Must Staple extension be considered as critical
- - Note that according to the RFC, this extension should not be marked
- as critical, as old clients not knowing about OCSP Must Staple
- are required to reject such certificates
- (see U(https://tools.ietf.org/html/rfc7633#section-4)).
- type: bool
- version_added: '2.5'
- aliases: [ ocspMustStaple_critical ]
- select_crypto_backend:
- description:
- - Determines which crypto backend to use.
- - The default choice is C(auto), which tries to use C(cryptography) if available, and falls back to C(pyopenssl).
- - If set to C(pyopenssl), will try to use the L(pyOpenSSL,https://pypi.org/project/pyOpenSSL/) library.
- - If set to C(cryptography), will try to use the L(cryptography,https://cryptography.io/) library.
- - Please note that the C(pyopenssl) backend has been deprecated in Ansible 2.9, and will be removed in Ansible 2.13.
- From that point on, only the C(cryptography) backend will be available.
- type: str
- default: auto
- choices: [ auto, cryptography, pyopenssl ]
- version_added: '2.8'
- backup:
- description:
- - Create a backup file including a timestamp so you can get the original
- CSR back if you overwrote it with a new one by accident.
- type: bool
- default: no
- version_added: "2.8"
- create_subject_key_identifier:
- description:
- - Create the Subject Key Identifier from the public key.
- - "Please note that commercial CAs can ignore the value, respectively use a value of
- their own choice instead. Specifying this option is mostly useful for self-signed
- certificates or for own CAs."
- - Note that this is only supported if the C(cryptography) backend is used!
- type: bool
- default: no
- version_added: "2.9"
- subject_key_identifier:
- description:
- - The subject key identifier as a hex string, where two bytes are separated by colons.
- - "Example: C(00:11:22:33:44:55:66:77:88:99:aa:bb:cc:dd:ee:ff:00:11:22:33)"
- - "Please note that commercial CAs ignore this value, respectively use a value of their
- own choice. Specifying this option is mostly useful for self-signed certificates
- or for own CAs."
- - Note that this option can only be used if I(create_subject_key_identifier) is C(no).
- - Note that this is only supported if the C(cryptography) backend is used!
- type: str
- version_added: "2.9"
- authority_key_identifier:
- description:
- - The authority key identifier as a hex string, where two bytes are separated by colons.
- - "Example: C(00:11:22:33:44:55:66:77:88:99:aa:bb:cc:dd:ee:ff:00:11:22:33)"
- - If specified, I(authority_cert_issuer) must also be specified.
- - "Please note that commercial CAs ignore this value, respectively use a value of their
- own choice. Specifying this option is mostly useful for self-signed certificates
- or for own CAs."
- - Note that this is only supported if the C(cryptography) backend is used!
- - The C(AuthorityKeyIdentifier) will only be added if at least one of I(authority_key_identifier),
- I(authority_cert_issuer) and I(authority_cert_serial_number) is specified.
- type: str
- version_added: "2.9"
- authority_cert_issuer:
- description:
- - Names that will be present in the authority cert issuer field of the certificate signing request.
- - Values must be prefixed by their options. (i.e., C(email), C(URI), C(DNS), C(RID), C(IP), C(dirName),
- C(otherName) and the ones specific to your CA)
- - "Example: C(DNS:ca.example.org)"
- - If specified, I(authority_key_identifier) must also be specified.
- - "Please note that commercial CAs ignore this value, respectively use a value of their
- own choice. Specifying this option is mostly useful for self-signed certificates
- or for own CAs."
- - Note that this is only supported if the C(cryptography) backend is used!
- - The C(AuthorityKeyIdentifier) will only be added if at least one of I(authority_key_identifier),
- I(authority_cert_issuer) and I(authority_cert_serial_number) is specified.
- type: list
- elements: str
- version_added: "2.9"
- authority_cert_serial_number:
- description:
- - The authority cert serial number.
- - Note that this is only supported if the C(cryptography) backend is used!
- - "Please note that commercial CAs ignore this value, respectively use a value of their
- own choice. Specifying this option is mostly useful for self-signed certificates
- or for own CAs."
- - The C(AuthorityKeyIdentifier) will only be added if at least one of I(authority_key_identifier),
- I(authority_cert_issuer) and I(authority_cert_serial_number) is specified.
- type: int
- version_added: "2.9"
- return_content:
- description:
- - If set to C(yes), will return the (current or generated) CSR's content as I(csr).
- type: bool
- default: no
- version_added: "2.10"
-extends_documentation_fragment:
-- files
-notes:
- - If the certificate signing request already exists it will be checked whether subjectAltName,
- keyUsage, extendedKeyUsage and basicConstraints only contain the requested values, whether
- OCSP Must Staple is as requested, and if the request was signed by the given private key.
-seealso:
-- module: openssl_certificate
-- module: openssl_dhparam
-- module: openssl_pkcs12
-- module: openssl_privatekey
-- module: openssl_publickey
-'''
-
-EXAMPLES = r'''
-- name: Generate an OpenSSL Certificate Signing Request
- openssl_csr:
- path: /etc/ssl/csr/www.ansible.com.csr
- privatekey_path: /etc/ssl/private/ansible.com.pem
- common_name: www.ansible.com
-
-- name: Generate an OpenSSL Certificate Signing Request with an inline key
- openssl_csr:
- path: /etc/ssl/csr/www.ansible.com.csr
- privatekey_content: "{{ private_key_content }}"
- common_name: www.ansible.com
-
-- name: Generate an OpenSSL Certificate Signing Request with a passphrase protected private key
- openssl_csr:
- path: /etc/ssl/csr/www.ansible.com.csr
- privatekey_path: /etc/ssl/private/ansible.com.pem
- privatekey_passphrase: ansible
- common_name: www.ansible.com
-
-- name: Generate an OpenSSL Certificate Signing Request with Subject information
- openssl_csr:
- path: /etc/ssl/csr/www.ansible.com.csr
- privatekey_path: /etc/ssl/private/ansible.com.pem
- country_name: FR
- organization_name: Ansible
- email_address: jdoe@ansible.com
- common_name: www.ansible.com
-
-- name: Generate an OpenSSL Certificate Signing Request with subjectAltName extension
- openssl_csr:
- path: /etc/ssl/csr/www.ansible.com.csr
- privatekey_path: /etc/ssl/private/ansible.com.pem
- subject_alt_name: 'DNS:www.ansible.com,DNS:m.ansible.com'
-
-- name: Generate an OpenSSL CSR with subjectAltName extension with dynamic list
- openssl_csr:
- path: /etc/ssl/csr/www.ansible.com.csr
- privatekey_path: /etc/ssl/private/ansible.com.pem
- subject_alt_name: "{{ item.value | map('regex_replace', '^', 'DNS:') | list }}"
- with_dict:
- dns_server:
- - www.ansible.com
- - m.ansible.com
-
-- name: Force regenerate an OpenSSL Certificate Signing Request
- openssl_csr:
- path: /etc/ssl/csr/www.ansible.com.csr
- privatekey_path: /etc/ssl/private/ansible.com.pem
- force: yes
- common_name: www.ansible.com
-
-- name: Generate an OpenSSL Certificate Signing Request with special key usages
- openssl_csr:
- path: /etc/ssl/csr/www.ansible.com.csr
- privatekey_path: /etc/ssl/private/ansible.com.pem
- common_name: www.ansible.com
- key_usage:
- - digitalSignature
- - keyAgreement
- extended_key_usage:
- - clientAuth
-
-- name: Generate an OpenSSL Certificate Signing Request with OCSP Must Staple
- openssl_csr:
- path: /etc/ssl/csr/www.ansible.com.csr
- privatekey_path: /etc/ssl/private/ansible.com.pem
- common_name: www.ansible.com
- ocsp_must_staple: yes
-'''
-
-RETURN = r'''
-privatekey:
- description:
- - Path to the TLS/SSL private key the CSR was generated for
- - Will be C(none) if the private key has been provided in I(privatekey_content).
- returned: changed or success
- type: str
- sample: /etc/ssl/private/ansible.com.pem
-filename:
- description: Path to the generated Certificate Signing Request
- returned: changed or success
- type: str
- sample: /etc/ssl/csr/www.ansible.com.csr
-subject:
- description: A list of the subject tuples attached to the CSR
- returned: changed or success
- type: list
- elements: list
- sample: "[('CN', 'www.ansible.com'), ('O', 'Ansible')]"
-subjectAltName:
- description: The alternative names this CSR is valid for
- returned: changed or success
- type: list
- elements: str
- sample: [ 'DNS:www.ansible.com', 'DNS:m.ansible.com' ]
-keyUsage:
- description: Purpose for which the public key may be used
- returned: changed or success
- type: list
- elements: str
- sample: [ 'digitalSignature', 'keyAgreement' ]
-extendedKeyUsage:
- description: Additional restriction on the public key purposes
- returned: changed or success
- type: list
- elements: str
- sample: [ 'clientAuth' ]
-basicConstraints:
- description: Indicates if the certificate belongs to a CA
- returned: changed or success
- type: list
- elements: str
- sample: ['CA:TRUE', 'pathLenConstraint:0']
-ocsp_must_staple:
- description: Indicates whether the certificate has the OCSP
- Must Staple feature enabled
- returned: changed or success
- type: bool
- sample: false
-backup_file:
- description: Name of backup file created.
- returned: changed and if I(backup) is C(yes)
- type: str
- sample: /path/to/www.ansible.com.csr.2019-03-09@11:22~
-csr:
- description: The (current or generated) CSR's content.
- returned: if I(state) is C(present) and I(return_content) is C(yes)
- type: str
- version_added: "2.10"
-'''
-
-import abc
-import binascii
-import os
-import traceback
-from distutils.version import LooseVersion
-
-from ansible.module_utils import crypto as crypto_utils
-from ansible.module_utils.basic import AnsibleModule, missing_required_lib
-from ansible.module_utils._text import to_native, to_bytes, to_text
-from ansible.module_utils.compat import ipaddress as compat_ipaddress
-
-MINIMAL_PYOPENSSL_VERSION = '0.15'
-MINIMAL_CRYPTOGRAPHY_VERSION = '1.3'
-
-PYOPENSSL_IMP_ERR = None
-try:
- import OpenSSL
- from OpenSSL import crypto
- PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__)
-except ImportError:
- PYOPENSSL_IMP_ERR = traceback.format_exc()
- PYOPENSSL_FOUND = False
-else:
- PYOPENSSL_FOUND = True
- if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000:
- # OpenSSL 1.1.0 or newer
- OPENSSL_MUST_STAPLE_NAME = b"tlsfeature"
- OPENSSL_MUST_STAPLE_VALUE = b"status_request"
- else:
- # OpenSSL 1.0.x or older
- OPENSSL_MUST_STAPLE_NAME = b"1.3.6.1.5.5.7.1.24"
- OPENSSL_MUST_STAPLE_VALUE = b"DER:30:03:02:01:05"
-
-CRYPTOGRAPHY_IMP_ERR = None
-try:
- import cryptography
- import cryptography.x509
- import cryptography.x509.oid
- import cryptography.exceptions
- import cryptography.hazmat.backends
- import cryptography.hazmat.primitives.serialization
- import cryptography.hazmat.primitives.hashes
- CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
-except ImportError:
- CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
- CRYPTOGRAPHY_FOUND = False
-else:
- CRYPTOGRAPHY_FOUND = True
- CRYPTOGRAPHY_MUST_STAPLE_NAME = cryptography.x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.24")
- CRYPTOGRAPHY_MUST_STAPLE_VALUE = b"\x30\x03\x02\x01\x05"
-
-
-class CertificateSigningRequestError(crypto_utils.OpenSSLObjectError):
- pass
-
-
-class CertificateSigningRequestBase(crypto_utils.OpenSSLObject):
-
- def __init__(self, module):
- super(CertificateSigningRequestBase, self).__init__(
- module.params['path'],
- module.params['state'],
- module.params['force'],
- module.check_mode
- )
- self.digest = module.params['digest']
- self.privatekey_path = module.params['privatekey_path']
- self.privatekey_content = module.params['privatekey_content']
- if self.privatekey_content is not None:
- self.privatekey_content = self.privatekey_content.encode('utf-8')
- self.privatekey_passphrase = module.params['privatekey_passphrase']
- self.version = module.params['version']
- self.subjectAltName = module.params['subject_alt_name']
- self.subjectAltName_critical = module.params['subject_alt_name_critical']
- self.keyUsage = module.params['key_usage']
- self.keyUsage_critical = module.params['key_usage_critical']
- self.extendedKeyUsage = module.params['extended_key_usage']
- self.extendedKeyUsage_critical = module.params['extended_key_usage_critical']
- self.basicConstraints = module.params['basic_constraints']
- self.basicConstraints_critical = module.params['basic_constraints_critical']
- self.ocspMustStaple = module.params['ocsp_must_staple']
- self.ocspMustStaple_critical = module.params['ocsp_must_staple_critical']
- self.create_subject_key_identifier = module.params['create_subject_key_identifier']
- self.subject_key_identifier = module.params['subject_key_identifier']
- self.authority_key_identifier = module.params['authority_key_identifier']
- self.authority_cert_issuer = module.params['authority_cert_issuer']
- self.authority_cert_serial_number = module.params['authority_cert_serial_number']
- self.request = None
- self.privatekey = None
- self.csr_bytes = None
- self.return_content = module.params['return_content']
-
- if self.create_subject_key_identifier and self.subject_key_identifier is not None:
- module.fail_json(msg='subject_key_identifier cannot be specified if create_subject_key_identifier is true')
-
- self.backup = module.params['backup']
- self.backup_file = None
-
- self.subject = [
- ('C', module.params['country_name']),
- ('ST', module.params['state_or_province_name']),
- ('L', module.params['locality_name']),
- ('O', module.params['organization_name']),
- ('OU', module.params['organizational_unit_name']),
- ('CN', module.params['common_name']),
- ('emailAddress', module.params['email_address']),
- ]
-
- if module.params['subject']:
- self.subject = self.subject + crypto_utils.parse_name_field(module.params['subject'])
- self.subject = [(entry[0], entry[1]) for entry in self.subject if entry[1]]
-
- if not self.subjectAltName and module.params['use_common_name_for_san']:
- for sub in self.subject:
- if sub[0] in ('commonName', 'CN'):
- self.subjectAltName = ['DNS:%s' % sub[1]]
- break
-
- if self.subject_key_identifier is not None:
- try:
- self.subject_key_identifier = binascii.unhexlify(self.subject_key_identifier.replace(':', ''))
- except Exception as e:
- raise CertificateSigningRequestError('Cannot parse subject_key_identifier: {0}'.format(e))
-
- if self.authority_key_identifier is not None:
- try:
- self.authority_key_identifier = binascii.unhexlify(self.authority_key_identifier.replace(':', ''))
- except Exception as e:
- raise CertificateSigningRequestError('Cannot parse authority_key_identifier: {0}'.format(e))
-
- @abc.abstractmethod
- def _generate_csr(self):
- pass
-
- def generate(self, module):
- '''Generate the certificate signing request.'''
- if not self.check(module, perms_required=False) or self.force:
- result = self._generate_csr()
- if self.backup:
- self.backup_file = module.backup_local(self.path)
- if self.return_content:
- self.csr_bytes = result
- crypto_utils.write_file(module, result)
- self.changed = True
-
- file_args = module.load_file_common_arguments(module.params)
- if module.set_fs_attributes_if_different(file_args, False):
- self.changed = True
-
- @abc.abstractmethod
- def _load_private_key(self):
- pass
-
- @abc.abstractmethod
- def _check_csr(self):
- pass
-
- def check(self, module, perms_required=True):
- """Ensure the resource is in its desired state."""
- state_and_perms = super(CertificateSigningRequestBase, self).check(module, perms_required)
-
- self._load_private_key()
-
- if not state_and_perms:
- return False
-
- return self._check_csr()
-
- def remove(self, module):
- if self.backup:
- self.backup_file = module.backup_local(self.path)
- super(CertificateSigningRequestBase, self).remove(module)
-
- def dump(self):
- '''Serialize the object into a dictionary.'''
-
- result = {
- 'privatekey': self.privatekey_path,
- 'filename': self.path,
- 'subject': self.subject,
- 'subjectAltName': self.subjectAltName,
- 'keyUsage': self.keyUsage,
- 'extendedKeyUsage': self.extendedKeyUsage,
- 'basicConstraints': self.basicConstraints,
- 'ocspMustStaple': self.ocspMustStaple,
- 'changed': self.changed
- }
- if self.backup_file:
- result['backup_file'] = self.backup_file
- if self.return_content:
- if self.csr_bytes is None:
- self.csr_bytes = crypto_utils.load_file_if_exists(self.path, ignore_errors=True)
- result['csr'] = self.csr_bytes.decode('utf-8') if self.csr_bytes else None
-
- return result
-
-
-class CertificateSigningRequestPyOpenSSL(CertificateSigningRequestBase):
-
- def __init__(self, module):
- if module.params['create_subject_key_identifier']:
- module.fail_json(msg='You cannot use create_subject_key_identifier with the pyOpenSSL backend!')
- for o in ('subject_key_identifier', 'authority_key_identifier', 'authority_cert_issuer', 'authority_cert_serial_number'):
- if module.params[o] is not None:
- module.fail_json(msg='You cannot use {0} with the pyOpenSSL backend!'.format(o))
- super(CertificateSigningRequestPyOpenSSL, self).__init__(module)
-
- def _generate_csr(self):
- req = crypto.X509Req()
- req.set_version(self.version - 1)
- subject = req.get_subject()
- for entry in self.subject:
- if entry[1] is not None:
- # Workaround for https://github.com/pyca/pyopenssl/issues/165
- nid = OpenSSL._util.lib.OBJ_txt2nid(to_bytes(entry[0]))
- if nid == 0:
- raise CertificateSigningRequestError('Unknown subject field identifier "{0}"'.format(entry[0]))
- res = OpenSSL._util.lib.X509_NAME_add_entry_by_NID(subject._name, nid, OpenSSL._util.lib.MBSTRING_UTF8, to_bytes(entry[1]), -1, -1, 0)
- if res == 0:
- raise CertificateSigningRequestError('Invalid value for subject field identifier "{0}": {1}'.format(entry[0], entry[1]))
-
- extensions = []
- if self.subjectAltName:
- altnames = ', '.join(self.subjectAltName)
- try:
- extensions.append(crypto.X509Extension(b"subjectAltName", self.subjectAltName_critical, altnames.encode('ascii')))
- except OpenSSL.crypto.Error as e:
- raise CertificateSigningRequestError(
- 'Error while parsing Subject Alternative Names {0} (check for missing type prefix, such as "DNS:"!): {1}'.format(
- ', '.join(["{0}".format(san) for san in self.subjectAltName]), str(e)
- )
- )
-
- if self.keyUsage:
- usages = ', '.join(self.keyUsage)
- extensions.append(crypto.X509Extension(b"keyUsage", self.keyUsage_critical, usages.encode('ascii')))
-
- if self.extendedKeyUsage:
- usages = ', '.join(self.extendedKeyUsage)
- extensions.append(crypto.X509Extension(b"extendedKeyUsage", self.extendedKeyUsage_critical, usages.encode('ascii')))
-
- if self.basicConstraints:
- usages = ', '.join(self.basicConstraints)
- extensions.append(crypto.X509Extension(b"basicConstraints", self.basicConstraints_critical, usages.encode('ascii')))
-
- if self.ocspMustStaple:
- extensions.append(crypto.X509Extension(OPENSSL_MUST_STAPLE_NAME, self.ocspMustStaple_critical, OPENSSL_MUST_STAPLE_VALUE))
-
- if extensions:
- req.add_extensions(extensions)
-
- req.set_pubkey(self.privatekey)
- req.sign(self.privatekey, self.digest)
- self.request = req
-
- return crypto.dump_certificate_request(crypto.FILETYPE_PEM, self.request)
-
- def _load_private_key(self):
- try:
- self.privatekey = crypto_utils.load_privatekey(
- path=self.privatekey_path,
- content=self.privatekey_content,
- passphrase=self.privatekey_passphrase
- )
- except crypto_utils.OpenSSLBadPassphraseError as exc:
- raise CertificateSigningRequestError(exc)
-
- def _normalize_san(self, san):
- # Apparently OpenSSL returns 'IP address' not 'IP' as specifier when converting the subjectAltName to string
- # although it won't accept this specifier when generating the CSR. (https://github.com/openssl/openssl/issues/4004)
- if san.startswith('IP Address:'):
- san = 'IP:' + san[len('IP Address:'):]
- if san.startswith('IP:'):
- ip = compat_ipaddress.ip_address(san[3:])
- san = 'IP:{0}'.format(ip.compressed)
- return san
-
- def _check_csr(self):
- def _check_subject(csr):
- subject = [(OpenSSL._util.lib.OBJ_txt2nid(to_bytes(sub[0])), to_bytes(sub[1])) for sub in self.subject]
- current_subject = [(OpenSSL._util.lib.OBJ_txt2nid(to_bytes(sub[0])), to_bytes(sub[1])) for sub in csr.get_subject().get_components()]
- if not set(subject) == set(current_subject):
- return False
-
- return True
-
- def _check_subjectAltName(extensions):
- altnames_ext = next((ext for ext in extensions if ext.get_short_name() == b'subjectAltName'), '')
- altnames = [self._normalize_san(altname.strip()) for altname in
- to_text(altnames_ext, errors='surrogate_or_strict').split(',') if altname.strip()]
- if self.subjectAltName:
- if (set(altnames) != set([self._normalize_san(to_text(name)) for name in self.subjectAltName]) or
- altnames_ext.get_critical() != self.subjectAltName_critical):
- return False
- else:
- if altnames:
- return False
-
- return True
-
- def _check_keyUsage_(extensions, extName, expected, critical):
- usages_ext = [ext for ext in extensions if ext.get_short_name() == extName]
- if (not usages_ext and expected) or (usages_ext and not expected):
- return False
- elif not usages_ext and not expected:
- return True
- else:
- current = [OpenSSL._util.lib.OBJ_txt2nid(to_bytes(usage.strip())) for usage in str(usages_ext[0]).split(',')]
- expected = [OpenSSL._util.lib.OBJ_txt2nid(to_bytes(usage)) for usage in expected]
- return set(current) == set(expected) and usages_ext[0].get_critical() == critical
-
- def _check_keyUsage(extensions):
- usages_ext = [ext for ext in extensions if ext.get_short_name() == b'keyUsage']
- if (not usages_ext and self.keyUsage) or (usages_ext and not self.keyUsage):
- return False
- elif not usages_ext and not self.keyUsage:
- return True
- else:
- # OpenSSL._util.lib.OBJ_txt2nid() always returns 0 for all keyUsage values
- # (since keyUsage has a fixed bitfield for these values and is not extensible).
- # Therefore, we create an extension for the wanted values, and compare the
- # data of the extensions (which is the serialized bitfield).
- expected_ext = crypto.X509Extension(b"keyUsage", False, ', '.join(self.keyUsage).encode('ascii'))
- return usages_ext[0].get_data() == expected_ext.get_data() and usages_ext[0].get_critical() == self.keyUsage_critical
-
- def _check_extenededKeyUsage(extensions):
- return _check_keyUsage_(extensions, b'extendedKeyUsage', self.extendedKeyUsage, self.extendedKeyUsage_critical)
-
- def _check_basicConstraints(extensions):
- return _check_keyUsage_(extensions, b'basicConstraints', self.basicConstraints, self.basicConstraints_critical)
-
- def _check_ocspMustStaple(extensions):
- oms_ext = [ext for ext in extensions if to_bytes(ext.get_short_name()) == OPENSSL_MUST_STAPLE_NAME and to_bytes(ext) == OPENSSL_MUST_STAPLE_VALUE]
- if OpenSSL.SSL.OPENSSL_VERSION_NUMBER < 0x10100000:
- # Older versions of libssl don't know about OCSP Must Staple
- oms_ext.extend([ext for ext in extensions if ext.get_short_name() == b'UNDEF' and ext.get_data() == b'\x30\x03\x02\x01\x05'])
- if self.ocspMustStaple:
- return len(oms_ext) > 0 and oms_ext[0].get_critical() == self.ocspMustStaple_critical
- else:
- return len(oms_ext) == 0
-
- def _check_extensions(csr):
- extensions = csr.get_extensions()
- return (_check_subjectAltName(extensions) and _check_keyUsage(extensions) and
- _check_extenededKeyUsage(extensions) and _check_basicConstraints(extensions) and
- _check_ocspMustStaple(extensions))
-
- def _check_signature(csr):
- try:
- return csr.verify(self.privatekey)
- except crypto.Error:
- return False
-
- try:
- csr = crypto_utils.load_certificate_request(self.path, backend='pyopenssl')
- except Exception as dummy:
- return False
-
- return _check_subject(csr) and _check_extensions(csr) and _check_signature(csr)
-
-
-class CertificateSigningRequestCryptography(CertificateSigningRequestBase):
-
- def __init__(self, module):
- super(CertificateSigningRequestCryptography, self).__init__(module)
- self.cryptography_backend = cryptography.hazmat.backends.default_backend()
- self.module = module
- if self.version != 1:
- module.warn('The cryptography backend only supports version 1. (The only valid value according to RFC 2986.)')
-
- def _generate_csr(self):
- csr = cryptography.x509.CertificateSigningRequestBuilder()
- try:
- csr = csr.subject_name(cryptography.x509.Name([
- cryptography.x509.NameAttribute(crypto_utils.cryptography_name_to_oid(entry[0]), to_text(entry[1])) for entry in self.subject
- ]))
- except ValueError as e:
- raise CertificateSigningRequestError(e)
-
- if self.subjectAltName:
- csr = csr.add_extension(cryptography.x509.SubjectAlternativeName([
- crypto_utils.cryptography_get_name(name) for name in self.subjectAltName
- ]), critical=self.subjectAltName_critical)
-
- if self.keyUsage:
- params = crypto_utils.cryptography_parse_key_usage_params(self.keyUsage)
- csr = csr.add_extension(cryptography.x509.KeyUsage(**params), critical=self.keyUsage_critical)
-
- if self.extendedKeyUsage:
- usages = [crypto_utils.cryptography_name_to_oid(usage) for usage in self.extendedKeyUsage]
- csr = csr.add_extension(cryptography.x509.ExtendedKeyUsage(usages), critical=self.extendedKeyUsage_critical)
-
- if self.basicConstraints:
- params = {}
- ca, path_length = crypto_utils.cryptography_get_basic_constraints(self.basicConstraints)
- csr = csr.add_extension(cryptography.x509.BasicConstraints(ca, path_length), critical=self.basicConstraints_critical)
-
- if self.ocspMustStaple:
- try:
- # This only works with cryptography >= 2.1
- csr = csr.add_extension(cryptography.x509.TLSFeature([cryptography.x509.TLSFeatureType.status_request]), critical=self.ocspMustStaple_critical)
- except AttributeError as dummy:
- csr = csr.add_extension(
- cryptography.x509.UnrecognizedExtension(CRYPTOGRAPHY_MUST_STAPLE_NAME, CRYPTOGRAPHY_MUST_STAPLE_VALUE),
- critical=self.ocspMustStaple_critical
- )
-
- if self.create_subject_key_identifier:
- csr = csr.add_extension(
- cryptography.x509.SubjectKeyIdentifier.from_public_key(self.privatekey.public_key()),
- critical=False
- )
- elif self.subject_key_identifier is not None:
- csr = csr.add_extension(cryptography.x509.SubjectKeyIdentifier(self.subject_key_identifier), critical=False)
-
- if self.authority_key_identifier is not None or self.authority_cert_issuer is not None or self.authority_cert_serial_number is not None:
- issuers = None
- if self.authority_cert_issuer is not None:
- issuers = [crypto_utils.cryptography_get_name(n) for n in self.authority_cert_issuer]
- csr = csr.add_extension(
- cryptography.x509.AuthorityKeyIdentifier(self.authority_key_identifier, issuers, self.authority_cert_serial_number),
- critical=False
- )
-
- digest = None
- if crypto_utils.cryptography_key_needs_digest_for_signing(self.privatekey):
- if self.digest == 'sha256':
- digest = cryptography.hazmat.primitives.hashes.SHA256()
- elif self.digest == 'sha384':
- digest = cryptography.hazmat.primitives.hashes.SHA384()
- elif self.digest == 'sha512':
- digest = cryptography.hazmat.primitives.hashes.SHA512()
- elif self.digest == 'sha1':
- digest = cryptography.hazmat.primitives.hashes.SHA1()
- elif self.digest == 'md5':
- digest = cryptography.hazmat.primitives.hashes.MD5()
- # FIXME
- else:
- raise CertificateSigningRequestError('Unsupported digest "{0}"'.format(self.digest))
- try:
- self.request = csr.sign(self.privatekey, digest, self.cryptography_backend)
- except TypeError as e:
- if str(e) == 'Algorithm must be a registered hash algorithm.' and digest is None:
- self.module.fail_json(msg='Signing with Ed25519 and Ed448 keys requires cryptography 2.8 or newer.')
- raise
-
- return self.request.public_bytes(cryptography.hazmat.primitives.serialization.Encoding.PEM)
-
- def _load_private_key(self):
- try:
- if self.privatekey_content is not None:
- content = self.privatekey_content
- else:
- with open(self.privatekey_path, 'rb') as f:
- content = f.read()
- self.privatekey = cryptography.hazmat.primitives.serialization.load_pem_private_key(
- content,
- None if self.privatekey_passphrase is None else to_bytes(self.privatekey_passphrase),
- backend=self.cryptography_backend
- )
- except Exception as e:
- raise CertificateSigningRequestError(e)
-
- def _check_csr(self):
- def _check_subject(csr):
- subject = [(crypto_utils.cryptography_name_to_oid(entry[0]), entry[1]) for entry in self.subject]
- current_subject = [(sub.oid, sub.value) for sub in csr.subject]
- return set(subject) == set(current_subject)
-
- def _find_extension(extensions, exttype):
- return next(
- (ext for ext in extensions if isinstance(ext.value, exttype)),
- None
- )
-
- def _check_subjectAltName(extensions):
- current_altnames_ext = _find_extension(extensions, cryptography.x509.SubjectAlternativeName)
- current_altnames = [str(altname) for altname in current_altnames_ext.value] if current_altnames_ext else []
- altnames = [str(crypto_utils.cryptography_get_name(altname)) for altname in self.subjectAltName] if self.subjectAltName else []
- if set(altnames) != set(current_altnames):
- return False
- if altnames:
- if current_altnames_ext.critical != self.subjectAltName_critical:
- return False
- return True
-
- def _check_keyUsage(extensions):
- current_keyusage_ext = _find_extension(extensions, cryptography.x509.KeyUsage)
- if not self.keyUsage:
- return current_keyusage_ext is None
- elif current_keyusage_ext is None:
- return False
- params = crypto_utils.cryptography_parse_key_usage_params(self.keyUsage)
- for param in params:
- if getattr(current_keyusage_ext.value, '_' + param) != params[param]:
- return False
- if current_keyusage_ext.critical != self.keyUsage_critical:
- return False
- return True
-
- def _check_extenededKeyUsage(extensions):
- current_usages_ext = _find_extension(extensions, cryptography.x509.ExtendedKeyUsage)
- current_usages = [str(usage) for usage in current_usages_ext.value] if current_usages_ext else []
- usages = [str(crypto_utils.cryptography_name_to_oid(usage)) for usage in self.extendedKeyUsage] if self.extendedKeyUsage else []
- if set(current_usages) != set(usages):
- return False
- if usages:
- if current_usages_ext.critical != self.extendedKeyUsage_critical:
- return False
- return True
-
- def _check_basicConstraints(extensions):
- bc_ext = _find_extension(extensions, cryptography.x509.BasicConstraints)
- current_ca = bc_ext.value.ca if bc_ext else False
- current_path_length = bc_ext.value.path_length if bc_ext else None
- ca, path_length = crypto_utils.cryptography_get_basic_constraints(self.basicConstraints)
- # Check CA flag
- if ca != current_ca:
- return False
- # Check path length
- if path_length != current_path_length:
- return False
- # Check criticality
- if self.basicConstraints:
- if bc_ext.critical != self.basicConstraints_critical:
- return False
- return True
-
- def _check_ocspMustStaple(extensions):
- try:
- # This only works with cryptography >= 2.1
- tlsfeature_ext = _find_extension(extensions, cryptography.x509.TLSFeature)
- has_tlsfeature = True
- except AttributeError as dummy:
- tlsfeature_ext = next(
- (ext for ext in extensions if ext.value.oid == CRYPTOGRAPHY_MUST_STAPLE_NAME),
- None
- )
- has_tlsfeature = False
- if self.ocspMustStaple:
- if not tlsfeature_ext or tlsfeature_ext.critical != self.ocspMustStaple_critical:
- return False
- if has_tlsfeature:
- return cryptography.x509.TLSFeatureType.status_request in tlsfeature_ext.value
- else:
- return tlsfeature_ext.value.value == CRYPTOGRAPHY_MUST_STAPLE_VALUE
- else:
- return tlsfeature_ext is None
-
- def _check_subject_key_identifier(extensions):
- ext = _find_extension(extensions, cryptography.x509.SubjectKeyIdentifier)
- if self.create_subject_key_identifier or self.subject_key_identifier is not None:
- if not ext or ext.critical:
- return False
- if self.create_subject_key_identifier:
- digest = cryptography.x509.SubjectKeyIdentifier.from_public_key(self.privatekey.public_key()).digest
- return ext.value.digest == digest
- else:
- return ext.value.digest == self.subject_key_identifier
- else:
- return ext is None
-
- def _check_authority_key_identifier(extensions):
- ext = _find_extension(extensions, cryptography.x509.AuthorityKeyIdentifier)
- if self.authority_key_identifier is not None or self.authority_cert_issuer is not None or self.authority_cert_serial_number is not None:
- if not ext or ext.critical:
- return False
- aci = None
- csr_aci = None
- if self.authority_cert_issuer is not None:
- aci = [str(crypto_utils.cryptography_get_name(n)) for n in self.authority_cert_issuer]
- if ext.value.authority_cert_issuer is not None:
- csr_aci = [str(n) for n in ext.value.authority_cert_issuer]
- return (ext.value.key_identifier == self.authority_key_identifier
- and csr_aci == aci
- and ext.value.authority_cert_serial_number == self.authority_cert_serial_number)
- else:
- return ext is None
-
- def _check_extensions(csr):
- extensions = csr.extensions
- return (_check_subjectAltName(extensions) and _check_keyUsage(extensions) and
- _check_extenededKeyUsage(extensions) and _check_basicConstraints(extensions) and
- _check_ocspMustStaple(extensions) and _check_subject_key_identifier(extensions) and
- _check_authority_key_identifier(extensions))
-
- def _check_signature(csr):
- if not csr.is_signature_valid:
- return False
- # To check whether public key of CSR belongs to private key,
- # encode both public keys and compare PEMs.
- key_a = csr.public_key().public_bytes(
- cryptography.hazmat.primitives.serialization.Encoding.PEM,
- cryptography.hazmat.primitives.serialization.PublicFormat.SubjectPublicKeyInfo
- )
- key_b = self.privatekey.public_key().public_bytes(
- cryptography.hazmat.primitives.serialization.Encoding.PEM,
- cryptography.hazmat.primitives.serialization.PublicFormat.SubjectPublicKeyInfo
- )
- return key_a == key_b
-
- try:
- csr = crypto_utils.load_certificate_request(self.path, backend='cryptography')
- except Exception as dummy:
- return False
-
- return _check_subject(csr) and _check_extensions(csr) and _check_signature(csr)
-
-
-def main():
- module = AnsibleModule(
- argument_spec=dict(
- state=dict(type='str', default='present', choices=['absent', 'present']),
- digest=dict(type='str', default='sha256'),
- privatekey_path=dict(type='path'),
- privatekey_content=dict(type='str'),
- privatekey_passphrase=dict(type='str', no_log=True),
- version=dict(type='int', default=1),
- force=dict(type='bool', default=False),
- path=dict(type='path', required=True),
- subject=dict(type='dict'),
- country_name=dict(type='str', aliases=['C', 'countryName']),
- state_or_province_name=dict(type='str', aliases=['ST', 'stateOrProvinceName']),
- locality_name=dict(type='str', aliases=['L', 'localityName']),
- organization_name=dict(type='str', aliases=['O', 'organizationName']),
- organizational_unit_name=dict(type='str', aliases=['OU', 'organizationalUnitName']),
- common_name=dict(type='str', aliases=['CN', 'commonName']),
- email_address=dict(type='str', aliases=['E', 'emailAddress']),
- subject_alt_name=dict(type='list', elements='str', aliases=['subjectAltName']),
- subject_alt_name_critical=dict(type='bool', default=False, aliases=['subjectAltName_critical']),
- use_common_name_for_san=dict(type='bool', default=True, aliases=['useCommonNameForSAN']),
- key_usage=dict(type='list', elements='str', aliases=['keyUsage']),
- key_usage_critical=dict(type='bool', default=False, aliases=['keyUsage_critical']),
- extended_key_usage=dict(type='list', elements='str', aliases=['extKeyUsage', 'extendedKeyUsage']),
- extended_key_usage_critical=dict(type='bool', default=False, aliases=['extKeyUsage_critical', 'extendedKeyUsage_critical']),
- basic_constraints=dict(type='list', elements='str', aliases=['basicConstraints']),
- basic_constraints_critical=dict(type='bool', default=False, aliases=['basicConstraints_critical']),
- ocsp_must_staple=dict(type='bool', default=False, aliases=['ocspMustStaple']),
- ocsp_must_staple_critical=dict(type='bool', default=False, aliases=['ocspMustStaple_critical']),
- backup=dict(type='bool', default=False),
- create_subject_key_identifier=dict(type='bool', default=False),
- subject_key_identifier=dict(type='str'),
- authority_key_identifier=dict(type='str'),
- authority_cert_issuer=dict(type='list', elements='str'),
- authority_cert_serial_number=dict(type='int'),
- select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']),
- return_content=dict(type='bool', default=False),
- ),
- required_together=[('authority_cert_issuer', 'authority_cert_serial_number')],
- required_if=[('state', 'present', ['privatekey_path', 'privatekey_content'], True)],
- mutually_exclusive=(
- ['privatekey_path', 'privatekey_content'],
- ),
- add_file_common_args=True,
- supports_check_mode=True,
- )
-
- if module.params['version'] != 1:
- module.deprecate('The version option will only support allowed values from Ansible 2.14 on. '
- 'Currently, only the value 1 is allowed by RFC 2986',
- version='2.14', collection_name='ansible.builtin')
-
- base_dir = os.path.dirname(module.params['path']) or '.'
- if not os.path.isdir(base_dir):
- module.fail_json(name=base_dir, msg='The directory %s does not exist or the file is not a directory' % base_dir)
-
- backend = module.params['select_crypto_backend']
- if backend == 'auto':
- # Detection what is possible
- can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
- can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION)
-
- # First try cryptography, then pyOpenSSL
- if can_use_cryptography:
- backend = 'cryptography'
- elif can_use_pyopenssl:
- backend = 'pyopenssl'
-
- # Success?
- if backend == 'auto':
- module.fail_json(msg=("Can't detect any of the required Python libraries "
- "cryptography (>= {0}) or PyOpenSSL (>= {1})").format(
- MINIMAL_CRYPTOGRAPHY_VERSION,
- MINIMAL_PYOPENSSL_VERSION))
- try:
- if backend == 'pyopenssl':
- if not PYOPENSSL_FOUND:
- module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)),
- exception=PYOPENSSL_IMP_ERR)
- try:
- getattr(crypto.X509Req, 'get_extensions')
- except AttributeError:
- module.fail_json(msg='You need to have PyOpenSSL>=0.15 to generate CSRs')
-
- module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated',
- version='2.13', collection_name='ansible.builtin')
- csr = CertificateSigningRequestPyOpenSSL(module)
- elif backend == 'cryptography':
- if not CRYPTOGRAPHY_FOUND:
- module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)),
- exception=CRYPTOGRAPHY_IMP_ERR)
- csr = CertificateSigningRequestCryptography(module)
-
- if module.params['state'] == 'present':
- if module.check_mode:
- result = csr.dump()
- result['changed'] = module.params['force'] or not csr.check(module)
- module.exit_json(**result)
-
- csr.generate(module)
-
- else:
- if module.check_mode:
- result = csr.dump()
- result['changed'] = os.path.exists(module.params['path'])
- module.exit_json(**result)
-
- csr.remove(module)
-
- result = csr.dump()
- module.exit_json(**result)
- except crypto_utils.OpenSSLObjectError as exc:
- module.fail_json(msg=to_native(exc))
-
-
-if __name__ == "__main__":
- main()
diff --git a/test/support/integration/plugins/modules/openssl_privatekey.py b/test/support/integration/plugins/modules/openssl_privatekey.py
deleted file mode 100644
index 9c247a3942..0000000000
--- a/test/support/integration/plugins/modules/openssl_privatekey.py
+++ /dev/null
@@ -1,944 +0,0 @@
-#!/usr/bin/python
-# -*- coding: utf-8 -*-
-
-# Copyright: (c) 2016, Yanis Guenane <yanis+ansible@guenane.org>
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-from __future__ import absolute_import, division, print_function
-__metaclass__ = type
-
-ANSIBLE_METADATA = {'metadata_version': '1.1',
- 'status': ['preview'],
- 'supported_by': 'community'}
-
-DOCUMENTATION = r'''
----
-module: openssl_privatekey
-version_added: "2.3"
-short_description: Generate OpenSSL private keys
-description:
- - This module allows one to (re)generate OpenSSL private keys.
- - One can generate L(RSA,https://en.wikipedia.org/wiki/RSA_%28cryptosystem%29),
- L(DSA,https://en.wikipedia.org/wiki/Digital_Signature_Algorithm),
- L(ECC,https://en.wikipedia.org/wiki/Elliptic-curve_cryptography) or
- L(EdDSA,https://en.wikipedia.org/wiki/EdDSA) private keys.
- - Keys are generated in PEM format.
- - "Please note that the module regenerates private keys if they don't match
- the module's options. In particular, if you provide another passphrase
- (or specify none), change the keysize, etc., the private key will be
- regenerated. If you are concerned that this could **overwrite your private key**,
- consider using the I(backup) option."
- - The module can use the cryptography Python library, or the pyOpenSSL Python
- library. By default, it tries to detect which one is available. This can be
- overridden with the I(select_crypto_backend) option. Please note that the
- PyOpenSSL backend was deprecated in Ansible 2.9 and will be removed in Ansible 2.13."
-requirements:
- - Either cryptography >= 1.2.3 (older versions might work as well)
- - Or pyOpenSSL
-author:
- - Yanis Guenane (@Spredzy)
- - Felix Fontein (@felixfontein)
-options:
- state:
- description:
- - Whether the private key should exist or not, taking action if the state is different from what is stated.
- type: str
- default: present
- choices: [ absent, present ]
- size:
- description:
- - Size (in bits) of the TLS/SSL key to generate.
- type: int
- default: 4096
- type:
- description:
- - The algorithm used to generate the TLS/SSL private key.
- - Note that C(ECC), C(X25519), C(X448), C(Ed25519) and C(Ed448) require the C(cryptography) backend.
- C(X25519) needs cryptography 2.5 or newer, while C(X448), C(Ed25519) and C(Ed448) require
- cryptography 2.6 or newer. For C(ECC), the minimal cryptography version required depends on the
- I(curve) option.
- type: str
- default: RSA
- choices: [ DSA, ECC, Ed25519, Ed448, RSA, X25519, X448 ]
- curve:
- description:
- - Note that not all curves are supported by all versions of C(cryptography).
- - For maximal interoperability, C(secp384r1) or C(secp256r1) should be used.
- - We use the curve names as defined in the
- L(IANA registry for TLS,https://www.iana.org/assignments/tls-parameters/tls-parameters.xhtml#tls-parameters-8).
- type: str
- choices:
- - secp384r1
- - secp521r1
- - secp224r1
- - secp192r1
- - secp256r1
- - secp256k1
- - brainpoolP256r1
- - brainpoolP384r1
- - brainpoolP512r1
- - sect571k1
- - sect409k1
- - sect283k1
- - sect233k1
- - sect163k1
- - sect571r1
- - sect409r1
- - sect283r1
- - sect233r1
- - sect163r2
- version_added: "2.8"
- force:
- description:
- - Should the key be regenerated even if it already exists.
- type: bool
- default: no
- path:
- description:
- - Name of the file in which the generated TLS/SSL private key will be written. It will have 0600 mode.
- type: path
- required: true
- passphrase:
- description:
- - The passphrase for the private key.
- type: str
- version_added: "2.4"
- cipher:
- description:
- - The cipher to encrypt the private key. (Valid values can be found by
- running `openssl list -cipher-algorithms` or `openssl list-cipher-algorithms`,
- depending on your OpenSSL version.)
- - When using the C(cryptography) backend, use C(auto).
- type: str
- version_added: "2.4"
- select_crypto_backend:
- description:
- - Determines which crypto backend to use.
- - The default choice is C(auto), which tries to use C(cryptography) if available, and falls back to C(pyopenssl).
- - If set to C(pyopenssl), will try to use the L(pyOpenSSL,https://pypi.org/project/pyOpenSSL/) library.
- - If set to C(cryptography), will try to use the L(cryptography,https://cryptography.io/) library.
- - Please note that the C(pyopenssl) backend has been deprecated in Ansible 2.9, and will be removed in Ansible 2.13.
- From that point on, only the C(cryptography) backend will be available.
- type: str
- default: auto
- choices: [ auto, cryptography, pyopenssl ]
- version_added: "2.8"
- format:
- description:
- - Determines which format the private key is written in. By default, PKCS1 (traditional OpenSSL format)
- is used for all keys which support it. Please note that not every key can be exported in any format.
- - The value C(auto) selects a fromat based on the key format. The value C(auto_ignore) does the same,
- but for existing private key files, it will not force a regenerate when its format is not the automatically
- selected one for generation.
- - Note that if the format for an existing private key mismatches, the key is *regenerated* by default.
- To change this behavior, use the I(format_mismatch) option.
- - The I(format) option is only supported by the C(cryptography) backend. The C(pyopenssl) backend will
- fail if a value different from C(auto_ignore) is used.
- type: str
- default: auto_ignore
- choices: [ pkcs1, pkcs8, raw, auto, auto_ignore ]
- version_added: "2.10"
- format_mismatch:
- description:
- - Determines behavior of the module if the format of a private key does not match the expected format, but all
- other parameters are as expected.
- - If set to C(regenerate) (default), generates a new private key.
- - If set to C(convert), the key will be converted to the new format instead.
- - Only supported by the C(cryptography) backend.
- type: str
- default: regenerate
- choices: [ regenerate, convert ]
- version_added: "2.10"
- backup:
- description:
- - Create a backup file including a timestamp so you can get
- the original private key back if you overwrote it with a new one by accident.
- type: bool
- default: no
- version_added: "2.8"
- return_content:
- description:
- - If set to C(yes), will return the (current or generated) private key's content as I(privatekey).
- - Note that especially if the private key is not encrypted, you have to make sure that the returned
- value is treated appropriately and not accidentally written to logs etc.! Use with care!
- type: bool
- default: no
- version_added: "2.10"
- regenerate:
- description:
- - Allows to configure in which situations the module is allowed to regenerate private keys.
- The module will always generate a new key if the destination file does not exist.
- - By default, the key will be regenerated when it doesn't match the module's options,
- except when the key cannot be read or the passphrase does not match. Please note that
- this B(changed) for Ansible 2.10. For Ansible 2.9, the behavior was as if C(full_idempotence)
- is specified.
- - If set to C(never), the module will fail if the key cannot be read or the passphrase
- isn't matching, and will never regenerate an existing key.
- - If set to C(fail), the module will fail if the key does not correspond to the module's
- options.
- - If set to C(partial_idempotence), the key will be regenerated if it does not conform to
- the module's options. The key is B(not) regenerated if it cannot be read (broken file),
- the key is protected by an unknown passphrase, or when they key is not protected by a
- passphrase, but a passphrase is specified.
- - If set to C(full_idempotence), the key will be regenerated if it does not conform to the
- module's options. This is also the case if the key cannot be read (broken file), the key
- is protected by an unknown passphrase, or when they key is not protected by a passphrase,
- but a passphrase is specified. Make sure you have a B(backup) when using this option!
- - If set to C(always), the module will always regenerate the key. This is equivalent to
- setting I(force) to C(yes).
- - Note that if I(format_mismatch) is set to C(convert) and everything matches except the
- format, the key will always be converted, except if I(regenerate) is set to C(always).
- type: str
- choices:
- - never
- - fail
- - partial_idempotence
- - full_idempotence
- - always
- default: full_idempotence
- version_added: '2.10'
-extends_documentation_fragment:
-- files
-seealso:
-- module: openssl_certificate
-- module: openssl_csr
-- module: openssl_dhparam
-- module: openssl_pkcs12
-- module: openssl_publickey
-'''
-
-EXAMPLES = r'''
-- name: Generate an OpenSSL private key with the default values (4096 bits, RSA)
- openssl_privatekey:
- path: /etc/ssl/private/ansible.com.pem
-
-- name: Generate an OpenSSL private key with the default values (4096 bits, RSA) and a passphrase
- openssl_privatekey:
- path: /etc/ssl/private/ansible.com.pem
- passphrase: ansible
- cipher: aes256
-
-- name: Generate an OpenSSL private key with a different size (2048 bits)
- openssl_privatekey:
- path: /etc/ssl/private/ansible.com.pem
- size: 2048
-
-- name: Force regenerate an OpenSSL private key if it already exists
- openssl_privatekey:
- path: /etc/ssl/private/ansible.com.pem
- force: yes
-
-- name: Generate an OpenSSL private key with a different algorithm (DSA)
- openssl_privatekey:
- path: /etc/ssl/private/ansible.com.pem
- type: DSA
-'''
-
-RETURN = r'''
-size:
- description: Size (in bits) of the TLS/SSL private key.
- returned: changed or success
- type: int
- sample: 4096
-type:
- description: Algorithm used to generate the TLS/SSL private key.
- returned: changed or success
- type: str
- sample: RSA
-curve:
- description: Elliptic curve used to generate the TLS/SSL private key.
- returned: changed or success, and I(type) is C(ECC)
- type: str
- sample: secp256r1
-filename:
- description: Path to the generated TLS/SSL private key file.
- returned: changed or success
- type: str
- sample: /etc/ssl/private/ansible.com.pem
-fingerprint:
- description:
- - The fingerprint of the public key. Fingerprint will be generated for each C(hashlib.algorithms) available.
- - The PyOpenSSL backend requires PyOpenSSL >= 16.0 for meaningful output.
- returned: changed or success
- type: dict
- sample:
- md5: "84:75:71:72:8d:04:b5:6c:4d:37:6d:66:83:f5:4c:29"
- sha1: "51:cc:7c:68:5d:eb:41:43:88:7e:1a:ae:c7:f8:24:72:ee:71:f6:10"
- sha224: "b1:19:a6:6c:14:ac:33:1d:ed:18:50:d3:06:5c:b2:32:91:f1:f1:52:8c:cb:d5:75:e9:f5:9b:46"
- sha256: "41:ab:c7:cb:d5:5f:30:60:46:99:ac:d4:00:70:cf:a1:76:4f:24:5d:10:24:57:5d:51:6e:09:97:df:2f:de:c7"
- sha384: "85:39:50:4e:de:d9:19:33:40:70:ae:10:ab:59:24:19:51:c3:a2:e4:0b:1c:b1:6e:dd:b3:0c:d9:9e:6a:46:af:da:18:f8:ef:ae:2e:c0:9a:75:2c:9b:b3:0f:3a:5f:3d"
- sha512: "fd:ed:5e:39:48:5f:9f:fe:7f:25:06:3f:79:08:cd:ee:a5:e7:b3:3d:13:82:87:1f:84:e1:f5:c7:28:77:53:94:86:56:38:69:f0:d9:35:22:01:1e:a6:60:...:0f:9b"
-backup_file:
- description: Name of backup file created.
- returned: changed and if I(backup) is C(yes)
- type: str
- sample: /path/to/privatekey.pem.2019-03-09@11:22~
-privatekey:
- description:
- - The (current or generated) private key's content.
- - Will be Base64-encoded if the key is in raw format.
- returned: if I(state) is C(present) and I(return_content) is C(yes)
- type: str
- version_added: "2.10"
-'''
-
-import abc
-import base64
-import os
-import traceback
-from distutils.version import LooseVersion
-
-MINIMAL_PYOPENSSL_VERSION = '0.6'
-MINIMAL_CRYPTOGRAPHY_VERSION = '1.2.3'
-
-PYOPENSSL_IMP_ERR = None
-try:
- import OpenSSL
- from OpenSSL import crypto
- PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__)
-except ImportError:
- PYOPENSSL_IMP_ERR = traceback.format_exc()
- PYOPENSSL_FOUND = False
-else:
- PYOPENSSL_FOUND = True
-
-CRYPTOGRAPHY_IMP_ERR = None
-try:
- import cryptography
- import cryptography.exceptions
- import cryptography.hazmat.backends
- import cryptography.hazmat.primitives.serialization
- import cryptography.hazmat.primitives.asymmetric.rsa
- import cryptography.hazmat.primitives.asymmetric.dsa
- import cryptography.hazmat.primitives.asymmetric.ec
- import cryptography.hazmat.primitives.asymmetric.utils
- CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
-except ImportError:
- CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
- CRYPTOGRAPHY_FOUND = False
-else:
- CRYPTOGRAPHY_FOUND = True
-
-from ansible.module_utils.crypto import (
- CRYPTOGRAPHY_HAS_X25519,
- CRYPTOGRAPHY_HAS_X25519_FULL,
- CRYPTOGRAPHY_HAS_X448,
- CRYPTOGRAPHY_HAS_ED25519,
- CRYPTOGRAPHY_HAS_ED448,
-)
-
-from ansible.module_utils import crypto as crypto_utils
-from ansible.module_utils._text import to_native, to_bytes
-from ansible.module_utils.basic import AnsibleModule, missing_required_lib
-
-
-class PrivateKeyError(crypto_utils.OpenSSLObjectError):
- pass
-
-
-class PrivateKeyBase(crypto_utils.OpenSSLObject):
-
- def __init__(self, module):
- super(PrivateKeyBase, self).__init__(
- module.params['path'],
- module.params['state'],
- module.params['force'],
- module.check_mode
- )
- self.size = module.params['size']
- self.passphrase = module.params['passphrase']
- self.cipher = module.params['cipher']
- self.privatekey = None
- self.fingerprint = {}
- self.format = module.params['format']
- self.format_mismatch = module.params['format_mismatch']
- self.privatekey_bytes = None
- self.return_content = module.params['return_content']
- self.regenerate = module.params['regenerate']
- if self.regenerate == 'always':
- self.force = True
-
- self.backup = module.params['backup']
- self.backup_file = None
-
- if module.params['mode'] is None:
- module.params['mode'] = '0600'
-
- @abc.abstractmethod
- def _generate_private_key(self):
- """(Re-)Generate private key."""
- pass
-
- @abc.abstractmethod
- def _ensure_private_key_loaded(self):
- """Make sure that the private key has been loaded."""
- pass
-
- @abc.abstractmethod
- def _get_private_key_data(self):
- """Return bytes for self.privatekey"""
- pass
-
- @abc.abstractmethod
- def _get_fingerprint(self):
- pass
-
- def generate(self, module):
- """Generate a keypair."""
-
- if not self.check(module, perms_required=False, ignore_conversion=True) or self.force:
- # Regenerate
- if self.backup:
- self.backup_file = module.backup_local(self.path)
- self._generate_private_key()
- privatekey_data = self._get_private_key_data()
- if self.return_content:
- self.privatekey_bytes = privatekey_data
- crypto_utils.write_file(module, privatekey_data, 0o600)
- self.changed = True
- elif not self.check(module, perms_required=False, ignore_conversion=False):
- # Convert
- if self.backup:
- self.backup_file = module.backup_local(self.path)
- self._ensure_private_key_loaded()
- privatekey_data = self._get_private_key_data()
- if self.return_content:
- self.privatekey_bytes = privatekey_data
- crypto_utils.write_file(module, privatekey_data, 0o600)
- self.changed = True
-
- self.fingerprint = self._get_fingerprint()
- file_args = module.load_file_common_arguments(module.params)
- if module.set_fs_attributes_if_different(file_args, False):
- self.changed = True
-
- def remove(self, module):
- if self.backup:
- self.backup_file = module.backup_local(self.path)
- super(PrivateKeyBase, self).remove(module)
-
- @abc.abstractmethod
- def _check_passphrase(self):
- pass
-
- @abc.abstractmethod
- def _check_size_and_type(self):
- pass
-
- @abc.abstractmethod
- def _check_format(self):
- pass
-
- def check(self, module, perms_required=True, ignore_conversion=True):
- """Ensure the resource is in its desired state."""
-
- state_and_perms = super(PrivateKeyBase, self).check(module, perms_required=False)
-
- if not state_and_perms:
- # key does not exist
- return False
-
- if not self._check_passphrase():
- if self.regenerate in ('full_idempotence', 'always'):
- return False
- module.fail_json(msg='Unable to read the key. The key is protected with a another passphrase / no passphrase or broken.'
- ' Will not proceed. To force regeneration, call the module with `generate`'
- ' set to `full_idempotence` or `always`, or with `force=yes`.')
-
- if self.regenerate != 'never':
- if not self._check_size_and_type():
- if self.regenerate in ('partial_idempotence', 'full_idempotence', 'always'):
- return False
- module.fail_json(msg='Key has wrong type and/or size.'
- ' Will not proceed. To force regeneration, call the module with `generate`'
- ' set to `partial_idempotence`, `full_idempotence` or `always`, or with `force=yes`.')
-
- if not self._check_format():
- # During conversion step, convert if format does not match and format_mismatch == 'convert'
- if not ignore_conversion and self.format_mismatch == 'convert':
- return False
- # During generation step, regenerate if format does not match and format_mismatch == 'regenerate'
- if ignore_conversion and self.format_mismatch == 'regenerate' and self.regenerate != 'never':
- if not ignore_conversion or self.regenerate in ('partial_idempotence', 'full_idempotence', 'always'):
- return False
- module.fail_json(msg='Key has wrong format.'
- ' Will not proceed. To force regeneration, call the module with `generate`'
- ' set to `partial_idempotence`, `full_idempotence` or `always`, or with `force=yes`.'
- ' To convert the key, set `format_mismatch` to `convert`.')
-
- # check whether permissions are correct (in case that needs to be checked)
- return not perms_required or super(PrivateKeyBase, self).check(module, perms_required=perms_required)
-
- def dump(self):
- """Serialize the object into a dictionary."""
-
- result = {
- 'size': self.size,
- 'filename': self.path,
- 'changed': self.changed,
- 'fingerprint': self.fingerprint,
- }
- if self.backup_file:
- result['backup_file'] = self.backup_file
- if self.return_content:
- if self.privatekey_bytes is None:
- self.privatekey_bytes = crypto_utils.load_file_if_exists(self.path, ignore_errors=True)
- if self.privatekey_bytes:
- if crypto_utils.identify_private_key_format(self.privatekey_bytes) == 'raw':
- result['privatekey'] = base64.b64encode(self.privatekey_bytes)
- else:
- result['privatekey'] = self.privatekey_bytes.decode('utf-8')
- else:
- result['privatekey'] = None
-
- return result
-
-
-# Implementation with using pyOpenSSL
-class PrivateKeyPyOpenSSL(PrivateKeyBase):
-
- def __init__(self, module):
- super(PrivateKeyPyOpenSSL, self).__init__(module)
-
- if module.params['type'] == 'RSA':
- self.type = crypto.TYPE_RSA
- elif module.params['type'] == 'DSA':
- self.type = crypto.TYPE_DSA
- else:
- module.fail_json(msg="PyOpenSSL backend only supports RSA and DSA keys.")
-
- if self.format != 'auto_ignore':
- module.fail_json(msg="PyOpenSSL backend only supports auto_ignore format.")
-
- def _generate_private_key(self):
- """(Re-)Generate private key."""
- self.privatekey = crypto.PKey()
- try:
- self.privatekey.generate_key(self.type, self.size)
- except (TypeError, ValueError) as exc:
- raise PrivateKeyError(exc)
-
- def _ensure_private_key_loaded(self):
- """Make sure that the private key has been loaded."""
- if self.privatekey is None:
- try:
- self.privatekey = privatekey = crypto_utils.load_privatekey(self.path, self.passphrase)
- except crypto_utils.OpenSSLBadPassphraseError as exc:
- raise PrivateKeyError(exc)
-
- def _get_private_key_data(self):
- """Return bytes for self.privatekey"""
- if self.cipher and self.passphrase:
- return crypto.dump_privatekey(crypto.FILETYPE_PEM, self.privatekey,
- self.cipher, to_bytes(self.passphrase))
- else:
- return crypto.dump_privatekey(crypto.FILETYPE_PEM, self.privatekey)
-
- def _get_fingerprint(self):
- return crypto_utils.get_fingerprint(self.path, self.passphrase)
-
- def _check_passphrase(self):
- try:
- crypto_utils.load_privatekey(self.path, self.passphrase)
- return True
- except Exception as dummy:
- return False
-
- def _check_size_and_type(self):
- def _check_size(privatekey):
- return self.size == privatekey.bits()
-
- def _check_type(privatekey):
- return self.type == privatekey.type()
-
- self._ensure_private_key_loaded()
- return _check_size(self.privatekey) and _check_type(self.privatekey)
-
- def _check_format(self):
- # Not supported by this backend
- return True
-
- def dump(self):
- """Serialize the object into a dictionary."""
-
- result = super(PrivateKeyPyOpenSSL, self).dump()
-
- if self.type == crypto.TYPE_RSA:
- result['type'] = 'RSA'
- else:
- result['type'] = 'DSA'
-
- return result
-
-
-# Implementation with using cryptography
-class PrivateKeyCryptography(PrivateKeyBase):
-
- def _get_ec_class(self, ectype):
- ecclass = cryptography.hazmat.primitives.asymmetric.ec.__dict__.get(ectype)
- if ecclass is None:
- self.module.fail_json(msg='Your cryptography version does not support {0}'.format(ectype))
- return ecclass
-
- def _add_curve(self, name, ectype, deprecated=False):
- def create(size):
- ecclass = self._get_ec_class(ectype)
- return ecclass()
-
- def verify(privatekey):
- ecclass = self._get_ec_class(ectype)
- return isinstance(privatekey.private_numbers().public_numbers.curve, ecclass)
-
- self.curves[name] = {
- 'create': create,
- 'verify': verify,
- 'deprecated': deprecated,
- }
-
- def __init__(self, module):
- super(PrivateKeyCryptography, self).__init__(module)
-
- self.curves = dict()
- self._add_curve('secp384r1', 'SECP384R1')
- self._add_curve('secp521r1', 'SECP521R1')
- self._add_curve('secp224r1', 'SECP224R1')
- self._add_curve('secp192r1', 'SECP192R1')
- self._add_curve('secp256r1', 'SECP256R1')
- self._add_curve('secp256k1', 'SECP256K1')
- self._add_curve('brainpoolP256r1', 'BrainpoolP256R1', deprecated=True)
- self._add_curve('brainpoolP384r1', 'BrainpoolP384R1', deprecated=True)
- self._add_curve('brainpoolP512r1', 'BrainpoolP512R1', deprecated=True)
- self._add_curve('sect571k1', 'SECT571K1', deprecated=True)
- self._add_curve('sect409k1', 'SECT409K1', deprecated=True)
- self._add_curve('sect283k1', 'SECT283K1', deprecated=True)
- self._add_curve('sect233k1', 'SECT233K1', deprecated=True)
- self._add_curve('sect163k1', 'SECT163K1', deprecated=True)
- self._add_curve('sect571r1', 'SECT571R1', deprecated=True)
- self._add_curve('sect409r1', 'SECT409R1', deprecated=True)
- self._add_curve('sect283r1', 'SECT283R1', deprecated=True)
- self._add_curve('sect233r1', 'SECT233R1', deprecated=True)
- self._add_curve('sect163r2', 'SECT163R2', deprecated=True)
-
- self.module = module
- self.cryptography_backend = cryptography.hazmat.backends.default_backend()
-
- self.type = module.params['type']
- self.curve = module.params['curve']
- if not CRYPTOGRAPHY_HAS_X25519 and self.type == 'X25519':
- self.module.fail_json(msg='Your cryptography version does not support X25519')
- if not CRYPTOGRAPHY_HAS_X25519_FULL and self.type == 'X25519':
- self.module.fail_json(msg='Your cryptography version does not support X25519 serialization')
- if not CRYPTOGRAPHY_HAS_X448 and self.type == 'X448':
- self.module.fail_json(msg='Your cryptography version does not support X448')
- if not CRYPTOGRAPHY_HAS_ED25519 and self.type == 'Ed25519':
- self.module.fail_json(msg='Your cryptography version does not support Ed25519')
- if not CRYPTOGRAPHY_HAS_ED448 and self.type == 'Ed448':
- self.module.fail_json(msg='Your cryptography version does not support Ed448')
-
- def _get_wanted_format(self):
- if self.format not in ('auto', 'auto_ignore'):
- return self.format
- if self.type in ('X25519', 'X448', 'Ed25519', 'Ed448'):
- return 'pkcs8'
- else:
- return 'pkcs1'
-
- def _generate_private_key(self):
- """(Re-)Generate private key."""
- try:
- if self.type == 'RSA':
- self.privatekey = cryptography.hazmat.primitives.asymmetric.rsa.generate_private_key(
- public_exponent=65537, # OpenSSL always uses this
- key_size=self.size,
- backend=self.cryptography_backend
- )
- if self.type == 'DSA':
- self.privatekey = cryptography.hazmat.primitives.asymmetric.dsa.generate_private_key(
- key_size=self.size,
- backend=self.cryptography_backend
- )
- if CRYPTOGRAPHY_HAS_X25519_FULL and self.type == 'X25519':
- self.privatekey = cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey.generate()
- if CRYPTOGRAPHY_HAS_X448 and self.type == 'X448':
- self.privatekey = cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey.generate()
- if CRYPTOGRAPHY_HAS_ED25519 and self.type == 'Ed25519':
- self.privatekey = cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey.generate()
- if CRYPTOGRAPHY_HAS_ED448 and self.type == 'Ed448':
- self.privatekey = cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey.generate()
- if self.type == 'ECC' and self.curve in self.curves:
- if self.curves[self.curve]['deprecated']:
- self.module.warn('Elliptic curves of type {0} should not be used for new keys!'.format(self.curve))
- self.privatekey = cryptography.hazmat.primitives.asymmetric.ec.generate_private_key(
- curve=self.curves[self.curve]['create'](self.size),
- backend=self.cryptography_backend
- )
- except cryptography.exceptions.UnsupportedAlgorithm as dummy:
- self.module.fail_json(msg='Cryptography backend does not support the algorithm required for {0}'.format(self.type))
-
- def _ensure_private_key_loaded(self):
- """Make sure that the private key has been loaded."""
- if self.privatekey is None:
- self.privatekey = self._load_privatekey()
-
- def _get_private_key_data(self):
- """Return bytes for self.privatekey"""
- # Select export format and encoding
- try:
- export_format = self._get_wanted_format()
- export_encoding = cryptography.hazmat.primitives.serialization.Encoding.PEM
- if export_format == 'pkcs1':
- # "TraditionalOpenSSL" format is PKCS1
- export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.TraditionalOpenSSL
- elif export_format == 'pkcs8':
- export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.PKCS8
- elif export_format == 'raw':
- export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.Raw
- export_encoding = cryptography.hazmat.primitives.serialization.Encoding.Raw
- except AttributeError:
- self.module.fail_json(msg='Cryptography backend does not support the selected output format "{0}"'.format(self.format))
-
- # Select key encryption
- encryption_algorithm = cryptography.hazmat.primitives.serialization.NoEncryption()
- if self.cipher and self.passphrase:
- if self.cipher == 'auto':
- encryption_algorithm = cryptography.hazmat.primitives.serialization.BestAvailableEncryption(to_bytes(self.passphrase))
- else:
- self.module.fail_json(msg='Cryptography backend can only use "auto" for cipher option.')
-
- # Serialize key
- try:
- return self.privatekey.private_bytes(
- encoding=export_encoding,
- format=export_format,
- encryption_algorithm=encryption_algorithm
- )
- except ValueError as dummy:
- self.module.fail_json(
- msg='Cryptography backend cannot serialize the private key in the required format "{0}"'.format(self.format)
- )
- except Exception as dummy:
- self.module.fail_json(
- msg='Error while serializing the private key in the required format "{0}"'.format(self.format),
- exception=traceback.format_exc()
- )
-
- def _load_privatekey(self):
- try:
- # Read bytes
- with open(self.path, 'rb') as f:
- data = f.read()
- # Interpret bytes depending on format.
- format = crypto_utils.identify_private_key_format(data)
- if format == 'raw':
- if len(data) == 56 and CRYPTOGRAPHY_HAS_X448:
- return cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey.from_private_bytes(data)
- if len(data) == 57 and CRYPTOGRAPHY_HAS_ED448:
- return cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey.from_private_bytes(data)
- if len(data) == 32:
- if CRYPTOGRAPHY_HAS_X25519 and (self.type == 'X25519' or not CRYPTOGRAPHY_HAS_ED25519):
- return cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey.from_private_bytes(data)
- if CRYPTOGRAPHY_HAS_ED25519 and (self.type == 'Ed25519' or not CRYPTOGRAPHY_HAS_X25519):
- return cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(data)
- if CRYPTOGRAPHY_HAS_X25519 and CRYPTOGRAPHY_HAS_ED25519:
- try:
- return cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey.from_private_bytes(data)
- except Exception:
- return cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(data)
- raise PrivateKeyError('Cannot load raw key')
- else:
- return cryptography.hazmat.primitives.serialization.load_pem_private_key(
- data,
- None if self.passphrase is None else to_bytes(self.passphrase),
- backend=self.cryptography_backend
- )
- except Exception as e:
- raise PrivateKeyError(e)
-
- def _get_fingerprint(self):
- # Get bytes of public key
- private_key = self._load_privatekey()
- public_key = private_key.public_key()
- public_key_bytes = public_key.public_bytes(
- cryptography.hazmat.primitives.serialization.Encoding.DER,
- cryptography.hazmat.primitives.serialization.PublicFormat.SubjectPublicKeyInfo
- )
- # Get fingerprints of public_key_bytes
- return crypto_utils.get_fingerprint_of_bytes(public_key_bytes)
-
- def _check_passphrase(self):
- try:
- with open(self.path, 'rb') as f:
- data = f.read()
- format = crypto_utils.identify_private_key_format(data)
- if format == 'raw':
- # Raw keys cannot be encrypted. To avoid incompatibilities, we try to
- # actually load the key (and return False when this fails).
- self._load_privatekey()
- # Loading the key succeeded. Only return True when no passphrase was
- # provided.
- return self.passphrase is None
- else:
- return cryptography.hazmat.primitives.serialization.load_pem_private_key(
- data,
- None if self.passphrase is None else to_bytes(self.passphrase),
- backend=self.cryptography_backend
- )
- except Exception as dummy:
- return False
-
- def _check_size_and_type(self):
- self._ensure_private_key_loaded()
-
- if isinstance(self.privatekey, cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey):
- return self.type == 'RSA' and self.size == self.privatekey.key_size
- if isinstance(self.privatekey, cryptography.hazmat.primitives.asymmetric.dsa.DSAPrivateKey):
- return self.type == 'DSA' and self.size == self.privatekey.key_size
- if CRYPTOGRAPHY_HAS_X25519 and isinstance(self.privatekey, cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey):
- return self.type == 'X25519'
- if CRYPTOGRAPHY_HAS_X448 and isinstance(self.privatekey, cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey):
- return self.type == 'X448'
- if CRYPTOGRAPHY_HAS_ED25519 and isinstance(self.privatekey, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey):
- return self.type == 'Ed25519'
- if CRYPTOGRAPHY_HAS_ED448 and isinstance(self.privatekey, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey):
- return self.type == 'Ed448'
- if isinstance(self.privatekey, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey):
- if self.type != 'ECC':
- return False
- if self.curve not in self.curves:
- return False
- return self.curves[self.curve]['verify'](self.privatekey)
-
- return False
-
- def _check_format(self):
- if self.format == 'auto_ignore':
- return True
- try:
- with open(self.path, 'rb') as f:
- content = f.read()
- format = crypto_utils.identify_private_key_format(content)
- return format == self._get_wanted_format()
- except Exception as dummy:
- return False
-
- def dump(self):
- """Serialize the object into a dictionary."""
- result = super(PrivateKeyCryptography, self).dump()
- result['type'] = self.type
- if self.type == 'ECC':
- result['curve'] = self.curve
- return result
-
-
-def main():
-
- module = AnsibleModule(
- argument_spec=dict(
- state=dict(type='str', default='present', choices=['present', 'absent']),
- size=dict(type='int', default=4096),
- type=dict(type='str', default='RSA', choices=[
- 'DSA', 'ECC', 'Ed25519', 'Ed448', 'RSA', 'X25519', 'X448'
- ]),
- curve=dict(type='str', choices=[
- 'secp384r1', 'secp521r1', 'secp224r1', 'secp192r1', 'secp256r1',
- 'secp256k1', 'brainpoolP256r1', 'brainpoolP384r1', 'brainpoolP512r1',
- 'sect571k1', 'sect409k1', 'sect283k1', 'sect233k1', 'sect163k1',
- 'sect571r1', 'sect409r1', 'sect283r1', 'sect233r1', 'sect163r2',
- ]),
- force=dict(type='bool', default=False),
- path=dict(type='path', required=True),
- passphrase=dict(type='str', no_log=True),
- cipher=dict(type='str'),
- backup=dict(type='bool', default=False),
- format=dict(type='str', default='auto_ignore', choices=['pkcs1', 'pkcs8', 'raw', 'auto', 'auto_ignore']),
- format_mismatch=dict(type='str', default='regenerate', choices=['regenerate', 'convert']),
- select_crypto_backend=dict(type='str', choices=['auto', 'pyopenssl', 'cryptography'], default='auto'),
- return_content=dict(type='bool', default=False),
- regenerate=dict(
- type='str',
- default='full_idempotence',
- choices=['never', 'fail', 'partial_idempotence', 'full_idempotence', 'always']
- ),
- ),
- supports_check_mode=True,
- add_file_common_args=True,
- required_together=[
- ['cipher', 'passphrase']
- ],
- required_if=[
- ['type', 'ECC', ['curve']],
- ],
- )
-
- base_dir = os.path.dirname(module.params['path']) or '.'
- if not os.path.isdir(base_dir):
- module.fail_json(
- name=base_dir,
- msg='The directory %s does not exist or the file is not a directory' % base_dir
- )
-
- backend = module.params['select_crypto_backend']
- if backend == 'auto':
- # Detection what is possible
- can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
- can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION)
-
- # Decision
- if module.params['cipher'] and module.params['passphrase'] and module.params['cipher'] != 'auto':
- # First try pyOpenSSL, then cryptography
- if can_use_pyopenssl:
- backend = 'pyopenssl'
- elif can_use_cryptography:
- backend = 'cryptography'
- else:
- # First try cryptography, then pyOpenSSL
- if can_use_cryptography:
- backend = 'cryptography'
- elif can_use_pyopenssl:
- backend = 'pyopenssl'
-
- # Success?
- if backend == 'auto':
- module.fail_json(msg=("Can't detect any of the required Python libraries "
- "cryptography (>= {0}) or PyOpenSSL (>= {1})").format(
- MINIMAL_CRYPTOGRAPHY_VERSION,
- MINIMAL_PYOPENSSL_VERSION))
- try:
- if backend == 'pyopenssl':
- if not PYOPENSSL_FOUND:
- module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)),
- exception=PYOPENSSL_IMP_ERR)
- module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated',
- version='2.13', collection_name='ansible.builtin')
- private_key = PrivateKeyPyOpenSSL(module)
- elif backend == 'cryptography':
- if not CRYPTOGRAPHY_FOUND:
- module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)),
- exception=CRYPTOGRAPHY_IMP_ERR)
- private_key = PrivateKeyCryptography(module)
-
- if private_key.state == 'present':
- if module.check_mode:
- result = private_key.dump()
- result['changed'] = private_key.force \
- or not private_key.check(module, ignore_conversion=True) \
- or not private_key.check(module, ignore_conversion=False)
- module.exit_json(**result)
-
- private_key.generate(module)
- else:
- if module.check_mode:
- result = private_key.dump()
- result['changed'] = os.path.exists(module.params['path'])
- module.exit_json(**result)
-
- private_key.remove(module)
-
- result = private_key.dump()
- module.exit_json(**result)
- except crypto_utils.OpenSSLObjectError as exc:
- module.fail_json(msg=to_native(exc))
-
-
-if __name__ == '__main__':
- main()