summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cloud/amazon/kinesis_stream.py1083
-rw-r--r--test/unit/cloud/amazon/test_kinesis_stream.py285
2 files changed, 1368 insertions, 0 deletions
diff --git a/cloud/amazon/kinesis_stream.py b/cloud/amazon/kinesis_stream.py
new file mode 100644
index 00000000..1ba25e69
--- /dev/null
+++ b/cloud/amazon/kinesis_stream.py
@@ -0,0 +1,1083 @@
+#!/usr/bin/python
+#
+# This is a free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This Ansible library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+DOCUMENTATION = '''
+---
+module: kinesis_stream
+short_description: Manage a Kinesis Stream.
+description:
+ - Create or Delete a Kinesis Stream.
+ - Update the retention period of a Kinesis Stream.
+ - Update Tags on a Kinesis Stream.
+version_added: "2.2"
+author: Allen Sanabria (@linuxdynasty)
+options:
+ name:
+ description:
+ - "The name of the Kinesis Stream you are managing."
+ default: None
+ required: true
+ shards:
+ description:
+ - "The number of shards you want to have with this stream. This can not
+ be modified after being created."
+ - "This is required when state == present"
+ required: false
+ default: None
+ retention_period:
+ description:
+ - "The default retention period is 24 hours and can not be less than 24
+ hours."
+ - "The retention period can be modified during any point in time."
+ required: false
+ default: None
+ state:
+ description:
+ - "Create or Delete the Kinesis Stream."
+ required: false
+ default: present
+ choices: [ 'present', 'absent' ]
+ wait:
+ description:
+ - Wait for operation to complete before returning
+ required: false
+ default: true
+ wait_timeout:
+ description:
+ - How many seconds to wait for an operation to complete before timing out
+ required: false
+ default: 300
+ tags:
+ description:
+ - "A dictionary of resource tags of the form: { tag1: value1, tag2: value2 }."
+ required: false
+ default: null
+ aliases: [ "resource_tags" ]
+extends_documentation_fragment:
+ - aws
+ - ec2
+'''
+
+EXAMPLES = '''
+# Note: These examples do not set authentication details, see the AWS Guide for details.
+
+# Basic creation example:
+- name: Set up Kinesis Stream with 10 shards and wait for the stream to become ACTIVE
+ kinesis_stream:
+ name: test-stream
+ shards: 10
+ wait: yes
+ wait_timeout: 600
+ register: test_stream
+
+# Basic creation example with tags:
+- name: Set up Kinesis Stream with 10 shards, tag the environment, and wait for the stream to become ACTIVE
+ kinesis_stream:
+ name: test-stream
+ shards: 10
+ tags:
+ Env: development
+ wait: yes
+ wait_timeout: 600
+ register: test_stream
+
+# Basic creation example with tags and increase the retention period from the default 24 hours to 48 hours:
+- name: Set up Kinesis Stream with 10 shards, tag the environment, increase the retention period and wait for the stream to become ACTIVE
+ kinesis_stream:
+ name: test-stream
+ retention_period: 48
+ shards: 10
+ tags:
+ Env: development
+ wait: yes
+ wait_timeout: 600
+ register: test_stream
+
+# Basic delete example:
+- name: Delete Kinesis Stream test-stream and wait for it to finish deleting.
+ kinesis_stream:
+ name: test-stream
+ state: absent
+ wait: yes
+ wait_timeout: 600
+ register: test_stream
+'''
+
+RETURN = '''
+stream_name:
+ description: The name of the Kinesis Stream.
+ returned: when state == present.
+ type: string
+ sample: "test-stream"
+stream_arn:
+ description: The amazon resource identifier
+ returned: when state == present.
+ type: string
+ sample: "arn:aws:kinesis:east-side:123456789:stream/test-stream"
+stream_status:
+ description: The current state of the Kinesis Stream.
+ returned: when state == present.
+ type: string
+ sample: "ACTIVE"
+retention_period_hours:
+ description: Number of hours messages will be kept for a Kinesis Stream.
+ returned: when state == present.
+ type: int
+ sample: 24
+tags:
+ description: Dictionary containing all the tags associated with the Kinesis stream.
+ returned: when state == present.
+ type: dict
+ sample: {
+ "Name": "Splunk",
+ "Env": "development"
+ }
+'''
+
+try:
+ import botocore
+ import boto3
+ HAS_BOTO3 = True
+except ImportError:
+ HAS_BOTO3 = False
+
+import re
+import datetime
+import time
+from functools import reduce
+
+def convert_to_lower(data):
+ """Convert all uppercase keys in dict with lowercase_
+ Args:
+ data (dict): Dictionary with keys that have upper cases in them
+ Example.. FooBar == foo_bar
+ if a val is of type datetime.datetime, it will be converted to
+ the ISO 8601
+
+ Basic Usage:
+ >>> test = {'FooBar': []}
+ >>> test = convert_to_lower(test)
+ {
+ 'foo_bar': []
+ }
+
+ Returns:
+ Dictionary
+ """
+ results = dict()
+ if isinstance(data, dict):
+ for key, val in data.items():
+ key = re.sub(r'(([A-Z]{1,3}){1})', r'_\1', key).lower()
+ if key[0] == '_':
+ key = key[1:]
+ if isinstance(val, datetime.datetime):
+ results[key] = val.isoformat()
+ elif isinstance(val, dict):
+ results[key] = convert_to_lower(val)
+ elif isinstance(val, list):
+ converted = list()
+ for item in val:
+ converted.append(convert_to_lower(item))
+ results[key] = converted
+ else:
+ results[key] = val
+ return results
+
+def make_tags_in_proper_format(tags):
+ """Take a dictionary of tags and convert them into the AWS Tags format.
+ Args:
+ tags (list): The tags you want applied.
+
+ Basic Usage:
+ >>> tags = [{'Key': 'env', 'Value': 'development'}]
+ >>> make_tags_in_proper_format(tags)
+ {
+ "env": "development",
+ }
+
+ Returns:
+ Dict
+ """
+ formatted_tags = dict()
+ for tag in tags:
+ formatted_tags[tag.get('Key')] = tag.get('Value')
+
+ return formatted_tags
+
+def make_tags_in_aws_format(tags):
+ """Take a dictionary of tags and convert them into the AWS Tags format.
+ Args:
+ tags (dict): The tags you want applied.
+
+ Basic Usage:
+ >>> tags = {'env': 'development', 'service': 'web'}
+ >>> make_tags_in_proper_format(tags)
+ [
+ {
+ "Value": "web",
+ "Key": "service"
+ },
+ {
+ "Value": "development",
+ "key": "env"
+ }
+ ]
+
+ Returns:
+ List
+ """
+ formatted_tags = list()
+ for key, val in tags.items():
+ formatted_tags.append({
+ 'Key': key,
+ 'Value': val
+ })
+
+ return formatted_tags
+
+def get_tags(client, stream_name, check_mode=False):
+ """Retrieve the tags for a Kinesis Stream.
+ Args:
+ client (botocore.client.EC2): Boto3 client.
+ stream_name (str): Name of the Kinesis stream.
+
+ Kwargs:
+ check_mode (bool): This will pass DryRun as one of the parameters to the aws api.
+ default=False
+
+ Basic Usage:
+ >>> client = boto3.client('kinesis')
+ >>> stream_name = 'test-stream'
+ >> get_tags(client, stream_name)
+
+ Returns:
+ Tuple (bool, str, dict)
+ """
+ err_msg = ''
+ success = False
+ params = {
+ 'StreamName': stream_name,
+ }
+ results = dict()
+ try:
+ if not check_mode:
+ results = (
+ client.list_tags_for_stream(**params)['Tags']
+ )
+ else:
+ results = [
+ {
+ 'Key': 'DryRunMode',
+ 'Value': 'true'
+ },
+ ]
+ success = True
+ except botocore.exceptions.ClientError as e:
+ err_msg = str(e)
+
+ return success, err_msg, results
+
+def find_stream(client, stream_name, check_mode=False):
+ """Retrieve a Kinesis Stream.
+ Args:
+ client (botocore.client.EC2): Boto3 client.
+ stream_name (str): Name of the Kinesis stream.
+
+ Kwargs:
+ check_mode (bool): This will pass DryRun as one of the parameters to the aws api.
+ default=False
+
+ Basic Usage:
+ >>> client = boto3.client('kinesis')
+ >>> stream_name = 'test-stream'
+
+ Returns:
+ Tuple (bool, str, dict)
+ """
+ err_msg = ''
+ success = False
+ params = {
+ 'StreamName': stream_name,
+ }
+ results = dict()
+ has_more_shards = True
+ shards = list()
+ try:
+ if not check_mode:
+ while has_more_shards:
+ results = (
+ client.describe_stream(**params)['StreamDescription']
+ )
+ shards.extend(results.pop('Shards'))
+ has_more_shards = results['HasMoreShards']
+ results['Shards'] = shards
+ results['ShardsCount'] = len(shards)
+ else:
+ results = {
+ 'HasMoreShards': True,
+ 'RetentionPeriodHours': 24,
+ 'StreamName': stream_name,
+ 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/{0}'.format(stream_name),
+ 'StreamStatus': 'ACTIVE'
+ }
+ success = True
+ except botocore.exceptions.ClientError as e:
+ err_msg = str(e)
+
+ return success, err_msg, results
+
+def wait_for_status(client, stream_name, status, wait_timeout=300,
+ check_mode=False):
+ """Wait for the the status to change for a Kinesis Stream.
+ Args:
+ client (botocore.client.EC2): Boto3 client
+ stream_name (str): The name of the kinesis stream.
+ status (str): The status to wait for.
+ examples. status=available, status=deleted
+
+ Kwargs:
+ wait_timeout (int): Number of seconds to wait, until this timeout is reached.
+ check_mode (bool): This will pass DryRun as one of the parameters to the aws api.
+ default=False
+
+ Basic Usage:
+ >>> client = boto3.client('kinesis')
+ >>> stream_name = 'test-stream'
+ >>> wait_for_status(client, stream_name, 'ACTIVE', 300)
+
+ Returns:
+ Tuple (bool, str, dict)
+ """
+ polling_increment_secs = 5
+ wait_timeout = time.time() + wait_timeout
+ status_achieved = False
+ stream = dict()
+ err_msg = ""
+
+ while wait_timeout > time.time():
+ try:
+ find_success, find_msg, stream = (
+ find_stream(client, stream_name, check_mode=check_mode)
+ )
+ if check_mode:
+ status_achieved = True
+ break
+
+ elif status != 'DELETING':
+ if find_success and stream:
+ if stream.get('StreamStatus') == status:
+ status_achieved = True
+ break
+
+ elif status == 'DELETING' and not check_mode:
+ if not find_success:
+ status_achieved = True
+ break
+
+ else:
+ time.sleep(polling_increment_secs)
+ except botocore.exceptions.ClientError as e:
+ err_msg = str(e)
+
+ if not status_achieved:
+ err_msg = "Wait time out reached, while waiting for results"
+ else:
+ err_msg = "Status {0} achieved successfully".format(status)
+
+ return status_achieved, err_msg, stream
+
+def tags_action(client, stream_name, tags, action='create', check_mode=False):
+ """Create or delete multiple tags from a Kinesis Stream.
+ Args:
+ client (botocore.client.EC2): Boto3 client.
+ resource_id (str): The Amazon resource id.
+ tags (list): List of dictionaries.
+ examples.. [{Name: "", Values: [""]}]
+
+ Kwargs:
+ action (str): The action to perform.
+ valid actions == create and delete
+ default=create
+ check_mode (bool): This will pass DryRun as one of the parameters to the aws api.
+ default=False
+
+ Basic Usage:
+ >>> client = boto3.client('ec2')
+ >>> resource_id = 'pcx-123345678'
+ >>> tags = {'env': 'development'}
+ >>> update_tags(client, resource_id, tags)
+ [True, '']
+
+ Returns:
+ List (bool, str)
+ """
+ success = False
+ err_msg = ""
+ params = {'StreamName': stream_name}
+ try:
+ if not check_mode:
+ if action == 'create':
+ params['Tags'] = tags
+ client.add_tags_to_stream(**params)
+ success = True
+ elif action == 'delete':
+ params['TagKeys'] = tags.keys()
+ client.remove_tags_from_stream(**params)
+ success = True
+ else:
+ err_msg = 'Invalid action {0}'.format(action)
+ else:
+ if action == 'create':
+ success = True
+ elif action == 'delete':
+ success = True
+ else:
+ err_msg = 'Invalid action {0}'.format(action)
+
+ except botocore.exceptions.ClientError as e:
+ err_msg = str(e)
+
+ return success, err_msg
+
+def recreate_tags_from_list(list_of_tags):
+ """Recreate tags from a list of tuples into the Amazon Tag format.
+ Args:
+ list_of_tags (list): List of tuples.
+
+ Basic Usage:
+ >>> list_of_tags = [('Env', 'Development')]
+ >>> recreate_tags_from_list(list_of_tags)
+ [
+ {
+ "Value": "Development",
+ "Key": "Env"
+ }
+ ]
+
+ Returns:
+ List
+ """
+ tags = list()
+ i = 0
+ list_of_tags = list_of_tags
+ for i in range(len(list_of_tags)):
+ key_name = list_of_tags[i][0]
+ key_val = list_of_tags[i][1]
+ tags.append(
+ {
+ 'Key': key_name,
+ 'Value': key_val
+ }
+ )
+ return tags
+
+def update_tags(client, stream_name, tags, check_mode=False):
+ """Update tags for an amazon resource.
+ Args:
+ resource_id (str): The Amazon resource id.
+ tags (dict): Dictionary of tags you want applied to the Kinesis stream.
+
+ Kwargs:
+ check_mode (bool): This will pass DryRun as one of the parameters to the aws api.
+ default=False
+
+ Basic Usage:
+ >>> client = boto3.client('ec2')
+ >>> stream_name = 'test-stream'
+ >>> tags = {'env': 'development'}
+ >>> update_tags(client, stream_name, tags)
+ [True, '']
+
+ Return:
+ Tuple (bool, str)
+ """
+ success = False
+ changed = False
+ err_msg = ''
+ tag_success, tag_msg, current_tags = (
+ get_tags(client, stream_name, check_mode=check_mode)
+ )
+ if current_tags:
+ tags = make_tags_in_aws_format(tags)
+ current_tags_set = (
+ set(
+ reduce(
+ lambda x, y: x + y,
+ [make_tags_in_proper_format(current_tags).items()]
+ )
+ )
+ )
+
+ new_tags_set = (
+ set(
+ reduce(
+ lambda x, y: x + y,
+ [make_tags_in_proper_format(tags).items()]
+ )
+ )
+ )
+ tags_to_delete = list(current_tags_set.difference(new_tags_set))
+ tags_to_update = list(new_tags_set.difference(current_tags_set))
+ if tags_to_delete:
+ tags_to_delete = make_tags_in_proper_format(
+ recreate_tags_from_list(tags_to_delete)
+ )
+ delete_success, delete_msg = (
+ tags_action(
+ client, stream_name, tags_to_delete, action='delete',
+ check_mode=check_mode
+ )
+ )
+ if not delete_success:
+ return delete_success, changed, delete_msg
+ if tags_to_update:
+ tags = make_tags_in_proper_format(
+ recreate_tags_from_list(tags_to_update)
+ )
+ else:
+ return True, changed, 'Tags do not need to be updated'
+
+ if tags:
+ create_success, create_msg = (
+ tags_action(
+ client, stream_name, tags, action='create',
+ check_mode=check_mode
+ )
+ )
+ if create_success:
+ changed = True
+ return create_success, changed, create_msg
+
+ return success, changed, err_msg
+
+def stream_action(client, stream_name, shard_count=1, action='create',
+ timeout=300, check_mode=False):
+ """Create or Delete an Amazon Kinesis Stream.
+ Args:
+ client (botocore.client.EC2): Boto3 client.
+ stream_name (str): The name of the kinesis stream.
+
+ Kwargs:
+ shard_count (int): Number of shards this stream will use.
+ action (str): The action to perform.
+ valid actions == create and delete
+ default=create
+ check_mode (bool): This will pass DryRun as one of the parameters to the aws api.
+ default=False
+
+ Basic Usage:
+ >>> client = boto3.client('kinesis')
+ >>> stream_name = 'test-stream'
+ >>> shard_count = 20
+ >>> stream_action(client, stream_name, shard_count, action='create')
+
+ Returns:
+ List (bool, str)
+ """
+ success = False
+ err_msg = ''
+ params = {
+ 'StreamName': stream_name
+ }
+ try:
+ if not check_mode:
+ if action == 'create':
+ params['ShardCount'] = shard_count
+ client.create_stream(**params)
+ success = True
+ elif action == 'delete':
+ client.delete_stream(**params)
+ success = True
+ else:
+ err_msg = 'Invalid action {0}'.format(action)
+ else:
+ if action == 'create':
+ success = True
+ elif action == 'delete':
+ success = True
+ else:
+ err_msg = 'Invalid action {0}'.format(action)
+
+ except botocore.exceptions.ClientError as e:
+ err_msg = str(e)
+
+ return success, err_msg
+
+def retention_action(client, stream_name, retention_period=24,
+ action='increase', check_mode=False):
+ """Increase or Decreaste the retention of messages in the Kinesis stream.
+ Args:
+ client (botocore.client.EC2): Boto3 client.
+ stream_name (str): The
+
+ Kwargs:
+ retention_period (int): This is how long messages will be kept before
+ they are discarded. This can not be less than 24 hours.
+ action (str): The action to perform.
+ valid actions == create and delete
+ default=create
+ check_mode (bool): This will pass DryRun as one of the parameters to the aws api.
+ default=False
+
+ Basic Usage:
+ >>> client = boto3.client('kinesis')
+ >>> stream_name = 'test-stream'
+ >>> retention_period = 48
+ >>> stream_action(client, stream_name, retention_period, action='create')
+
+ Returns:
+ Tuple (bool, str)
+ """
+ success = False
+ err_msg = ''
+ params = {
+ 'StreamName': stream_name
+ }
+ try:
+ if not check_mode:
+ if action == 'increase':
+ params['RetentionPeriodHours'] = retention_period
+ client.increase_stream_retention_period(**params)
+ success = True
+ err_msg = (
+ 'Retention Period increased successfully to {0}'
+ .format(retention_period)
+ )
+ elif action == 'decrease':
+ params['RetentionPeriodHours'] = retention_period
+ client.decrease_stream_retention_period(**params)
+ success = True
+ err_msg = (
+ 'Retention Period decreased successfully to {0}'
+ .format(retention_period)
+ )
+ else:
+ err_msg = 'Invalid action {0}'.format(action)
+ else:
+ if action == 'increase':
+ success = True
+ elif action == 'decrease':
+ success = True
+ else:
+ err_msg = 'Invalid action {0}'.format(action)
+
+ except botocore.exceptions.ClientError as e:
+ err_msg = str(e)
+
+ return success, err_msg
+
+def update(client, current_stream, stream_name, retention_period=None,
+ tags=None, wait=False, wait_timeout=300, check_mode=False):
+ """Update an Amazon Kinesis Stream.
+ Args:
+ client (botocore.client.EC2): Boto3 client.
+ stream_name (str): The name of the kinesis stream.
+
+ Kwargs:
+ retention_period (int): This is how long messages will be kept before
+ they are discarded. This can not be less than 24 hours.
+ tags (dict): The tags you want applied.
+ wait (bool): Wait until Stream is ACTIVE.
+ default=False
+ wait_timeout (int): How long to wait until this operation is considered failed.
+ default=300
+ check_mode (bool): This will pass DryRun as one of the parameters to the aws api.
+ default=False
+
+ Basic Usage:
+ >>> client = boto3.client('kinesis')
+ >>> current_stream = {
+ 'HasMoreShards': True,
+ 'RetentionPeriodHours': 24,
+ 'StreamName': 'test-stream',
+ 'StreamARN': 'arn:aws:kinesis:us-west-2:123456789:stream/test-stream',
+ 'StreamStatus': "ACTIVE'
+ }
+ >>> stream_name = 'test-stream'
+ >>> retention_period = 48
+ >>> stream_action(client, current_stream, stream_name,
+ retention_period, action='create' )
+
+ Returns:
+ Tuple (bool, bool, str)
+ """
+ success = True
+ changed = False
+ err_msg = ''
+ if retention_period:
+ if wait:
+ wait_success, wait_msg, current_stream = (
+ wait_for_status(
+ client, stream_name, 'ACTIVE', wait_timeout,
+ check_mode=check_mode
+ )
+ )
+ if not wait_success:
+ return wait_success, False, wait_msg
+
+ if current_stream['StreamStatus'] == 'ACTIVE':
+ retention_changed = False
+ if retention_period > current_stream['RetentionPeriodHours']:
+ retention_changed, retention_msg = (
+ retention_action(
+ client, stream_name, retention_period, action='increase',
+ check_mode=check_mode
+ )
+ )
+
+ elif retention_period < current_stream['RetentionPeriodHours']:
+ retention_changed, retention_msg = (
+ retention_action(
+ client, stream_name, retention_period, action='decrease',
+ check_mode=check_mode
+ )
+ )
+
+ elif retention_period == current_stream['RetentionPeriodHours']:
+ retention_msg = (
+ 'Retention {0} is the same as {1}'
+ .format(
+ retention_period,
+ current_stream['RetentionPeriodHours']
+ )
+ )
+ success = True
+
+ if retention_changed:
+ success = True
+ changed = True
+
+ err_msg = retention_msg
+ if changed and wait:
+ wait_success, wait_msg, current_stream = (
+ wait_for_status(
+ client, stream_name, 'ACTIVE', wait_timeout,
+ check_mode=check_mode
+ )
+ )
+ if not wait_success:
+ return wait_success, False, wait_msg
+ elif changed and not wait:
+ stream_found, stream_msg, current_stream = (
+ find_stream(client, stream_name, check_mode=check_mode)
+ )
+ if stream_found:
+ if current_stream['StreamStatus'] != 'ACTIVE':
+ err_msg = (
+ 'Retention Period for {0} is in the process of updating'
+ .format(stream_name)
+ )
+ return success, changed, err_msg
+ else:
+ err_msg = (
+ 'StreamStatus has to be ACTIVE in order to modify the retention period. Current status is {0}'
+ .format(current_stream['StreamStatus'])
+ )
+ return success, changed, err_msg
+
+ if tags:
+ _, _, err_msg = (
+ update_tags(client, stream_name, tags, check_mode=check_mode)
+ )
+ if wait:
+ success, err_msg, _ = (
+ wait_for_status(
+ client, stream_name, 'ACTIVE', wait_timeout,
+ check_mode=check_mode
+ )
+ )
+ if success and changed:
+ err_msg = 'Kinesis Stream {0} updated successfully.'.format(stream_name)
+ elif success and not changed:
+ err_msg = 'Kinesis Stream {0} did not changed.'.format(stream_name)
+
+ return success, changed, err_msg
+
+def create_stream(client, stream_name, number_of_shards=1, retention_period=None,
+ tags=None, wait=False, wait_timeout=300, check_mode=False):
+ """Create an Amazon Kinesis Stream.
+ Args:
+ client (botocore.client.EC2): Boto3 client.
+ stream_name (str): The name of the kinesis stream.
+
+ Kwargs:
+ number_of_shards (int): Number of shards this stream will use.
+ default=1
+ retention_period (int): Can not be less than 24 hours
+ default=None
+ tags (dict): The tags you want applied.
+ default=None
+ wait (bool): Wait until Stream is ACTIVE.
+ default=False
+ wait_timeout (int): How long to wait until this operation is considered failed.
+ default=300
+ check_mode (bool): This will pass DryRun as one of the parameters to the aws api.
+ default=False
+
+ Basic Usage:
+ >>> client = boto3.client('kinesis')
+ >>> stream_name = 'test-stream'
+ >>> number_of_shards = 10
+ >>> tags = {'env': 'test'}
+ >>> create_stream(client, stream_name, number_of_shards, tags=tags)
+
+ Returns:
+ Tuple (bool, bool, str, dict)
+ """
+ success = False
+ changed = False
+ err_msg = ''
+ results = dict()
+
+ stream_found, stream_msg, current_stream = (
+ find_stream(client, stream_name, check_mode=check_mode)
+ )
+ if stream_found:
+ if current_stream['ShardsCount'] != number_of_shards:
+ err_msg = 'Can not change the number of shards in a Kinesis Stream'
+ return success, changed, err_msg, results
+
+ if stream_found and current_stream['StreamStatus'] == 'DELETING' and wait:
+ wait_success, wait_msg, current_stream = (
+ wait_for_status(
+ client, stream_name, 'ACTIVE', wait_timeout,
+ check_mode=check_mode
+ )
+ )
+ if stream_found and current_stream['StreamStatus'] != 'DELETING':
+ success, changed, err_msg = update(
+ client, current_stream, stream_name, retention_period, tags,
+ wait, wait_timeout, check_mode=check_mode
+ )
+ else:
+ create_success, create_msg = (
+ stream_action(
+ client, stream_name, number_of_shards, action='create',
+ check_mode=check_mode
+ )
+ )
+ if create_success:
+ changed = True
+ if wait:
+ wait_success, wait_msg, results = (
+ wait_for_status(
+ client, stream_name, 'ACTIVE', wait_timeout,
+ check_mode=check_mode
+ )
+ )
+ err_msg = (
+ 'Kinesis Stream {0} is in the process of being created'
+ .format(stream_name)
+ )
+ if not wait_success:
+ return wait_success, True, wait_msg, results
+ else:
+ err_msg = (
+ 'Kinesis Stream {0} created successfully'
+ .format(stream_name)
+ )
+
+ if tags:
+ changed, err_msg = (
+ tags_action(
+ client, stream_name, tags, action='create',
+ check_mode=check_mode
+ )
+ )
+ if changed:
+ success = True
+ if not success:
+ return success, changed, err_msg, results
+
+ stream_found, stream_msg, current_stream = (
+ find_stream(client, stream_name, check_mode=check_mode)
+ )
+ if retention_period and current_stream['StreamStatus'] == 'ACTIVE':
+ changed, err_msg = (
+ retention_action(
+ client, stream_name, retention_period, action='increase',
+ check_mode=check_mode
+ )
+ )
+ if changed:
+ success = True
+ if not success:
+ return success, changed, err_msg, results
+ else:
+ err_msg = (
+ 'StreamStatus has to be ACTIVE in order to modify the retention period. Current status is {0}'
+ .format(current_stream['StreamStatus'])
+ )
+ success = create_success
+ changed = True
+
+ if success:
+ _, _, results = (
+ find_stream(client, stream_name, check_mode=check_mode)
+ )
+ _, _, current_tags = (
+ get_tags(client, stream_name, check_mode=check_mode)
+ )
+ if current_tags and not check_mode:
+ current_tags = make_tags_in_proper_format(current_tags)
+ results['Tags'] = current_tags
+ elif check_mode and tags:
+ results['Tags'] = tags
+ else:
+ results['Tags'] = dict()
+ results = convert_to_lower(results)
+
+ return success, changed, err_msg, results
+
+def delete_stream(client, stream_name, wait=False, wait_timeout=300,
+ check_mode=False):
+ """Delete an Amazon Kinesis Stream.
+ Args:
+ client (botocore.client.EC2): Boto3 client.
+ stream_name (str): The name of the kinesis stream.
+
+ Kwargs:
+ wait (bool): Wait until Stream is ACTIVE.
+ default=False
+ wait_timeout (int): How long to wait until this operation is considered failed.
+ default=300
+ check_mode (bool): This will pass DryRun as one of the parameters to the aws api.
+ default=False
+
+ Basic Usage:
+ >>> client = boto3.client('kinesis')
+ >>> stream_name = 'test-stream'
+ >>> delete_stream(client, stream_name)
+
+ Returns:
+ Tuple (bool, bool, str, dict)
+ """
+ success = False
+ changed = False
+ err_msg = ''
+ results = dict()
+ stream_found, stream_msg, current_stream = (
+ find_stream(client, stream_name, check_mode=check_mode)
+ )
+ if stream_found:
+ success, err_msg = (
+ stream_action(
+ client, stream_name, action='delete', check_mode=check_mode
+ )
+ )
+ if success:
+ changed = True
+ if wait:
+ success, err_msg, results = (
+ wait_for_status(
+ client, stream_name, 'DELETING', wait_timeout,
+ check_mode=check_mode
+ )
+ )
+ err_msg = 'Stream {0} deleted successfully'.format(stream_name)
+ if not success:
+ return success, True, err_msg, results
+ else:
+ err_msg = (
+ 'Stream {0} is in the process of being deleted'
+ .format(stream_name)
+ )
+ else:
+ success = True
+ changed = False
+ err_msg = 'Stream {0} does not exist'.format(stream_name)
+
+ return success, changed, err_msg, results
+
+def main():
+ argument_spec = ec2_argument_spec()
+ argument_spec.update(
+ dict(
+ name = dict(default=None, required=True),
+ shards = dict(default=None, required=False, type='int'),
+ retention_period = dict(default=None, required=False, type='int'),
+ tags = dict(default=None, required=False, type='dict', aliases=['resource_tags']),
+ wait = dict(default=True, required=False, type='bool'),
+ wait_timeout = dict(default=300, required=False, type='int'),
+ state = dict(default='present', choices=['present', 'absent']),
+ )
+ )
+ module = AnsibleModule(
+ argument_spec=argument_spec,
+ supports_check_mode=True,
+ )
+
+ retention_period = module.params.get('retention_period')
+ stream_name = module.params.get('name')
+ shards = module.params.get('shards')
+ state = module.params.get('state')
+ tags = module.params.get('tags')
+ wait = module.params.get('wait')
+ wait_timeout = module.params.get('wait_timeout')
+
+ if state == 'present' and not shards:
+ module.fail_json(msg='Shards is required when state == present.')
+
+ if retention_period:
+ if retention_period < 24:
+ module.fail_json(msg='Retention period can not be less than 24 hours.')
+
+ if not HAS_BOTO3:
+ module.fail_json(msg='boto3 is required.')
+
+ check_mode = module.check_mode
+ try:
+ region, ec2_url, aws_connect_kwargs = (
+ get_aws_connection_info(module, boto3=True)
+ )
+ client = (
+ boto3_conn(
+ module, conn_type='client', resource='kinesis',
+ region=region, endpoint=ec2_url, **aws_connect_kwargs
+ )
+ )
+ except botocore.exceptions.ClientError as e:
+ err_msg = 'Boto3 Client Error - {0}'.format(str(e.msg))
+ module.fail_json(
+ success=False, changed=False, result={}, msg=err_msg
+ )
+
+ if state == 'present':
+ success, changed, err_msg, results = (
+ create_stream(
+ client, stream_name, shards, retention_period, tags,
+ wait, wait_timeout, check_mode
+ )
+ )
+ elif state == 'absent':
+ success, changed, err_msg, results = (
+ delete_stream(client, stream_name, wait, wait_timeout, check_mode)
+ )
+
+ if success:
+ module.exit_json(
+ success=success, changed=changed, msg=err_msg, **results
+ )
+ else:
+ module.fail_json(
+ success=success, changed=changed, msg=err_msg, result=results
+ )
+
+# import module snippets
+from ansible.module_utils.basic import *
+from ansible.module_utils.ec2 import *
+
+if __name__ == '__main__':
+ main()
diff --git a/test/unit/cloud/amazon/test_kinesis_stream.py b/test/unit/cloud/amazon/test_kinesis_stream.py
new file mode 100644
index 00000000..280ec5e2
--- /dev/null
+++ b/test/unit/cloud/amazon/test_kinesis_stream.py
@@ -0,0 +1,285 @@
+#!/usr/bin/python
+
+import boto3
+import unittest
+
+import cloud.amazon.kinesis_stream as kinesis_stream
+
+aws_region = 'us-west-2'
+
+
+class AnsibleKinesisStreamFunctions(unittest.TestCase):
+
+ def test_convert_to_lower(self):
+ example = {
+ 'HasMoreShards': True,
+ 'RetentionPeriodHours': 24,
+ 'StreamName': 'test',
+ 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test',
+ 'StreamStatus': 'ACTIVE'
+ }
+ converted_example = kinesis_stream.convert_to_lower(example)
+ keys = converted_example.keys()
+ keys.sort()
+ for i in range(len(keys)):
+ if i == 0:
+ self.assertEqual(keys[i], 'has_more_shards')
+ if i == 1:
+ self.assertEqual(keys[i], 'retention_period_hours')
+ if i == 2:
+ self.assertEqual(keys[i], 'stream_arn')
+ if i == 3:
+ self.assertEqual(keys[i], 'stream_name')
+ if i == 4:
+ self.assertEqual(keys[i], 'stream_status')
+
+ def test_make_tags_in_aws_format(self):
+ example = {
+ 'env': 'development'
+ }
+ should_return = [
+ {
+ 'Key': 'env',
+ 'Value': 'development'
+ }
+ ]
+ aws_tags = kinesis_stream.make_tags_in_aws_format(example)
+ self.assertEqual(aws_tags, should_return)
+
+ def test_make_tags_in_proper_format(self):
+ example = [
+ {
+ 'Key': 'env',
+ 'Value': 'development'
+ },
+ {
+ 'Key': 'service',
+ 'Value': 'web'
+ }
+ ]
+ should_return = {
+ 'env': 'development',
+ 'service': 'web'
+ }
+ proper_tags = kinesis_stream.make_tags_in_proper_format(example)
+ self.assertEqual(proper_tags, should_return)
+
+ def test_recreate_tags_from_list(self):
+ example = [('environment', 'development'), ('service', 'web')]
+ should_return = [
+ {
+ 'Key': 'environment',
+ 'Value': 'development'
+ },
+ {
+ 'Key': 'service',
+ 'Value': 'web'
+ }
+ ]
+ aws_tags = kinesis_stream.recreate_tags_from_list(example)
+ self.assertEqual(aws_tags, should_return)
+
+ def test_get_tags(self):
+ client = boto3.client('kinesis', region_name=aws_region)
+ success, err_msg, tags = kinesis_stream.get_tags(client, 'test', True)
+ self.assertTrue(success)
+ should_return = [
+ {
+ 'Key': 'DryRunMode',
+ 'Value': 'true'
+ }
+ ]
+ self.assertEqual(tags, should_return)
+
+ def test_find_stream(self):
+ client = boto3.client('kinesis', region_name=aws_region)
+ success, err_msg, stream = (
+ kinesis_stream.find_stream(client, 'test', check_mode=True)
+ )
+ should_return = {
+ 'HasMoreShards': True,
+ 'RetentionPeriodHours': 24,
+ 'StreamName': 'test',
+ 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test',
+ 'StreamStatus': 'ACTIVE'
+ }
+ self.assertTrue(success)
+ self.assertEqual(stream, should_return)
+
+ def test_wait_for_status(self):
+ client = boto3.client('kinesis', region_name=aws_region)
+ success, err_msg, stream = (
+ kinesis_stream.wait_for_status(
+ client, 'test', 'ACTIVE', check_mode=True
+ )
+ )
+ should_return = {
+ 'HasMoreShards': True,
+ 'RetentionPeriodHours': 24,
+ 'StreamName': 'test',
+ 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test',
+ 'StreamStatus': 'ACTIVE'
+ }
+ self.assertTrue(success)
+ self.assertEqual(stream, should_return)
+
+ def test_tags_action_create(self):
+ client = boto3.client('kinesis', region_name=aws_region)
+ tags = {
+ 'env': 'development',
+ 'service': 'web'
+ }
+ success, err_msg = (
+ kinesis_stream.tags_action(
+ client, 'test', tags, 'create', check_mode=True
+ )
+ )
+ self.assertTrue(success)
+
+ def test_tags_action_delete(self):
+ client = boto3.client('kinesis', region_name=aws_region)
+ tags = {
+ 'env': 'development',
+ 'service': 'web'
+ }
+ success, err_msg = (
+ kinesis_stream.tags_action(
+ client, 'test', tags, 'delete', check_mode=True
+ )
+ )
+ self.assertTrue(success)
+
+ def test_tags_action_invalid(self):
+ client = boto3.client('kinesis', region_name=aws_region)
+ tags = {
+ 'env': 'development',
+ 'service': 'web'
+ }
+ success, err_msg = (
+ kinesis_stream.tags_action(
+ client, 'test', tags, 'append', check_mode=True
+ )
+ )
+ self.assertFalse(success)
+
+ def test_update_tags(self):
+ client = boto3.client('kinesis', region_name=aws_region)
+ tags = {
+ 'env': 'development',
+ 'service': 'web'
+ }
+ success, err_msg = (
+ kinesis_stream.update_tags(
+ client, 'test', tags, check_mode=True
+ )
+ )
+ self.assertTrue(success)
+
+ def test_stream_action_create(self):
+ client = boto3.client('kinesis', region_name=aws_region)
+ success, err_msg = (
+ kinesis_stream.stream_action(
+ client, 'test', 10, 'create', check_mode=True
+ )
+ )
+ self.assertTrue(success)
+
+ def test_stream_action_delete(self):
+ client = boto3.client('kinesis', region_name=aws_region)
+ success, err_msg = (
+ kinesis_stream.stream_action(
+ client, 'test', 10, 'delete', check_mode=True
+ )
+ )
+ self.assertTrue(success)
+
+ def test_stream_action_invalid(self):
+ client = boto3.client('kinesis', region_name=aws_region)
+ success, err_msg = (
+ kinesis_stream.stream_action(
+ client, 'test', 10, 'append', check_mode=True
+ )
+ )
+ self.assertFalse(success)
+
+ def test_retention_action_increase(self):
+ client = boto3.client('kinesis', region_name=aws_region)
+ success, err_msg = (
+ kinesis_stream.retention_action(
+ client, 'test', 48, 'increase', check_mode=True
+ )
+ )
+ self.assertTrue(success)
+
+ def test_retention_action_decrease(self):
+ client = boto3.client('kinesis', region_name=aws_region)
+ success, err_msg = (
+ kinesis_stream.retention_action(
+ client, 'test', 24, 'decrease', check_mode=True
+ )
+ )
+ self.assertTrue(success)
+
+ def test_retention_action_invalid(self):
+ client = boto3.client('kinesis', region_name=aws_region)
+ success, err_msg = (
+ kinesis_stream.retention_action(
+ client, 'test', 24, 'create', check_mode=True
+ )
+ )
+ self.assertFalse(success)
+
+ def test_update(self):
+ client = boto3.client('kinesis', region_name=aws_region)
+ current_stream = {
+ 'HasMoreShards': True,
+ 'RetentionPeriodHours': 24,
+ 'StreamName': 'test',
+ 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test',
+ 'StreamStatus': 'ACTIVE'
+ }
+ tags = {
+ 'env': 'development',
+ 'service': 'web'
+ }
+ success, changed, err_msg = (
+ kinesis_stream.update(
+ client, current_stream, 'test', retention_period=48,
+ tags=tags, check_mode=True
+ )
+ )
+ self.assertTrue(success)
+ self.assertTrue(changed)
+ self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.')
+
+ def test_create_stream(self):
+ client = boto3.client('kinesis', region_name=aws_region)
+ tags = {
+ 'env': 'development',
+ 'service': 'web'
+ }
+ success, changed, err_msg, results = (
+ kinesis_stream.create_stream(
+ client, 'test', number_of_shards=10, retention_period=48,
+ tags=tags, check_mode=True
+ )
+ )
+ should_return = {
+ 'has_more_shards': True,
+ 'retention_period_hours': 24,
+ 'stream_name': 'test',
+ 'stream_arn': 'arn:aws:kinesis:east-side:123456789:stream/test',
+ 'stream_status': 'ACTIVE',
+ 'tags': tags,
+ }
+ self.assertTrue(success)
+ self.assertTrue(changed)
+ self.assertEqual(results, should_return)
+ self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.')
+
+
+def main():
+ unittest.main()
+
+if __name__ == '__main__':
+ main()