summaryrefslogtreecommitdiff
path: root/test/units/modules/system
diff options
context:
space:
mode:
Diffstat (limited to 'test/units/modules/system')
-rw-r--r--test/units/modules/system/__init__.py0
-rw-r--r--test/units/modules/system/test_iptables.py875
-rw-r--r--test/units/modules/system/test_known_hosts.py107
-rw-r--r--test/units/modules/system/test_systemd.py50
4 files changed, 0 insertions, 1032 deletions
diff --git a/test/units/modules/system/__init__.py b/test/units/modules/system/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
--- a/test/units/modules/system/__init__.py
+++ /dev/null
diff --git a/test/units/modules/system/test_iptables.py b/test/units/modules/system/test_iptables.py
deleted file mode 100644
index 9cd5b2ed18..0000000000
--- a/test/units/modules/system/test_iptables.py
+++ /dev/null
@@ -1,875 +0,0 @@
-from units.compat.mock import patch
-from ansible.module_utils import basic
-from ansible.modules.system import iptables
-from units.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args
-
-
-def get_bin_path(*args, **kwargs):
- return "/sbin/iptables"
-
-
-def get_iptables_version(iptables_path, module):
- return "1.8.2"
-
-
-class TestIptables(ModuleTestCase):
-
- def setUp(self):
- super(TestIptables, self).setUp()
- self.mock_get_bin_path = patch.object(basic.AnsibleModule, 'get_bin_path', get_bin_path)
- self.mock_get_bin_path.start()
- self.addCleanup(self.mock_get_bin_path.stop) # ensure that the patching is 'undone'
- self.mock_get_iptables_version = patch.object(iptables, 'get_iptables_version', get_iptables_version)
- self.mock_get_iptables_version.start()
- self.addCleanup(self.mock_get_iptables_version.stop) # ensure that the patching is 'undone'
-
- def test_without_required_parameters(self):
- """Failure must occurs when all parameters are missing"""
- with self.assertRaises(AnsibleFailJson):
- set_module_args({})
- iptables.main()
-
- def test_flush_table_without_chain(self):
- """Test flush without chain, flush the table"""
- set_module_args({
- 'flush': True,
- })
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.return_value = 0, '', '' # successful execution, no output
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args[0][0][0], '/sbin/iptables')
- self.assertEqual(run_command.call_args[0][0][1], '-t')
- self.assertEqual(run_command.call_args[0][0][2], 'filter')
- self.assertEqual(run_command.call_args[0][0][3], '-F')
-
- def test_flush_table_check_true(self):
- """Test flush without parameters and check == true"""
- set_module_args({
- 'flush': True,
- '_ansible_check_mode': True,
- })
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.return_value = 0, '', '' # successful execution, no output
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 0)
-
-# TODO ADD test flush table nat
-# TODO ADD test flush with chain
-# TODO ADD test flush with chain and table nat
-
- def test_policy_table(self):
- """Test change policy of a chain"""
- set_module_args({
- 'policy': 'ACCEPT',
- 'chain': 'INPUT',
- })
- commands_results = [
- (0, 'Chain INPUT (policy DROP)\n', ''),
- (0, '', '')
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 2)
- # import pdb
- # pdb.set_trace()
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-L',
- 'INPUT',
- ])
- self.assertEqual(run_command.call_args_list[1][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-P',
- 'INPUT',
- 'ACCEPT',
- ])
-
- def test_policy_table_no_change(self):
- """Test don't change policy of a chain if the policy is right"""
- set_module_args({
- 'policy': 'ACCEPT',
- 'chain': 'INPUT',
- })
- commands_results = [
- (0, 'Chain INPUT (policy ACCEPT)\n', ''),
- (0, '', '')
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertFalse(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- # import pdb
- # pdb.set_trace()
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-L',
- 'INPUT',
- ])
-
- def test_policy_table_changed_false(self):
- """Test flush without parameters and change == false"""
- set_module_args({
- 'policy': 'ACCEPT',
- 'chain': 'INPUT',
- '_ansible_check_mode': True,
- })
- commands_results = [
- (0, 'Chain INPUT (policy DROP)\n', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- # import pdb
- # pdb.set_trace()
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-L',
- 'INPUT',
- ])
-
-# TODO ADD test policy without chain fail
-# TODO ADD test policy with chain don't exists
-# TODO ADD test policy with wrong choice fail
-
- def test_insert_rule_change_false(self):
- """Test flush without parameters"""
- set_module_args({
- 'chain': 'OUTPUT',
- 'source': '1.2.3.4/32',
- 'destination': '7.8.9.10/42',
- 'jump': 'ACCEPT',
- 'action': 'insert',
- '_ansible_check_mode': True,
- })
-
- commands_results = [
- (1, '', ''),
- (0, '', '')
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- # import pdb
- # pdb.set_trace()
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-C',
- 'OUTPUT',
- '-s',
- '1.2.3.4/32',
- '-d',
- '7.8.9.10/42',
- '-j',
- 'ACCEPT'
- ])
-
- def test_insert_rule(self):
- """Test flush without parameters"""
- set_module_args({
- 'chain': 'OUTPUT',
- 'source': '1.2.3.4/32',
- 'destination': '7.8.9.10/42',
- 'jump': 'ACCEPT',
- 'action': 'insert'
- })
-
- commands_results = [
- (1, '', ''),
- (0, '', '')
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 2)
- # import pdb
- # pdb.set_trace()
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-C',
- 'OUTPUT',
- '-s',
- '1.2.3.4/32',
- '-d',
- '7.8.9.10/42',
- '-j',
- 'ACCEPT'
- ])
- self.assertEqual(run_command.call_args_list[1][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-I',
- 'OUTPUT',
- '-s',
- '1.2.3.4/32',
- '-d',
- '7.8.9.10/42',
- '-j',
- 'ACCEPT'
- ])
-
- def test_append_rule_check_mode(self):
- """Test append a redirection rule in check mode"""
- set_module_args({
- 'chain': 'PREROUTING',
- 'source': '1.2.3.4/32',
- 'destination': '7.8.9.10/42',
- 'jump': 'REDIRECT',
- 'table': 'nat',
- 'to_destination': '5.5.5.5/32',
- 'protocol': 'udp',
- 'destination_port': '22',
- 'to_ports': '8600',
- '_ansible_check_mode': True,
- })
-
- commands_results = [
- (1, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'nat',
- '-C',
- 'PREROUTING',
- '-p',
- 'udp',
- '-s',
- '1.2.3.4/32',
- '-d',
- '7.8.9.10/42',
- '-j',
- 'REDIRECT',
- '--to-destination',
- '5.5.5.5/32',
- '--destination-port',
- '22',
- '--to-ports',
- '8600'
- ])
-
- def test_append_rule(self):
- """Test append a redirection rule"""
- set_module_args({
- 'chain': 'PREROUTING',
- 'source': '1.2.3.4/32',
- 'destination': '7.8.9.10/42',
- 'jump': 'REDIRECT',
- 'table': 'nat',
- 'to_destination': '5.5.5.5/32',
- 'protocol': 'udp',
- 'destination_port': '22',
- 'to_ports': '8600'
- })
-
- commands_results = [
- (1, '', ''),
- (0, '', '')
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 2)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'nat',
- '-C',
- 'PREROUTING',
- '-p',
- 'udp',
- '-s',
- '1.2.3.4/32',
- '-d',
- '7.8.9.10/42',
- '-j',
- 'REDIRECT',
- '--to-destination',
- '5.5.5.5/32',
- '--destination-port',
- '22',
- '--to-ports',
- '8600'
- ])
- self.assertEqual(run_command.call_args_list[1][0][0], [
- '/sbin/iptables',
- '-t',
- 'nat',
- '-A',
- 'PREROUTING',
- '-p',
- 'udp',
- '-s',
- '1.2.3.4/32',
- '-d',
- '7.8.9.10/42',
- '-j',
- 'REDIRECT',
- '--to-destination',
- '5.5.5.5/32',
- '--destination-port',
- '22',
- '--to-ports',
- '8600'
- ])
-
- def test_remove_rule(self):
- """Test flush without parameters"""
- set_module_args({
- 'chain': 'PREROUTING',
- 'source': '1.2.3.4/32',
- 'destination': '7.8.9.10/42',
- 'jump': 'SNAT',
- 'table': 'nat',
- 'to_source': '5.5.5.5/32',
- 'protocol': 'udp',
- 'source_port': '22',
- 'to_ports': '8600',
- 'state': 'absent',
- 'in_interface': 'eth0',
- 'out_interface': 'eth1',
- 'comment': 'this is a comment'
- })
-
- commands_results = [
- (0, '', ''),
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 2)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'nat',
- '-C',
- 'PREROUTING',
- '-p',
- 'udp',
- '-s',
- '1.2.3.4/32',
- '-d',
- '7.8.9.10/42',
- '-j',
- 'SNAT',
- '--to-source',
- '5.5.5.5/32',
- '-i',
- 'eth0',
- '-o',
- 'eth1',
- '--source-port',
- '22',
- '--to-ports',
- '8600',
- '-m',
- 'comment',
- '--comment',
- 'this is a comment'
- ])
- self.assertEqual(run_command.call_args_list[1][0][0], [
- '/sbin/iptables',
- '-t',
- 'nat',
- '-D',
- 'PREROUTING',
- '-p',
- 'udp',
- '-s',
- '1.2.3.4/32',
- '-d',
- '7.8.9.10/42',
- '-j',
- 'SNAT',
- '--to-source',
- '5.5.5.5/32',
- '-i',
- 'eth0',
- '-o',
- 'eth1',
- '--source-port',
- '22',
- '--to-ports',
- '8600',
- '-m',
- 'comment',
- '--comment',
- 'this is a comment'
- ])
-
- def test_remove_rule_check_mode(self):
- """Test flush without parameters check mode"""
- set_module_args({
- 'chain': 'PREROUTING',
- 'source': '1.2.3.4/32',
- 'destination': '7.8.9.10/42',
- 'jump': 'SNAT',
- 'table': 'nat',
- 'to_source': '5.5.5.5/32',
- 'protocol': 'udp',
- 'source_port': '22',
- 'to_ports': '8600',
- 'state': 'absent',
- 'in_interface': 'eth0',
- 'out_interface': 'eth1',
- 'comment': 'this is a comment',
- '_ansible_check_mode': True,
- })
-
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'nat',
- '-C',
- 'PREROUTING',
- '-p',
- 'udp',
- '-s',
- '1.2.3.4/32',
- '-d',
- '7.8.9.10/42',
- '-j',
- 'SNAT',
- '--to-source',
- '5.5.5.5/32',
- '-i',
- 'eth0',
- '-o',
- 'eth1',
- '--source-port',
- '22',
- '--to-ports',
- '8600',
- '-m',
- 'comment',
- '--comment',
- 'this is a comment'
- ])
-
- def test_insert_with_reject(self):
- """ Using reject_with with a previously defined jump: REJECT results in two Jump statements #18988 """
- set_module_args({
- 'chain': 'INPUT',
- 'protocol': 'tcp',
- 'reject_with': 'tcp-reset',
- 'ip_version': 'ipv4',
- })
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-C',
- 'INPUT',
- '-p',
- 'tcp',
- '-j',
- 'REJECT',
- '--reject-with',
- 'tcp-reset',
- ])
-
- def test_insert_jump_reject_with_reject(self):
- """ Using reject_with with a previously defined jump: REJECT results in two Jump statements #18988 """
- set_module_args({
- 'chain': 'INPUT',
- 'protocol': 'tcp',
- 'jump': 'REJECT',
- 'reject_with': 'tcp-reset',
- 'ip_version': 'ipv4',
- })
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-C',
- 'INPUT',
- '-p',
- 'tcp',
- '-j',
- 'REJECT',
- '--reject-with',
- 'tcp-reset',
- ])
-
- def test_jump_tee_gateway_negative(self):
- """ Missing gateway when JUMP is set to TEE """
- set_module_args({
- 'table': 'mangle',
- 'chain': 'PREROUTING',
- 'in_interface': 'eth0',
- 'protocol': 'udp',
- 'match': 'state',
- 'jump': 'TEE',
- 'ctstate': ['NEW'],
- 'destination_port': '9521',
- 'destination': '127.0.0.1'
- })
-
- with self.assertRaises(AnsibleFailJson) as e:
- iptables.main()
- self.assertTrue(e.exception.args[0]['failed'])
- self.assertEqual(e.exception.args[0]['msg'], 'jump is TEE but all of the following are missing: gateway')
-
- def test_jump_tee_gateway(self):
- """ Using gateway when JUMP is set to TEE """
- set_module_args({
- 'table': 'mangle',
- 'chain': 'PREROUTING',
- 'in_interface': 'eth0',
- 'protocol': 'udp',
- 'match': 'state',
- 'jump': 'TEE',
- 'ctstate': ['NEW'],
- 'destination_port': '9521',
- 'gateway': '192.168.10.1',
- 'destination': '127.0.0.1'
- })
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t', 'mangle',
- '-C', 'PREROUTING',
- '-p', 'udp',
- '-d', '127.0.0.1',
- '-m', 'state',
- '-j', 'TEE',
- '--gateway', '192.168.10.1',
- '-i', 'eth0',
- '--destination-port', '9521',
- '--state', 'NEW'
- ])
-
- def test_tcp_flags(self):
- """ Test various ways of inputting tcp_flags """
- args = [
- {
- 'chain': 'OUTPUT',
- 'protocol': 'tcp',
- 'jump': 'DROP',
- 'tcp_flags': 'flags=ALL flags_set="ACK,RST,SYN,FIN"'
- },
- {
- 'chain': 'OUTPUT',
- 'protocol': 'tcp',
- 'jump': 'DROP',
- 'tcp_flags': {
- 'flags': 'ALL',
- 'flags_set': 'ACK,RST,SYN,FIN'
- }
- },
- {
- 'chain': 'OUTPUT',
- 'protocol': 'tcp',
- 'jump': 'DROP',
- 'tcp_flags': {
- 'flags': ['ALL'],
- 'flags_set': ['ACK', 'RST', 'SYN', 'FIN']
- }
- },
-
- ]
-
- for item in args:
- set_module_args(item)
-
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-C',
- 'OUTPUT',
- '-p',
- 'tcp',
- '--tcp-flags',
- 'ALL',
- 'ACK,RST,SYN,FIN',
- '-j',
- 'DROP'
- ])
-
- def test_log_level(self):
- """ Test various ways of log level flag """
-
- log_levels = ['0', '1', '2', '3', '4', '5', '6', '7',
- 'emerg', 'alert', 'crit', 'error', 'warning', 'notice', 'info', 'debug']
-
- for log_lvl in log_levels:
- set_module_args({
- 'chain': 'INPUT',
- 'jump': 'LOG',
- 'log_level': log_lvl,
- 'source': '1.2.3.4/32',
- 'log_prefix': '** DROP-this_ip **'
- })
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t', 'filter',
- '-C', 'INPUT',
- '-s', '1.2.3.4/32',
- '-j', 'LOG',
- '--log-prefix', '** DROP-this_ip **',
- '--log-level', log_lvl
- ])
-
- def test_iprange(self):
- """ Test iprange module with its flags src_range and dst_range """
- set_module_args({
- 'chain': 'INPUT',
- 'match': ['iprange'],
- 'src_range': '192.168.1.100-192.168.1.199',
- 'jump': 'ACCEPT'
- })
-
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-C',
- 'INPUT',
- '-m',
- 'iprange',
- '-j',
- 'ACCEPT',
- '--src-range',
- '192.168.1.100-192.168.1.199',
- ])
-
- set_module_args({
- 'chain': 'INPUT',
- 'src_range': '192.168.1.100-192.168.1.199',
- 'dst_range': '10.0.0.50-10.0.0.100',
- 'jump': 'ACCEPT'
- })
-
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-C',
- 'INPUT',
- '-j',
- 'ACCEPT',
- '-m',
- 'iprange',
- '--src-range',
- '192.168.1.100-192.168.1.199',
- '--dst-range',
- '10.0.0.50-10.0.0.100'
- ])
-
- set_module_args({
- 'chain': 'INPUT',
- 'dst_range': '10.0.0.50-10.0.0.100',
- 'jump': 'ACCEPT'
- })
-
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-C',
- 'INPUT',
- '-j',
- 'ACCEPT',
- '-m',
- 'iprange',
- '--dst-range',
- '10.0.0.50-10.0.0.100'
- ])
-
- def test_insert_rule_with_wait(self):
- """Test flush without parameters"""
- set_module_args({
- 'chain': 'OUTPUT',
- 'source': '1.2.3.4/32',
- 'destination': '7.8.9.10/42',
- 'jump': 'ACCEPT',
- 'action': 'insert',
- 'wait': '10'
- })
-
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-C',
- 'OUTPUT',
- '-w',
- '10',
- '-s',
- '1.2.3.4/32',
- '-d',
- '7.8.9.10/42',
- '-j',
- 'ACCEPT'
- ])
diff --git a/test/units/modules/system/test_known_hosts.py b/test/units/modules/system/test_known_hosts.py
deleted file mode 100644
index 962cf05f5b..0000000000
--- a/test/units/modules/system/test_known_hosts.py
+++ /dev/null
@@ -1,107 +0,0 @@
-import os
-import tempfile
-from ansible.module_utils import basic
-
-from units.compat import unittest
-from ansible.module_utils._text import to_bytes
-from ansible.module_utils.basic import AnsibleModule
-
-from ansible.modules.system.known_hosts import compute_diff, sanity_check
-
-
-class KnownHostsDiffTestCase(unittest.TestCase):
-
- def _create_file(self, content):
- tmp_file = tempfile.NamedTemporaryFile(prefix='ansible-test-', suffix='-known_hosts', delete=False)
- tmp_file.write(to_bytes(content))
- tmp_file.close()
- self.addCleanup(os.unlink, tmp_file.name)
- return tmp_file.name
-
- def test_no_existing_file(self):
- path = tempfile.mktemp(prefix='ansible-test-', suffix='-known_hosts')
- key = 'example.com ssh-rsa AAAAetc\n'
- diff = compute_diff(path, found_line=None, replace_or_add=False, state='present', key=key)
- self.assertEqual(diff, {
- 'before_header': '/dev/null',
- 'after_header': path,
- 'before': '',
- 'after': 'example.com ssh-rsa AAAAetc\n',
- })
-
- def test_key_addition(self):
- path = self._create_file(
- 'two.example.com ssh-rsa BBBBetc\n'
- )
- key = 'one.example.com ssh-rsa AAAAetc\n'
- diff = compute_diff(path, found_line=None, replace_or_add=False, state='present', key=key)
- self.assertEqual(diff, {
- 'before_header': path,
- 'after_header': path,
- 'before': 'two.example.com ssh-rsa BBBBetc\n',
- 'after': 'two.example.com ssh-rsa BBBBetc\none.example.com ssh-rsa AAAAetc\n',
- })
-
- def test_no_change(self):
- path = self._create_file(
- 'one.example.com ssh-rsa AAAAetc\n'
- 'two.example.com ssh-rsa BBBBetc\n'
- )
- key = 'one.example.com ssh-rsa AAAAetc\n'
- diff = compute_diff(path, found_line=1, replace_or_add=False, state='present', key=key)
- self.assertEqual(diff, {
- 'before_header': path,
- 'after_header': path,
- 'before': 'one.example.com ssh-rsa AAAAetc\ntwo.example.com ssh-rsa BBBBetc\n',
- 'after': 'one.example.com ssh-rsa AAAAetc\ntwo.example.com ssh-rsa BBBBetc\n',
- })
-
- def test_key_change(self):
- path = self._create_file(
- 'one.example.com ssh-rsa AAAaetc\n'
- 'two.example.com ssh-rsa BBBBetc\n'
- )
- key = 'one.example.com ssh-rsa AAAAetc\n'
- diff = compute_diff(path, found_line=1, replace_or_add=True, state='present', key=key)
- self.assertEqual(diff, {
- 'before_header': path,
- 'after_header': path,
- 'before': 'one.example.com ssh-rsa AAAaetc\ntwo.example.com ssh-rsa BBBBetc\n',
- 'after': 'two.example.com ssh-rsa BBBBetc\none.example.com ssh-rsa AAAAetc\n',
- })
-
- def test_key_removal(self):
- path = self._create_file(
- 'one.example.com ssh-rsa AAAAetc\n'
- 'two.example.com ssh-rsa BBBBetc\n'
- )
- key = 'one.example.com ssh-rsa AAAAetc\n'
- diff = compute_diff(path, found_line=1, replace_or_add=False, state='absent', key=key)
- self.assertEqual(diff, {
- 'before_header': path,
- 'after_header': path,
- 'before': 'one.example.com ssh-rsa AAAAetc\ntwo.example.com ssh-rsa BBBBetc\n',
- 'after': 'two.example.com ssh-rsa BBBBetc\n',
- })
-
- def test_key_removal_no_change(self):
- path = self._create_file(
- 'two.example.com ssh-rsa BBBBetc\n'
- )
- key = 'one.example.com ssh-rsa AAAAetc\n'
- diff = compute_diff(path, found_line=None, replace_or_add=False, state='absent', key=key)
- self.assertEqual(diff, {
- 'before_header': path,
- 'after_header': path,
- 'before': 'two.example.com ssh-rsa BBBBetc\n',
- 'after': 'two.example.com ssh-rsa BBBBetc\n',
- })
-
- def test_sanity_check(self):
- basic._load_params = lambda: {}
- # Module used internally to execute ssh-keygen system executable
- module = AnsibleModule(argument_spec={})
- host = '10.0.0.1'
- key = '%s ssh-rsa ASDF foo@bar' % (host,)
- keygen = module.get_bin_path('ssh-keygen')
- sanity_check(module, host, key, keygen)
diff --git a/test/units/modules/system/test_systemd.py b/test/units/modules/system/test_systemd.py
deleted file mode 100644
index 8471fd5c3b..0000000000
--- a/test/units/modules/system/test_systemd.py
+++ /dev/null
@@ -1,50 +0,0 @@
-
-from units.compat import unittest
-from ansible.modules.system.systemd import parse_systemctl_show
-
-
-class ParseSystemctlShowTestCase(unittest.TestCase):
-
- def test_simple(self):
- lines = [
- 'Type=simple',
- 'Restart=no',
- 'Requires=system.slice sysinit.target',
- 'Description=Blah blah blah',
- ]
- parsed = parse_systemctl_show(lines)
- self.assertEqual(parsed, {
- 'Type': 'simple',
- 'Restart': 'no',
- 'Requires': 'system.slice sysinit.target',
- 'Description': 'Blah blah blah',
- })
-
- def test_multiline_exec(self):
- # This was taken from a real service that specified "ExecStart=/bin/echo foo\nbar"
- lines = [
- 'Type=simple',
- 'ExecStart={ path=/bin/echo ; argv[]=/bin/echo foo',
- 'bar ; ignore_errors=no ; start_time=[n/a] ; stop_time=[n/a] ; pid=0 ; code=(null) ; status=0/0 }',
- 'Description=blah',
- ]
- parsed = parse_systemctl_show(lines)
- self.assertEqual(parsed, {
- 'Type': 'simple',
- 'ExecStart': '{ path=/bin/echo ; argv[]=/bin/echo foo\n'
- 'bar ; ignore_errors=no ; start_time=[n/a] ; stop_time=[n/a] ; pid=0 ; code=(null) ; status=0/0 }',
- 'Description': 'blah',
- })
-
- def test_single_line_with_brace(self):
- lines = [
- 'Type=simple',
- 'Description={ this is confusing',
- 'Restart=no',
- ]
- parsed = parse_systemctl_show(lines)
- self.assertEqual(parsed, {
- 'Type': 'simple',
- 'Description': '{ this is confusing',
- 'Restart': 'no',
- })