summaryrefslogtreecommitdiff
path: root/fail2ban/tests
diff options
context:
space:
mode:
authorsebres <serg.brester@sebres.de>2017-07-10 19:57:02 +0200
committersebres <serg.brester@sebres.de>2017-07-10 19:57:02 +0200
commit36d42d7f0bb795a271990a7aceb5de783d136467 (patch)
tree4a7eb300fdc8ae8a1d6e9d9706b65d3a62354ba6 /fail2ban/tests
parentd32a3913cfb08bd2cebe041204bfe14f060da0c3 (diff)
downloadfail2ban-36d42d7f0bb795a271990a7aceb5de783d136467.tar.gz
SampleRegexsFactory: introduce opportunity to supply multiple options combinations (check lines using filters with several options), see for example filter sshd.conf
Diffstat (limited to 'fail2ban/tests')
-rw-r--r--fail2ban/tests/files/logs/sshd6
-rw-r--r--fail2ban/tests/samplestestcase.py199
2 files changed, 115 insertions, 90 deletions
diff --git a/fail2ban/tests/files/logs/sshd b/fail2ban/tests/files/logs/sshd
index fb3defea..b9559359 100644
--- a/fail2ban/tests/files/logs/sshd
+++ b/fail2ban/tests/files/logs/sshd
@@ -1,3 +1,5 @@
+# filterOptions: [{}, {"mode": "aggressive"}]
+
#1
# failJSON: { "time": "2005-06-21T16:47:48", "match": true , "host": "192.030.0.6" }
Jun 21 16:47:48 digital-mlhhyiqscv sshd[13709]: error: PAM: Authentication failure for myhlj1374 from 192.030.0.6
@@ -189,7 +191,7 @@ Apr 27 13:02:04 host sshd[29116]: Received disconnect from 1.2.3.4: 11: Normal S
# failJSON: { "time": "2015-04-16T20:02:50", "match": true , "host": "222.186.21.217", "desc": "Authentication for user failed" }
2015-04-16T18:02:50.321974+00:00 host sshd[2716]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=222.186.21.217 user=root
-# filterOptions: {"mode": "ddos"}
+# filterOptions: [{"mode": "ddos"}, {"mode": "aggressive"}]
# http://forums.powervps.com/showthread.php?t=1667
# failJSON: { "time": "2005-06-07T01:10:56", "match": true , "host": "69.61.56.114" }
@@ -218,7 +220,7 @@ Nov 24 23:46:43 host sshd[32686]: fatal: Read from socket failed: Connection res
Mar 15 09:20:57 host sshd[28972]: Connection reset by 192.0.2.39 port 14282 [preauth]
-# filterOptions: {"mode": "extra"}
+# filterOptions: [{"mode": "extra"}, {"mode": "aggressive"}]
# several other cases from gh-864:
# failJSON: { "time": "2004-11-25T01:34:12", "match": true , "host": "127.0.0.1", "desc": "No supported authentication methods" }
diff --git a/fail2ban/tests/samplestestcase.py b/fail2ban/tests/samplestestcase.py
index 121c1c5c..5f0a447a 100644
--- a/fail2ban/tests/samplestestcase.py
+++ b/fail2ban/tests/samplestestcase.py
@@ -49,7 +49,8 @@ class FilterSamplesRegex(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
super(FilterSamplesRegex, self).setUp()
- self.filter = None
+ self._filters = dict()
+ self._filterTests = None
setUpMyTime()
def tearDown(self):
@@ -79,14 +80,20 @@ class FilterSamplesRegex(unittest.TestCase):
RE_WRONG_GREED.search('non-greedy .+? test' + RE_HOST + ' test vary catch-all .* anchored$'))
- def _readFilter(self, name, basedir, opts=None):
- self.filter = Filter(None)
- self.filter.returnRawHost = True
- self.filter.checkAllRegex = True
- self.filter.checkFindTime = False
- self.filter.active = True
+ def _readFilter(self, fltName, name, basedir, opts=None):
+ # Check filter with this option combination was already used:
+ flt = self._filters.get(fltName)
+ if flt:
+ return flt
+ # First time:
+ flt = Filter(None)
+ flt.returnRawHost = True
+ flt.checkAllRegex = True
+ flt.checkFindTime = False
+ flt.active = True
+ # Read filter:
if opts is None: opts = dict()
- # Check filter exists
+ opts = opts.copy()
filterConf = FilterReader(name, "jail", opts,
basedir=basedir, share_config=unittest.F2B.share_config)
self.assertEqual(filterConf.getFile(), name)
@@ -103,25 +110,28 @@ class FilterSamplesRegex(unittest.TestCase):
self.fail('Unexpected config-token %r in stream' % (opt,))
for optval in optval:
if opt[2] == "prefregex":
- self.filter.prefRegex = optval
+ flt.prefRegex = optval
elif opt[2] == "addfailregex":
- self.filter.addFailRegex(optval)
+ flt.addFailRegex(optval)
elif opt[2] == "addignoreregex":
- self.filter.addIgnoreRegex(optval)
+ flt.addIgnoreRegex(optval)
elif opt[2] == "maxlines":
- self.filter.setMaxLines(optval)
+ flt.setMaxLines(optval)
elif opt[2] == "datepattern":
- self.filter.setDatePattern(optval)
+ flt.setDatePattern(optval)
# test regexp contains greedy catch-all before <HOST>, that is
# not hard-anchored at end or has not precise sub expression after <HOST>:
- regexList = self.filter.getFailRegex()
+ regexList = flt.getFailRegex()
for fr in regexList:
if RE_WRONG_GREED.search(fr): # pragma: no cover
raise AssertionError("Following regexp of \"%s\" contains greedy catch-all before <HOST>, "
"that is not hard-anchored at end or has not precise sub expression after <HOST>:\n%s" %
- (name, str(fr).replace(RE_HOST, '<HOST>')))
- return regexList
+ (fltName, str(fr).replace(RE_HOST, '<HOST>')))
+ # Cache within used filter combinations and return:
+ flt = [flt, set()]
+ self._filters[fltName] = flt
+ return flt
def testSampleRegexsFactory(name, basedir):
def testFilter(self):
@@ -129,18 +139,11 @@ def testSampleRegexsFactory(name, basedir):
self.assertTrue(
os.path.isfile(os.path.join(TEST_FILES_DIR, "logs", name)),
"No sample log file available for '%s' filter" % name)
-
- regexList = None
- regexsUsedIdx = set()
- regexsUsedRe = set()
+
filenames = [name]
+ regexsUsedRe = set()
- def _testMissingSamples():
- for failRegexIndex, failRegex in enumerate(regexList):
- self.assertTrue(
- failRegexIndex in regexsUsedIdx or failRegex in regexsUsedRe,
- "Regex for filter '%s' has no samples: %i: %r" %
- (name, failRegexIndex, failRegex))
+ # process each test-file (note: array filenames can grow during processing):
i = 0
while i < len(filenames):
filename = filenames[i]; i += 1;
@@ -154,13 +157,17 @@ def testSampleRegexsFactory(name, basedir):
faildata = json.loads(jsonREMatch.group(2))
# filterOptions - dict in JSON to control filter options (e. g. mode, etc.):
if jsonREMatch.group(1) == 'filterOptions':
- # another filter mode - we should check previous also:
- if self.filter is not None:
- _testMissingSamples()
- regexsUsedIdx = set() # clear used indices (possible overlapping by mode change)
- # read filter with another setting:
- self.filter = None
- regexList = self._readFilter(name, basedir, opts=faildata)
+ # following lines with another filter options:
+ self._filterTests = []
+ for opts in (faildata if isinstance(faildata, list) else [faildata]):
+ # unique filter name (using options combination):
+ self.assertTrue(isinstance(opts, dict))
+ fltName = opts.get('filterName')
+ if not fltName: fltName = str(opts) if opts else ''
+ fltName = name + fltName
+ # read it:
+ flt = self._readFilter(fltName, name, basedir, opts=opts)
+ self._filterTests.append((fltName, flt))
continue
# addFILE - filename to "include" test-files should be additionally parsed:
if jsonREMatch.group(1) == 'addFILE':
@@ -176,65 +183,81 @@ def testSampleRegexsFactory(name, basedir):
else: # pragma: no cover - normally unreachable
faildata = {}
- if self.filter is None:
- regexList = self._readFilter(name, basedir, opts=None)
+ # if filter options was not yet specified:
+ if not self._filterTests:
+ fltName = name
+ flt = self._readFilter(fltName, name, basedir, opts=None)
+ self._filterTests = [(fltName, flt)]
+
+ # process line using several filter options (if specified in the test-file):
+ for fltName, flt in self._filterTests:
+ flt, regexsUsedIdx = flt
+ regexList = flt.getFailRegex()
+
+ try:
+ ret = flt.processLine(line)
+ if not ret:
+ # Bypass if filter constraint specified:
+ if faildata.get('filter') and name != faildata.get('filter'):
+ continue
+ # Check line is flagged as none match
+ self.assertFalse(faildata.get('match', True),
+ "Line not matched when should have")
+ continue
- try:
- ret = self.filter.processLine(line)
- if not ret:
- # Bypass if filter constraint specified:
- if faildata.get('filter') and name != faildata.get('filter'):
+ failregex, fid, fail2banTime, fail = ret[0]
+ # Bypass no failure helpers-regexp:
+ if not faildata.get('match', False) and (fid is None or fail.get('nofail')):
+ regexsUsedIdx.add(failregex)
+ regexsUsedRe.add(regexList[failregex])
continue
- # Check line is flagged as none match
- self.assertFalse(faildata.get('match', True),
- "Line not matched when should have")
- continue
-
- failregex, fid, fail2banTime, fail = ret[0]
- # Bypass no failure helpers-regexp:
- if not faildata.get('match', False) and (fid is None or fail.get('nofail')):
+
+ # Check line is flagged to match
+ self.assertTrue(faildata.get('match', False),
+ "Line matched when shouldn't have")
+ self.assertEqual(len(ret), 1,
+ "Multiple regexs matched %r" % (map(lambda x: x[0], ret)))
+
+ # Verify match captures (at least fid/host) and timestamp as expected
+ for k, v in faildata.iteritems():
+ if k not in ("time", "match", "desc", "filter"):
+ fv = fail.get(k, None)
+ # Fallback for backwards compatibility (previously no fid, was host only):
+ if k == "host" and fv is None:
+ fv = fid
+ self.assertEqual(fv, v)
+
+ t = faildata.get("time", None)
+ try:
+ jsonTimeLocal = datetime.datetime.strptime(t, "%Y-%m-%dT%H:%M:%S")
+ except ValueError:
+ jsonTimeLocal = datetime.datetime.strptime(t, "%Y-%m-%dT%H:%M:%S.%f")
+
+ jsonTime = time.mktime(jsonTimeLocal.timetuple())
+
+ jsonTime += jsonTimeLocal.microsecond / 1000000
+
+ self.assertEqual(fail2banTime, jsonTime,
+ "UTC Time mismatch %s (%s) != %s (%s) (diff %.3f seconds)" %
+ (fail2banTime, time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(fail2banTime)),
+ jsonTime, time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(jsonTime)),
+ fail2banTime - jsonTime) )
+
regexsUsedIdx.add(failregex)
regexsUsedRe.add(regexList[failregex])
- continue
-
- # Check line is flagged to match
- self.assertTrue(faildata.get('match', False),
- "Line matched when shouldn't have")
- self.assertEqual(len(ret), 1,
- "Multiple regexs matched %r" % (map(lambda x: x[0], ret)))
-
- # Verify match captures (at least fid/host) and timestamp as expected
- for k, v in faildata.iteritems():
- if k not in ("time", "match", "desc", "filter"):
- fv = fail.get(k, None)
- # Fallback for backwards compatibility (previously no fid, was host only):
- if k == "host" and fv is None:
- fv = fid
- self.assertEqual(fv, v)
-
- t = faildata.get("time", None)
- try:
- jsonTimeLocal = datetime.datetime.strptime(t, "%Y-%m-%dT%H:%M:%S")
- except ValueError:
- jsonTimeLocal = datetime.datetime.strptime(t, "%Y-%m-%dT%H:%M:%S.%f")
-
- jsonTime = time.mktime(jsonTimeLocal.timetuple())
-
- jsonTime += jsonTimeLocal.microsecond / 1000000
-
- self.assertEqual(fail2banTime, jsonTime,
- "UTC Time mismatch %s (%s) != %s (%s) (diff %.3f seconds)" %
- (fail2banTime, time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(fail2banTime)),
- jsonTime, time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(jsonTime)),
- fail2banTime - jsonTime) )
-
- regexsUsedIdx.add(failregex)
- regexsUsedRe.add(regexList[failregex])
- except AssertionError as e: # pragma: no cover
- raise AssertionError("%s on: %s:%i, line:\n%s" % (
- e, logFile.filename(), logFile.filelineno(), line))
-
- _testMissingSamples()
+ except AssertionError as e: # pragma: no cover
+ raise AssertionError("%s: %s on: %s:%i, line:\n%s" % (
+ fltName, e, logFile.filename(), logFile.filelineno(), line))
+
+ # check missing samples for regex using each filter-options combination:
+ for fltName, flt in self._filters.iteritems():
+ flt, regexsUsedIdx = flt
+ regexList = flt.getFailRegex()
+ for failRegexIndex, failRegex in enumerate(regexList):
+ self.assertTrue(
+ failRegexIndex in regexsUsedIdx or failRegex in regexsUsedRe,
+ "%s: Regex has no samples: %i: %r" %
+ (fltName, failRegexIndex, failRegex))
return testFilter