summaryrefslogtreecommitdiff
path: root/tools/aws_config_quick_start/certs.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/aws_config_quick_start/certs.py')
-rw-r--r--tools/aws_config_quick_start/certs.py88
1 files changed, 88 insertions, 0 deletions
diff --git a/tools/aws_config_quick_start/certs.py b/tools/aws_config_quick_start/certs.py
new file mode 100644
index 000000000..1e29f4d93
--- /dev/null
+++ b/tools/aws_config_quick_start/certs.py
@@ -0,0 +1,88 @@
+#!/usr/bin/env python
+
+import boto3
+import json
+
+
+class Certificate():
+
+ def __init__(self, certId=''):
+ self.id = certId
+ self.arn = ''
+ self.client = boto3.client('iot')
+ if (self.id != ''):
+ result = self.client.describe_certificate(certificateId=self.id)
+ self.arn = result['certificateDescription']['certificateArn']
+
+ def create(self):
+ assert not self.exists(), "Cert already exists"
+ cert = self.create_keys_and_certificate()
+ self.id = cert["certificateId"]
+ self.arn = cert["certificateArn"]
+ return cert
+
+ def create_keys_and_certificate(self):
+ result = self.client.create_keys_and_certificate(setAsActive=True)
+ return result
+
+ def delete(self):
+ cert_not_found = True
+ # Detach Policies attached to the cert
+ policies_attached = self.list_policies()
+ for policy in policies_attached:
+ self.detach_policy(policy['policyName'])
+
+ # Detach Things attached to the cert
+ things_attached = self.list_things()
+ for thing in things_attached:
+ self.detach_thing(thing)
+
+ # Update the status of the certificate to INACTIVE
+ try:
+ self.client.update_certificate(certificateId=self.id,
+ newStatus='INACTIVE')
+ cert_not_found = False
+ except self.client.exceptions.ResourceNotFoundException:
+ cert_not_found = True
+ return cert_not_found
+
+ # Delete the certificate
+ try:
+ self.client.delete_certificate(certificateId=self.id)
+ cert_not_found = False
+ except self.client.exceptions.ResourceNotFoundException:
+ cert_not_found = True
+ return cert_not_found
+
+ def exists(self):
+ if self.id == '':
+ return False
+ else:
+ return True
+
+ def get_arn(self):
+ return self.arn
+
+ def list_policies(self):
+ policies = self.client.list_principal_policies(principal=self.arn)
+ policies = policies['policies']
+ return policies
+
+ def attach_policy(self, policy_name):
+ self.client.attach_policy(policyName=policy_name, target=self.arn)
+
+ def detach_policy(self, policy_name):
+ self.client.detach_policy(policyName=policy_name, target=self.arn)
+
+ def list_things(self):
+ things = self.client.list_principal_things(principal=self.arn)
+ things = things['things']
+ return things
+
+ def attach_thing(self, thing_name):
+ self.client.attach_thing_principal(thingName=thing_name,
+ principal=self.arn)
+
+ def detach_thing(self, thing_name):
+ self.client.detach_thing_principal(thingName=thing_name,
+ principal=self.arn)