summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAbhijeet Kasurde <akasurde@redhat.com>2017-05-02 22:05:06 +0530
committerMatt Martz <matt@sivel.net>2017-05-02 11:35:06 -0500
commitfbb924ff24dbe140515b46829d537e95af027647 (patch)
treeef732ca1bb617502565c4de5d1fe82afcfc8b14a
parentdf5be2b8ea79e0a39dca168926b749014eea71fe (diff)
downloadansible-fbb924ff24dbe140515b46829d537e95af027647.tar.gz
Pep8 fixes for letsencrypt module (#24144)
Signed-off-by: Abhijeet Kasurde <akasurde@redhat.com>
-rw-r--r--lib/ansible/modules/web_infrastructure/letsencrypt.py176
-rw-r--r--test/sanity/pep8/legacy-files.txt1
2 files changed, 90 insertions, 87 deletions
diff --git a/lib/ansible/modules/web_infrastructure/letsencrypt.py b/lib/ansible/modules/web_infrastructure/letsencrypt.py
index 556618e7c7..8f778480fd 100644
--- a/lib/ansible/modules/web_infrastructure/letsencrypt.py
+++ b/lib/ansible/modules/web_infrastructure/letsencrypt.py
@@ -174,7 +174,8 @@ from datetime import datetime
def nopad_b64(data):
return base64.urlsafe_b64encode(data).decode('utf8').replace("=", "")
-def simple_get(module,url):
+
+def simple_get(module, url):
resp, info = fetch_url(module, url, method='GET')
result = None
@@ -189,15 +190,16 @@ def simple_get(module,url):
try:
result = module.from_json(content.decode('utf8'))
except ValueError:
- module.fail_json(msg="Failed to parse the ACME response: {0} {1}".format(url,content))
+ module.fail_json(msg="Failed to parse the ACME response: {0} {1}".format(url, content))
else:
result = content
if info['status'] >= 400:
- module.fail_json(msg="ACME request failed: CODE: {0} RESULT:{1}".format(info['status'],result))
+ module.fail_json(msg="ACME request failed: CODE: {0} RESULT: {1}".format(info['status'], result))
return result
-def get_cert_days(module,cert_file):
+
+def get_cert_days(module, cert_file):
'''
Return the days the certificate in cert_file remains valid and -1
if the file was not found.
@@ -207,10 +209,10 @@ def get_cert_days(module,cert_file):
openssl_bin = module.get_bin_path('openssl', True)
openssl_cert_cmd = [openssl_bin, "x509", "-in", cert_file, "-noout", "-text"]
- _, out, _ = module.run_command(openssl_cert_cmd,check_rc=True)
+ _, out, _ = module.run_command(openssl_cert_cmd, check_rc=True)
try:
- not_after_str = re.search(r"\s+Not After\s*:\s+(.*)",out.decode('utf8')).group(1)
- not_after = datetime.datetime.fromtimestamp(time.mktime(time.strptime(not_after_str,'%b %d %H:%M:%S %Y %Z')))
+ not_after_str = re.search(r"\s+Not After\s*:\s+(.*)", out.decode('utf8')).group(1)
+ not_after = datetime.datetime.fromtimestamp(time.mktime(time.strptime(not_after_str, '%b %d %H:%M:%S %Y %Z')))
except AttributeError:
module.fail_json(msg="No 'Not after' date found in {0}".format(cert_file))
except ValueError:
@@ -218,6 +220,7 @@ def get_cert_days(module,cert_file):
now = datetime.datetime.utcnow()
return (not_after - now).days
+
# function source: network/basics/uri.py
def write_file(module, dest, content):
'''
@@ -234,15 +237,15 @@ def write_file(module, dest, content):
os.remove(tmpsrc)
module.fail_json(msg="failed to create temporary content file: %s" % str(err))
f.close()
- checksum_src = None
- checksum_dest = None
+ checksum_src = None
+ checksum_dest = None
# raise an error if there is no tmpsrc file
if not os.path.exists(tmpsrc):
os.remove(tmpsrc)
module.fail_json(msg="Source %s does not exist" % (tmpsrc))
if not os.access(tmpsrc, os.R_OK):
os.remove(tmpsrc)
- module.fail_json( msg="Source %s not readable" % (tmpsrc))
+ module.fail_json(msg="Source %s not readable" % (tmpsrc))
checksum_src = module.sha1(tmpsrc)
# check if there is no dest file
if os.path.exists(dest):
@@ -268,6 +271,7 @@ def write_file(module, dest, content):
os.remove(tmpsrc)
return changed
+
class ACMEDirectory(object):
'''
The ACME server directory. Gives access to the available resources
@@ -277,15 +281,15 @@ class ACMEDirectory(object):
https://tools.ietf.org/html/draft-ietf-acme-acme-02#section-6.2
'''
def __init__(self, module):
- self.module = module
+ self.module = module
self.directory_root = module.params['acme_directory']
- self.directory = simple_get(self.module,self.directory_root)
+ self.directory = simple_get(self.module, self.directory_root)
def __getitem__(self, key):
return self.directory[key]
- def get_nonce(self,resource=None):
+ def get_nonce(self, resource=None):
url = self.directory_root
if resource is not None:
url = resource
@@ -294,21 +298,22 @@ class ACMEDirectory(object):
self.module.fail_json(msg="Failed to get replay-nonce, got status {0}".format(info['status']))
return info['replay-nonce']
+
class ACMEAccount(object):
'''
ACME account object. Handles the authorized communication with the
ACME server. Provides access to account bound information like
the currently active authorizations and valid certificates
'''
- def __init__(self,module):
- self.module = module
- self.agreement = module.params['agreement']
- self.key = module.params['account_key']
- self.email = module.params['account_email']
- self.data = module.params['data']
- self.directory = ACMEDirectory(module)
- self.uri = None
- self.changed = False
+ def __init__(self, module):
+ self.module = module
+ self.agreement = module.params['agreement']
+ self.key = module.params['account_key']
+ self.email = module.params['account_email']
+ self.data = module.params['data']
+ self.directory = ACMEDirectory(module)
+ self.uri = None
+ self.changed = False
self._authz_list_uri = None
self._certs_list_uri = None
@@ -319,7 +324,7 @@ class ACMEAccount(object):
self._openssl_bin = module.get_bin_path('openssl', True)
pub_hex, pub_exp = self._parse_account_key(self.key)
- self.jws_header = {
+ self.jws_header = {
"alg": "RS256",
"jwk": {
"e": nopad_b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
@@ -329,7 +334,7 @@ class ACMEAccount(object):
}
self.init_account()
- def get_keyauthorization(self,token):
+ def get_keyauthorization(self, token):
'''
Returns the key authorization for the given token
https://tools.ietf.org/html/draft-ietf-acme-acme-02#section-7.1
@@ -338,17 +343,17 @@ class ACMEAccount(object):
thumbprint = nopad_b64(hashlib.sha256(accountkey_json.encode('utf8')).digest())
return "{0}.{1}".format(token, thumbprint)
- def _parse_account_key(self,key):
+ def _parse_account_key(self, key):
'''
Parses an RSA key file in PEM format and returns the modulus
and public exponent of the key
'''
openssl_keydump_cmd = [self._openssl_bin, "rsa", "-in", key, "-noout", "-text"]
- _, out, _ = self.module.run_command(openssl_keydump_cmd,check_rc=True)
+ _, out, _ = self.module.run_command(openssl_keydump_cmd, check_rc=True)
pub_hex, pub_exp = re.search(
r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",
- out.decode('utf8'), re.MULTILINE|re.DOTALL).groups()
+ out.decode('utf8'), re.MULTILINE | re.DOTALL).groups()
pub_exp = "{0:x}".format(int(pub_exp))
if len(pub_exp) % 2:
pub_exp = "0{0}".format(pub_exp)
@@ -372,7 +377,7 @@ class ACMEAccount(object):
openssl_sign_cmd = [self._openssl_bin, "dgst", "-sha256", "-sign", self.key]
sign_payload = "{0}.{1}".format(protected64, payload64).encode('utf8')
- _, out, _ = self.module.run_command(openssl_sign_cmd,data=sign_payload,check_rc=True, binary_data=True)
+ _, out, _ = self.module.run_command(openssl_sign_cmd, data=sign_payload, check_rc=True, binary_data=True)
data = self.module.jsonify({
"header": self.jws_header,
@@ -394,13 +399,13 @@ class ACMEAccount(object):
try:
result = self.module.from_json(content.decode('utf8'))
except ValueError:
- self.module.fail_json(msg="Failed to parse the ACME response: {0} {1}".format(url,content))
+ self.module.fail_json(msg="Failed to parse the ACME response: {0} {1}".format(url, content))
else:
result = content
- return result,info
+ return result, info
- def _new_reg(self,contact=[]):
+ def _new_reg(self, contact=[]):
'''
Registers a new ACME account. Returns True if the account was
created and False if it already existed (e.g. it was not newly
@@ -420,7 +425,7 @@ class ACMEAccount(object):
if 'location' in info:
self.uri = info['location']
- if info['status'] in [200,201]:
+ if info['status'] in [200, 201]:
# Account did not exist
self.changed = True
return True
@@ -448,7 +453,7 @@ class ACMEAccount(object):
# if this is not a new registration (e.g. existing account)
if not self._new_reg(contact):
# pre-existing account, get account data...
- result, _ = self.send_signed_request(self.uri, {'resource':'reg'})
+ result, _ = self.send_signed_request(self.uri, {'resource': 'reg'})
# XXX: letsencrypt/boulder#1435
if 'authorizations' in result:
@@ -459,7 +464,7 @@ class ACMEAccount(object):
# ...and check if update is necessary
do_update = False
if 'contact' in result:
- if cmp(contact,result['contact']) != 0:
+ if cmp(contact, result['contact']) != 0:
do_update = True
elif len(contact) > 0:
do_update = True
@@ -493,55 +498,55 @@ class ACMEAccount(object):
authz = []
for auth_uri in authz_list['authorizations']:
- auth = simple_get(self.module,auth_uri)
+ auth = simple_get(self.module, auth_uri)
auth['uri'] = auth_uri
authz.append(auth)
return authz
+
class ACMEClient(object):
'''
ACME client class. Uses an ACME account object and a CSR to
start and validate ACME challenges and download the respective
certificates.
'''
- def __init__(self,module):
- self.module = module
- self.challenge = module.params['challenge']
- self.csr = module.params['csr']
- self.dest = module.params['dest']
- self.account = ACMEAccount(module)
- self.directory = self.account.directory
+ def __init__(self, module):
+ self.module = module
+ self.challenge = module.params['challenge']
+ self.csr = module.params['csr']
+ self.dest = module.params['dest']
+ self.account = ACMEAccount(module)
+ self.directory = self.account.directory
self.authorizations = self.account.get_authorizations()
- self.cert_days = -1
- self.changed = self.account.changed
+ self.cert_days = -1
+ self.changed = self.account.changed
if not os.path.exists(self.csr):
module.fail_json(msg="CSR %s not found" % (self.csr))
- self._openssl_bin = module.get_bin_path('openssl', True)
- self.domains = self._get_csr_domains()
+ self._openssl_bin = module.get_bin_path('openssl', True)
+ self.domains = self._get_csr_domains()
def _get_csr_domains(self):
'''
Parse the CSR and return the list of requested domains
'''
openssl_csr_cmd = [self._openssl_bin, "req", "-in", self.csr, "-noout", "-text"]
- _, out, _ = self.module.run_command(openssl_csr_cmd,check_rc=True)
+ _, out, _ = self.module.run_command(openssl_csr_cmd, check_rc=True)
domains = set([])
common_name = re.search(r"Subject:.*? CN=([^\s,;/]+)", out.decode('utf8'))
if common_name is not None:
domains.add(common_name.group(1))
- subject_alt_names = re.search(r"X509v3 Subject Alternative Name: \n +([^\n]+)\n", out.decode('utf8'), re.MULTILINE|re.DOTALL)
+ subject_alt_names = re.search(r"X509v3 Subject Alternative Name: \n +([^\n]+)\n", out.decode('utf8'), re.MULTILINE | re.DOTALL)
if subject_alt_names is not None:
for san in subject_alt_names.group(1).split(", "):
if san.startswith("DNS:"):
domains.add(san[4:])
return domains
-
- def _get_domain_auth(self,domain):
+ def _get_domain_auth(self, domain):
'''
Get the status string of the first authorization for the given domain.
Return None if no active authorization for the given domain was found.
@@ -554,16 +559,16 @@ class ACMEClient(object):
return auth
return None
- def _add_or_update_auth(self,auth):
+ def _add_or_update_auth(self, auth):
'''
Add or update the given authroization in the global authorizations list.
Return True if the auth was updated/added and False if no change was
necessary.
'''
- for index,cur_auth in enumerate(self.authorizations):
+ for index, cur_auth in enumerate(self.authorizations):
if (cur_auth['uri'] == auth['uri']):
# does the auth parameter contain updated data?
- if cmp(cur_auth,auth) != 0:
+ if cmp(cur_auth, auth) != 0:
# yes, update our current authorization list
self.authorizations[index] = auth
return True
@@ -574,7 +579,7 @@ class ACMEClient(object):
self.authorizations.append(auth)
return True
- def _new_authz(self,domain):
+ def _new_authz(self, domain):
'''
Create a new authorization for the given domain.
Return the authorization object of the new authorization
@@ -589,13 +594,13 @@ class ACMEClient(object):
}
result, info = self.account.send_signed_request(self.directory['new-authz'], new_authz)
- if info['status'] not in [200,201]:
+ if info['status'] not in [200, 201]:
self.module.fail_json(msg="Error requesting challenges: CODE: {0} RESULT: {1}".format(info['status'], result))
else:
result['uri'] = info['location']
return result
- def _get_challenge_data(self,auth):
+ def _get_challenge_data(self, auth):
'''
Returns a dict with the data for all proposed (and supported) challenges
of the given authorization.
@@ -625,8 +630,8 @@ class ACMEClient(object):
len_ka_digest = len(ka_digest)
resource = 'subjectAlternativeNames'
value = [
- "{0}.{1}.token.acme.invalid".format(token_digest[:len_token_digest/2],token_digest[len_token_digest/2:]),
- "{0}.{1}.ka.acme.invalid".format(ka_digest[:len_ka_digest/2],ka_digest[len_ka_digest/2:]),
+ "{0}.{1}.token.acme.invalid".format(token_digest[:len_token_digest / 2], token_digest[len_token_digest / 2:]),
+ "{0}.{1}.ka.acme.invalid".format(ka_digest[:len_ka_digest / 2], ka_digest[len_ka_digest / 2:]),
]
elif type == 'dns-01':
# https://tools.ietf.org/html/draft-ietf-acme-acme-02#section-7.4
@@ -635,10 +640,10 @@ class ACMEClient(object):
else:
continue
- data[type] = { 'resource': resource, 'resource_value': value }
+ data[type] = {'resource': resource, 'resource_value': value}
return data
- def _validate_challenges(self,auth):
+ def _validate_challenges(self, auth):
'''
Validate the authorization provided in the auth dict. Returns True
when the validation was successful and False when it was not.
@@ -656,13 +661,13 @@ class ACMEClient(object):
"keyAuthorization": keyauthorization,
}
result, info = self.account.send_signed_request(uri, challenge_response)
- if info['status'] not in [200,202]:
+ if info['status'] not in [200, 202]:
self.module.fail_json(msg="Error validating challenge: CODE: {0} RESULT: {1}".format(info['status'], result))
status = ''
- while status not in ['valid','invalid','revoked']:
- result = simple_get(self.module,auth['uri'])
+ while status not in ['valid', 'invalid', 'revoked']:
+ result = simple_get(self.module, auth['uri'])
result['uri'] = auth['uri']
if self._add_or_update_auth(result):
self.changed = True
@@ -686,7 +691,7 @@ class ACMEClient(object):
error_details += ' DETAILS: {0};'.format(challenge['error']['detail'])
else:
error_details += ';'
- self.module.fail_json(msg="Authorization for {0} returned invalid: {1}".format(result['identifier']['value'],error_details))
+ self.module.fail_json(msg="Authorization for {0} returned invalid: {1}".format(result['identifier']['value'], error_details))
return status == 'valid'
@@ -697,19 +702,19 @@ class ACMEClient(object):
https://tools.ietf.org/html/draft-ietf-acme-acme-02#section-6.5
'''
openssl_csr_cmd = [self._openssl_bin, "req", "-in", self.csr, "-outform", "DER"]
- _, out, _ = self.module.run_command(openssl_csr_cmd,check_rc=True)
+ _, out, _ = self.module.run_command(openssl_csr_cmd, check_rc=True)
new_cert = {
"resource": "new-cert",
"csr": nopad_b64(out),
}
result, info = self.account.send_signed_request(self.directory['new-cert'], new_cert)
- if info['status'] not in [200,201]:
+ if info['status'] not in [200, 201]:
self.module.fail_json(msg="Error new cert: CODE: {0} RESULT: {1}".format(info['status'], result))
else:
return {'cert': result, 'uri': info['location']}
- def _der_to_pem(self,der_cert):
+ def _der_to_pem(self, der_cert):
'''
Convert the DER format certificate in der_cert to a PEM format
certificate and return it.
@@ -759,47 +764,46 @@ class ACMEClient(object):
cert = self._new_cert()
if cert['cert'] is not None:
pem_cert = self._der_to_pem(cert['cert'])
- if write_file(self.module,self.dest,pem_cert):
- self.cert_days = get_cert_days(self.module,self.dest)
+ if write_file(self.module, self.dest, pem_cert):
+ self.cert_days = get_cert_days(self.module, self.dest)
self.changed = True
+
def main():
module = AnsibleModule(
- argument_spec = dict(
- account_key = dict(required=True, type='path'),
- account_email = dict(required=False, default=None, type='str'),
- acme_directory = dict(required=False, default='https://acme-staging.api.letsencrypt.org/directory', type='str'),
- agreement = dict(required=False, default='https://letsencrypt.org/documents/LE-SA-v1.1.1-August-1-2016.pdf', type='str'),
- challenge = dict(required=False, default='http-01', choices=['http-01', 'dns-01', 'tls-sni-02'], type='str'),
- csr = dict(required=True, aliases=['src'], type='path'),
- data = dict(required=False, no_log=True, default=None, type='dict'),
- dest = dict(required=True, aliases=['cert'], type='path'),
- remaining_days = dict(required=False, default=10, type='int'),
+ argument_spec=dict(
+ account_key=dict(required=True, type='path'),
+ account_email=dict(required=False, default=None, type='str'),
+ acme_directory=dict(required=False, default='https://acme-staging.api.letsencrypt.org/directory', type='str'),
+ agreement=dict(required=False, default='https://letsencrypt.org/documents/LE-SA-v1.1.1-August-1-2016.pdf', type='str'),
+ challenge=dict(required=False, default='http-01', choices=['http-01', 'dns-01', 'tls-sni-02'], type='str'),
+ csr=dict(required=True, aliases=['src'], type='path'),
+ data=dict(required=False, no_log=True, default=None, type='dict'),
+ dest=dict(required=True, aliases=['cert'], type='path'),
+ remaining_days=dict(required=False, default=10, type='int'),
),
- supports_check_mode = True,
+ supports_check_mode=True,
)
# AnsibleModule() changes the locale, so change it back to C because we rely on time.strptime() when parsing certificate dates.
locale.setlocale(locale.LC_ALL, "C")
- cert_days = get_cert_days(module,module.params['dest'])
+ cert_days = get_cert_days(module, module.params['dest'])
if cert_days < module.params['remaining_days']:
# If checkmode is active, base the changed state solely on the status
# of the certificate file as all other actions (accessing an account, checking
# the authorization status...) would lead to potential changes of the current
# state
if module.check_mode:
- module.exit_json(changed=True,authorizations={},
- challenge_data={},cert_days=cert_days)
+ module.exit_json(changed=True, authorizations={}, challenge_data={}, cert_days=cert_days)
else:
client = ACMEClient(module)
client.cert_days = cert_days
data = client.do_challenges()
client.get_certificate()
- module.exit_json(changed=client.changed,authorizations=client.authorizations,
- challenge_data=data,cert_days=client.cert_days)
+ module.exit_json(changed=client.changed, authorizations=client.authorizations, challenge_data=data, cert_days=client.cert_days)
else:
- module.exit_json(changed=False,cert_days=cert_days)
+ module.exit_json(changed=False, cert_days=cert_days)
# import module snippets
from ansible.module_utils.basic import *
diff --git a/test/sanity/pep8/legacy-files.txt b/test/sanity/pep8/legacy-files.txt
index ef254a5763..3aee7d60b1 100644
--- a/test/sanity/pep8/legacy-files.txt
+++ b/test/sanity/pep8/legacy-files.txt
@@ -772,7 +772,6 @@ lib/ansible/modules/web_infrastructure/ejabberd_user.py
lib/ansible/modules/web_infrastructure/htpasswd.py
lib/ansible/modules/web_infrastructure/jboss.py
lib/ansible/modules/web_infrastructure/jira.py
-lib/ansible/modules/web_infrastructure/letsencrypt.py
lib/ansible/modules/windows/win_disk_image.py
lib/ansible/modules/windows/win_dns_client.py
lib/ansible/modules/windows/win_domain.py