summaryrefslogtreecommitdiff
path: root/fail2ban/tests/actiontestcase.py
diff options
context:
space:
mode:
Diffstat (limited to 'fail2ban/tests/actiontestcase.py')
-rw-r--r--fail2ban/tests/actiontestcase.py262
1 files changed, 170 insertions, 92 deletions
diff --git a/fail2ban/tests/actiontestcase.py b/fail2ban/tests/actiontestcase.py
index 5f4aa460..ce5de483 100644
--- a/fail2ban/tests/actiontestcase.py
+++ b/fail2ban/tests/actiontestcase.py
@@ -34,8 +34,8 @@ from ..server.actions import OrderedDict, Actions
from ..server.utils import Utils
from .dummyjail import DummyJail
-from .utils import LogCaptureTestCase
-from .utils import pid_exists
+from .utils import pid_exists, with_tmpdir, LogCaptureTestCase
+
class CommandActionTest(LogCaptureTestCase):
@@ -75,61 +75,59 @@ class CommandActionTest(LogCaptureTestCase):
lambda: substituteRecursiveTags({'A': 'to=<B> fromip=<IP>', 'C': '<B>', 'B': '<C>', 'D': ''}))
self.assertRaises(ValueError,
lambda: substituteRecursiveTags({'failregex': 'to=<honeypot> fromip=<IP>', 'sweet': '<honeypot>', 'honeypot': '<sweet>', 'ignoreregex': ''}))
- # We need here an ordered, because the sequence of iteration is very important for this test
- if OrderedDict:
- # No cyclic recursion, just multiple replacement of tag <T>, should be successful:
- self.assertEqual(substituteRecursiveTags( OrderedDict(
- (('X', 'x=x<T>'), ('T', '1'), ('Z', '<X> <T> <Y>'), ('Y', 'y=y<T>')))
- ), {'X': 'x=x1', 'T': '1', 'Y': 'y=y1', 'Z': 'x=x1 1 y=y1'}
- )
- # No cyclic recursion, just multiple replacement of tag <T> in composite tags, should be successful:
- self.assertEqual(substituteRecursiveTags( OrderedDict(
- (('X', 'x=x<T> <Z> <<R1>> <<R2>>'), ('R1', 'Z'), ('R2', 'Y'), ('T', '1'), ('Z', '<T> <Y>'), ('Y', 'y=y<T>')))
- ), {'X': 'x=x1 1 y=y1 1 y=y1 y=y1', 'R1': 'Z', 'R2': 'Y', 'T': '1', 'Z': '1 y=y1', 'Y': 'y=y1'}
- )
- # No cyclic recursion, just multiple replacement of same tags, should be successful:
- self.assertEqual(substituteRecursiveTags( OrderedDict((
- ('actionstart', 'ipset create <ipmset> hash:ip timeout <bantime> family <ipsetfamily>\n<iptables> -I <chain> <actiontype>'),
- ('ipmset', 'f2b-<name>'),
- ('name', 'any'),
- ('bantime', '600'),
- ('ipsetfamily', 'inet'),
- ('iptables', 'iptables <lockingopt>'),
- ('lockingopt', '-w'),
- ('chain', 'INPUT'),
- ('actiontype', '<multiport>'),
- ('multiport', '-p <protocol> -m multiport --dports <port> -m set --match-set <ipmset> src -j <blocktype>'),
- ('protocol', 'tcp'),
- ('port', 'ssh'),
- ('blocktype', 'REJECT',),
- ))
- ), OrderedDict((
- ('actionstart', 'ipset create f2b-any hash:ip timeout 600 family inet\niptables -w -I INPUT -p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'),
- ('ipmset', 'f2b-any'),
- ('name', 'any'),
- ('bantime', '600'),
- ('ipsetfamily', 'inet'),
- ('iptables', 'iptables -w'),
- ('lockingopt', '-w'),
- ('chain', 'INPUT'),
- ('actiontype', '-p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'),
- ('multiport', '-p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'),
- ('protocol', 'tcp'),
- ('port', 'ssh'),
- ('blocktype', 'REJECT')
- ))
- )
- # Cyclic recursion by composite tag creation, tags "create" another tag, that closes cycle:
- self.assertRaises(ValueError, lambda: substituteRecursiveTags( OrderedDict((
- ('A', '<<B><C>>'),
- ('B', 'D'), ('C', 'E'),
- ('DE', 'cycle <A>'),
- )) ))
- self.assertRaises(ValueError, lambda: substituteRecursiveTags( OrderedDict((
- ('DE', 'cycle <A>'),
- ('A', '<<B><C>>'),
- ('B', 'D'), ('C', 'E'),
- )) ))
+ # No cyclic recursion, just multiple replacement of tag <T>, should be successful:
+ self.assertEqual(substituteRecursiveTags( OrderedDict(
+ (('X', 'x=x<T>'), ('T', '1'), ('Z', '<X> <T> <Y>'), ('Y', 'y=y<T>')))
+ ), {'X': 'x=x1', 'T': '1', 'Y': 'y=y1', 'Z': 'x=x1 1 y=y1'}
+ )
+ # No cyclic recursion, just multiple replacement of tag <T> in composite tags, should be successful:
+ self.assertEqual(substituteRecursiveTags( OrderedDict(
+ (('X', 'x=x<T> <Z> <<R1>> <<R2>>'), ('R1', 'Z'), ('R2', 'Y'), ('T', '1'), ('Z', '<T> <Y>'), ('Y', 'y=y<T>')))
+ ), {'X': 'x=x1 1 y=y1 1 y=y1 y=y1', 'R1': 'Z', 'R2': 'Y', 'T': '1', 'Z': '1 y=y1', 'Y': 'y=y1'}
+ )
+ # No cyclic recursion, just multiple replacement of same tags, should be successful:
+ self.assertEqual(substituteRecursiveTags( OrderedDict((
+ ('actionstart', 'ipset create <ipmset> hash:ip timeout <bantime> family <ipsetfamily>\n<iptables> -I <chain> <actiontype>'),
+ ('ipmset', 'f2b-<name>'),
+ ('name', 'any'),
+ ('bantime', '600'),
+ ('ipsetfamily', 'inet'),
+ ('iptables', 'iptables <lockingopt>'),
+ ('lockingopt', '-w'),
+ ('chain', 'INPUT'),
+ ('actiontype', '<multiport>'),
+ ('multiport', '-p <protocol> -m multiport --dports <port> -m set --match-set <ipmset> src -j <blocktype>'),
+ ('protocol', 'tcp'),
+ ('port', 'ssh'),
+ ('blocktype', 'REJECT',),
+ ))
+ ), OrderedDict((
+ ('actionstart', 'ipset create f2b-any hash:ip timeout 600 family inet\niptables -w -I INPUT -p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'),
+ ('ipmset', 'f2b-any'),
+ ('name', 'any'),
+ ('bantime', '600'),
+ ('ipsetfamily', 'inet'),
+ ('iptables', 'iptables -w'),
+ ('lockingopt', '-w'),
+ ('chain', 'INPUT'),
+ ('actiontype', '-p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'),
+ ('multiport', '-p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'),
+ ('protocol', 'tcp'),
+ ('port', 'ssh'),
+ ('blocktype', 'REJECT')
+ ))
+ )
+ # Cyclic recursion by composite tag creation, tags "create" another tag, that closes cycle:
+ self.assertRaises(ValueError, lambda: substituteRecursiveTags( OrderedDict((
+ ('A', '<<B><C>>'),
+ ('B', 'D'), ('C', 'E'),
+ ('DE', 'cycle <A>'),
+ )) ))
+ self.assertRaises(ValueError, lambda: substituteRecursiveTags( OrderedDict((
+ ('DE', 'cycle <A>'),
+ ('A', '<<B><C>>'),
+ ('B', 'D'), ('C', 'E'),
+ )) ))
# missing tags are ok
self.assertEqual(substituteRecursiveTags({'A': '<C>'}), {'A': '<C>'})
@@ -206,15 +204,15 @@ class CommandActionTest(LogCaptureTestCase):
"Text 890 text 123 ABC")
self.assertEqual(
self.__action.replaceTag("<matches>",
- {'matches': "some >char< should \< be[ escap}ed&\n"}),
+ {'matches': "some >char< should \\< be[ escap}ed&\n"}),
"some \\>char\\< should \\\\\\< be\\[ escap\\}ed\\&\\n")
self.assertEqual(
self.__action.replaceTag("<ipmatches>",
- {'ipmatches': "some >char< should \< be[ escap}ed&\n"}),
+ {'ipmatches': "some >char< should \\< be[ escap}ed&\n"}),
"some \\>char\\< should \\\\\\< be\\[ escap\\}ed\\&\\n")
self.assertEqual(
self.__action.replaceTag("<ipjailmatches>",
- {'ipjailmatches': "some >char< should \< be[ escap}ed&\r\n"}),
+ {'ipjailmatches': "some >char< should \\< be[ escap}ed&\r\n"}),
"some \\>char\\< should \\\\\\< be\\[ escap\\}ed\\&\\r\\n")
# Recursive
@@ -252,7 +250,7 @@ class CommandActionTest(LogCaptureTestCase):
delattr(self.__action, 'ac')
# produce self-referencing query except:
self.assertRaisesRegexp(ValueError, r"possible self referencing definitions in query",
- lambda: self.__action.replaceTag("<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x>>>>>>>>>>>>>>>>>>>>>",
+ lambda: self.__action.replaceTag("<x"*30+">"*30,
self.__action._properties, conditional="family=inet6")
)
@@ -297,61 +295,128 @@ class CommandActionTest(LogCaptureTestCase):
"Text 000-567 text 567 '567'")
self.assertTrue(len(cache) >= 3)
-
- def testExecuteActionBan(self):
- self.__action.actionstart = "touch /tmp/fail2ban.test"
- self.assertEqual(self.__action.actionstart, "touch /tmp/fail2ban.test")
- self.__action.actionstop = "rm -f /tmp/fail2ban.test"
- self.assertEqual(self.__action.actionstop, 'rm -f /tmp/fail2ban.test')
- self.__action.actionban = "echo -n"
- self.assertEqual(self.__action.actionban, 'echo -n')
- self.__action.actioncheck = "[ -e /tmp/fail2ban.test ]"
- self.assertEqual(self.__action.actioncheck, '[ -e /tmp/fail2ban.test ]')
+ @with_tmpdir
+ def testExecuteActionBan(self, tmp):
+ tmp += "/fail2ban.test"
+ self.__action.actionstart = "touch '%s'" % tmp
+ self.__action.actionrepair = self.__action.actionstart
+ self.assertEqual(self.__action.actionstart, "touch '%s'" % tmp)
+ self.__action.actionstop = "rm -f '%s'" % tmp
+ self.assertEqual(self.__action.actionstop, "rm -f '%s'" % tmp)
+ self.__action.actionban = "<actioncheck> && echo -n"
+ self.assertEqual(self.__action.actionban, "<actioncheck> && echo -n")
+ self.__action.actioncheck = "[ -e '%s' ]" % tmp
+ self.assertEqual(self.__action.actioncheck, "[ -e '%s' ]" % tmp)
self.__action.actionunban = "true"
self.assertEqual(self.__action.actionunban, 'true')
+ self.pruneLog()
self.assertNotLogged('returned')
# no action was actually executed yet
+ # start on demand is false, so it should cause failure on first attempt of ban:
self.__action.ban({'ip': None})
self.assertLogged('Invariant check failed')
self.assertLogged('returned successfully')
+ self.__action.stop()
+ self.assertLogged(self.__action.actionstop)
def testExecuteActionEmptyUnban(self):
+ # unban will be executed for actions with banned items only:
+ self.__action.actionban = ""
self.__action.actionunban = ""
+ self.__action.actionflush = "echo -n 'flush'"
+ self.__action.actionstop = "echo -n 'stop'"
+ self.__action.start();
+ self.__action.ban({});
+ self.pruneLog()
self.__action.unban({})
- self.assertLogged('Nothing to do')
+ self.assertLogged('Nothing to do', wait=True)
+ # same as above but with interim flush, so no unban anymore:
+ self.__action.ban({});
+ self.pruneLog('[phase 2]')
+ self.__action.flush()
+ self.__action.unban({})
+ self.__action.stop()
+ self.assertLogged('stop', wait=True)
+ self.assertNotLogged('Nothing to do')
- def testExecuteActionStartCtags(self):
+ @with_tmpdir
+ def testExecuteActionStartCtags(self, tmp):
+ tmp += '/fail2ban.test'
self.__action.HOST = "192.0.2.0"
- self.__action.actionstart = "touch /tmp/fail2ban.test.<HOST>"
- self.__action.actionstop = "rm -f /tmp/fail2ban.test.<HOST>"
- self.__action.actioncheck = "[ -e /tmp/fail2ban.test.192.0.2.0 ]"
+ self.__action.actionstart = "touch '%s.<HOST>'" % tmp
+ self.__action.actionstop = "rm -f '%s.<HOST>'" % tmp
+ self.__action.actioncheck = "[ -e '%s.192.0.2.0' ]" % tmp
self.__action.start()
+ self.__action.consistencyCheck()
- def testExecuteActionCheckRestoreEnvironment(self):
+ @with_tmpdir
+ def testExecuteActionCheckRestoreEnvironment(self, tmp):
+ tmp += '/fail2ban.test'
self.__action.actionstart = ""
- self.__action.actionstop = "rm -f /tmp/fail2ban.test"
- self.__action.actionban = "rm /tmp/fail2ban.test"
- self.__action.actioncheck = "[ -e /tmp/fail2ban.test ]"
+ self.__action.actionstop = "rm -f '%s'" % tmp
+ self.__action.actionban = "rm '%s'" % tmp
+ self.__action.actioncheck = "[ -e '%s' ]" % tmp
self.assertRaises(RuntimeError, self.__action.ban, {'ip': None})
self.assertLogged('Invariant check failed', 'Unable to restore environment', all=True)
# 2nd time, try to restore with producing error in stop, but succeeded start hereafter:
self.pruneLog('[phase 2]')
- self.__action.actionstart = "touch /tmp/fail2ban.test"
- self.__action.actionstop = "rm /tmp/fail2ban.test"
- self.__action.actionban = 'printf "%%b\n" <ip> >> /tmp/fail2ban.test'
- self.__action.actioncheck = "[ -e /tmp/fail2ban.test ]"
+ self.__action.actionstart = "touch '%s'" % tmp
+ self.__action.actionstop = "rm '%s'" % tmp
+ self.__action.actionban = """<actioncheck> && printf "%%%%b\n" <ip> >> '%s'""" % tmp
+ self.__action.actioncheck = "[ -e '%s' ]" % tmp
self.__action.ban({'ip': None})
self.assertLogged('Invariant check failed')
self.assertNotLogged('Unable to restore environment')
- def testExecuteActionCheckRepairEnvironment(self):
+ @with_tmpdir
+ def testExecuteActionCheckOnBanFailure(self, tmp):
+ tmp += '/fail2ban.test'
+ self.__action.actionstart = "touch '%s'; echo 'started ...'" % tmp
+ self.__action.actionstop = "rm -f '%s'" % tmp
+ self.__action.actionban = "[ -e '%s' ] && echo 'banned '<ip>" % tmp
+ self.__action.actioncheck = "[ -e '%s' ] && echo 'check ok' || { echo 'check failed'; exit 1; }" % tmp
+ self.__action.actionrepair = "echo 'repair ...'; touch '%s'" % tmp
+ self.__action.actionstart_on_demand = False
+ self.__action.start()
+ # phase 1: with repair;
+ # phase 2: without repair (start/stop), not on demand;
+ # phase 3: without repair (start/stop), start on demand.
+ for i in (1, 2, 3):
+ self.pruneLog('[phase %s]' % i)
+ # 1st time with success ban:
+ self.__action.ban({'ip': '192.0.2.1'})
+ self.assertLogged(
+ "stdout: %r" % 'banned 192.0.2.1', all=True)
+ self.assertNotLogged("Invariant check failed. Trying",
+ "stdout: %r" % 'check failed',
+ "stdout: %r" % ('repair ...' if self.__action.actionrepair else 'started ...'),
+ "stdout: %r" % 'check ok', all=True)
+ # force error in ban:
+ os.remove(tmp)
+ self.pruneLog()
+ # 2nd time with fail recognition, success repair, check and ban:
+ self.__action.ban({'ip': '192.0.2.2'})
+ self.assertLogged("Invariant check failed. Trying",
+ "stdout: %r" % 'check failed',
+ "stdout: %r" % ('repair ...' if self.__action.actionrepair else 'started ...'),
+ "stdout: %r" % 'check ok',
+ "stdout: %r" % 'banned 192.0.2.2', all=True)
+ # repeat without repair (stop/start), herafter enable on demand:
+ if self.__action.actionrepair:
+ self.__action.actionrepair = ""
+ elif not self.__action.actionstart_on_demand:
+ self.__action.actionstart_on_demand = True
+
+ @with_tmpdir
+ def testExecuteActionCheckRepairEnvironment(self, tmp):
+ tmp += '/fail2ban.test'
self.__action.actionstart = ""
self.__action.actionstop = ""
- self.__action.actionban = "rm /tmp/fail2ban.test"
- self.__action.actioncheck = "[ -e /tmp/fail2ban.test ]"
- self.__action.actionrepair = "echo 'repair ...'; touch /tmp/fail2ban.test"
+ self.__action.actionban = "rm '%s'" % tmp
+ self.__action.actioncheck = "[ -e '%s' ]" % tmp
+ self.__action.actionrepair = "echo 'repair ...'; touch '%s'" % tmp
# 1st time with success repair:
self.__action.ban({'ip': None})
self.assertLogged("Invariant check failed. Trying", "echo 'repair ...'", all=True)
@@ -379,13 +444,13 @@ class CommandActionTest(LogCaptureTestCase):
'user': "tester"
}
})
- self.__action.actionban = "touch /tmp/fail2ban.test.123; echo 'failure <F-ID> of <F-USER> -<F-TEST>- from <ip>:<F-PORT>'"
- self.__action.actionunban = "rm /tmp/fail2ban.test.<ABC>; echo 'user <F-USER> unbanned'"
+ self.__action.actionban = "echo '<ABC>, failure <F-ID> of <F-USER> -<F-TEST>- from <ip>:<F-PORT>'"
+ self.__action.actionunban = "echo '<ABC>, user <F-USER> unbanned'"
self.__action.ban(aInfo)
self.__action.unban(aInfo)
self.assertLogged(
- " -- stdout: 'failure 111 of tester -- from 192.0.2.1:222'",
- " -- stdout: 'user tester unbanned'",
+ " -- stdout: '123, failure 111 of tester -- from 192.0.2.1:222'",
+ " -- stdout: '123, user tester unbanned'",
all=True
)
@@ -560,6 +625,19 @@ class CommandActionTest(LogCaptureTestCase):
self.assertEqual(len(m), 3)
self.assertIn('c', m)
self.assertEqual((m['a'], m['b'], m['c']), (5, 11, 'test'))
+ # immutability of copy:
+ m['d'] = 'dddd'
+ m2 = m.copy()
+ m2['c'] = lambda self: self['a'] + 7
+ m2['a'] = 1
+ del m2['b']
+ del m2['d']
+ self.assertTrue('b' in m)
+ self.assertTrue('d' in m)
+ self.assertFalse('b' in m2)
+ self.assertFalse('d' in m2)
+ self.assertEqual((m['a'], m['b'], m['c'], m['d']), (5, 11, 'test', 'dddd'))
+ self.assertEqual((m2['a'], m2['c']), (1, 8))
def testCallingMapRep(self):
m = CallingMap({