summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAllen Sanabria <asanabria@linuxdynasty.org>2016-07-14 16:37:06 -0700
committerAllen Sanabria <asanabria@linuxdynasty.org>2016-07-14 16:57:47 -0700
commite9f7fb092754e6a2aa1c540bc39d963594b141dd (patch)
tree2e59382d8cc9efc530e46fb1033cadef2acb15e5
parent8a17506058dc322f5a85b6ff60fa3e5b86e088de (diff)
downloadansible-modules-extras-e9f7fb092754e6a2aa1c540bc39d963594b141dd.tar.gz
Now when number of shards is different than what is the stream currently, it will fail.\n\nShards can not be changed on an already created stream
-rw-r--r--cloud/amazon/kinesis_stream.py79
1 files changed, 48 insertions, 31 deletions
diff --git a/cloud/amazon/kinesis_stream.py b/cloud/amazon/kinesis_stream.py
index a9139053..1ba25e69 100644
--- a/cloud/amazon/kinesis_stream.py
+++ b/cloud/amazon/kinesis_stream.py
@@ -147,7 +147,6 @@ tags:
'''
try:
- import boto
import botocore
import boto3
HAS_BOTO3 = True
@@ -285,20 +284,18 @@ def get_tags(client, stream_name, check_mode=False):
},
]
success = True
- except botocore.exceptions.ClientError, e:
+ except botocore.exceptions.ClientError as e:
err_msg = str(e)
return success, err_msg, results
-def find_stream(client, stream_name, limit=1, check_mode=False):
+def find_stream(client, stream_name, check_mode=False):
"""Retrieve a Kinesis Stream.
Args:
client (botocore.client.EC2): Boto3 client.
stream_name (str): Name of the Kinesis stream.
Kwargs:
- limit (int): Limit the number of shards to return within a stream.
- default=1
check_mode (bool): This will pass DryRun as one of the parameters to the aws api.
default=False
@@ -313,15 +310,20 @@ def find_stream(client, stream_name, limit=1, check_mode=False):
success = False
params = {
'StreamName': stream_name,
- 'Limit': limit
}
results = dict()
+ has_more_shards = True
+ shards = list()
try:
if not check_mode:
- results = (
- client.describe_stream(**params)['StreamDescription']
- )
- results.pop('Shards')
+ while has_more_shards:
+ results = (
+ client.describe_stream(**params)['StreamDescription']
+ )
+ shards.extend(results.pop('Shards'))
+ has_more_shards = results['HasMoreShards']
+ results['Shards'] = shards
+ results['ShardsCount'] = len(shards)
else:
results = {
'HasMoreShards': True,
@@ -331,7 +333,7 @@ def find_stream(client, stream_name, limit=1, check_mode=False):
'StreamStatus': 'ACTIVE'
}
success = True
- except botocore.exceptions.ClientError, e:
+ except botocore.exceptions.ClientError as e:
err_msg = str(e)
return success, err_msg, results
@@ -391,6 +393,8 @@ def wait_for_status(client, stream_name, status, wait_timeout=300,
if not status_achieved:
err_msg = "Wait time out reached, while waiting for results"
+ else:
+ err_msg = "Status {0} achieved successfully".format(status)
return status_achieved, err_msg, stream
@@ -442,7 +446,7 @@ def tags_action(client, stream_name, tags, action='create', check_mode=False):
else:
err_msg = 'Invalid action {0}'.format(action)
- except botocore.exceptions.ClientError, e:
+ except botocore.exceptions.ClientError as e:
err_msg = str(e)
return success, err_msg
@@ -500,6 +504,7 @@ def update_tags(client, stream_name, tags, check_mode=False):
Tuple (bool, str)
"""
success = False
+ changed = False
err_msg = ''
tag_success, tag_msg, current_tags = (
get_tags(client, stream_name, check_mode=check_mode)
@@ -536,13 +541,13 @@ def update_tags(client, stream_name, tags, check_mode=False):
)
)
if not delete_success:
- return delete_success, delete_msg
+ return delete_success, changed, delete_msg
if tags_to_update:
tags = make_tags_in_proper_format(
recreate_tags_from_list(tags_to_update)
)
else:
- return True, 'Tags do not need to be updated'
+ return True, changed, 'Tags do not need to be updated'
if tags:
create_success, create_msg = (
@@ -551,9 +556,11 @@ def update_tags(client, stream_name, tags, check_mode=False):
check_mode=check_mode
)
)
- return create_success, create_msg
+ if create_success:
+ changed = True
+ return create_success, changed, create_msg
- return success, err_msg
+ return success, changed, err_msg
def stream_action(client, stream_name, shard_count=1, action='create',
timeout=300, check_mode=False):
@@ -603,7 +610,7 @@ def stream_action(client, stream_name, shard_count=1, action='create',
else:
err_msg = 'Invalid action {0}'.format(action)
- except botocore.exceptions.ClientError, e:
+ except botocore.exceptions.ClientError as e:
err_msg = str(e)
return success, err_msg
@@ -644,10 +651,18 @@ def retention_action(client, stream_name, retention_period=24,
params['RetentionPeriodHours'] = retention_period
client.increase_stream_retention_period(**params)
success = True
+ err_msg = (
+ 'Retention Period increased successfully to {0}'
+ .format(retention_period)
+ )
elif action == 'decrease':
params['RetentionPeriodHours'] = retention_period
client.decrease_stream_retention_period(**params)
success = True
+ err_msg = (
+ 'Retention Period decreased successfully to {0}'
+ .format(retention_period)
+ )
else:
err_msg = 'Invalid action {0}'.format(action)
else:
@@ -658,7 +673,7 @@ def retention_action(client, stream_name, retention_period=24,
else:
err_msg = 'Invalid action {0}'.format(action)
- except botocore.exceptions.ClientError, e:
+ except botocore.exceptions.ClientError as e:
err_msg = str(e)
return success, err_msg
@@ -698,7 +713,7 @@ def update(client, current_stream, stream_name, retention_period=None,
Returns:
Tuple (bool, bool, str)
"""
- success = False
+ success = True
changed = False
err_msg = ''
if retention_period:
@@ -710,9 +725,10 @@ def update(client, current_stream, stream_name, retention_period=None,
)
)
if not wait_success:
- return wait_success, True, wait_msg
+ return wait_success, False, wait_msg
if current_stream['StreamStatus'] == 'ACTIVE':
+ retention_changed = False
if retention_period > current_stream['RetentionPeriodHours']:
retention_changed, retention_msg = (
retention_action(
@@ -720,8 +736,6 @@ def update(client, current_stream, stream_name, retention_period=None,
check_mode=check_mode
)
)
- if retention_changed:
- success = True
elif retention_period < current_stream['RetentionPeriodHours']:
retention_changed, retention_msg = (
@@ -730,11 +744,8 @@ def update(client, current_stream, stream_name, retention_period=None,
check_mode=check_mode
)
)
- if retention_changed:
- success = True
elif retention_period == current_stream['RetentionPeriodHours']:
- retention_changed = False
retention_msg = (
'Retention {0} is the same as {1}'
.format(
@@ -744,7 +755,10 @@ def update(client, current_stream, stream_name, retention_period=None,
)
success = True
- changed = retention_changed
+ if retention_changed:
+ success = True
+ changed = True
+
err_msg = retention_msg
if changed and wait:
wait_success, wait_msg, current_stream = (
@@ -754,7 +768,7 @@ def update(client, current_stream, stream_name, retention_period=None,
)
)
if not wait_success:
- return wait_success, True, wait_msg
+ return wait_success, False, wait_msg
elif changed and not wait:
stream_found, stream_msg, current_stream = (
find_stream(client, stream_name, check_mode=check_mode)
@@ -774,11 +788,9 @@ def update(client, current_stream, stream_name, retention_period=None,
return success, changed, err_msg
if tags:
- changed, err_msg = (
+ _, _, err_msg = (
update_tags(client, stream_name, tags, check_mode=check_mode)
)
- if changed:
- success = True
if wait:
success, err_msg, _ = (
wait_for_status(
@@ -832,6 +844,11 @@ def create_stream(client, stream_name, number_of_shards=1, retention_period=None
stream_found, stream_msg, current_stream = (
find_stream(client, stream_name, check_mode=check_mode)
)
+ if stream_found:
+ if current_stream['ShardsCount'] != number_of_shards:
+ err_msg = 'Can not change the number of shards in a Kinesis Stream'
+ return success, changed, err_msg, results
+
if stream_found and current_stream['StreamStatus'] == 'DELETING' and wait:
wait_success, wait_msg, current_stream = (
wait_for_status(
@@ -1031,7 +1048,7 @@ def main():
region=region, endpoint=ec2_url, **aws_connect_kwargs
)
)
- except botocore.exceptions.ClientError, e:
+ except botocore.exceptions.ClientError as e:
err_msg = 'Boto3 Client Error - {0}'.format(str(e.msg))
module.fail_json(
success=False, changed=False, result={}, msg=err_msg