summaryrefslogtreecommitdiff
path: root/fail2ban/server/failregex.py
diff options
context:
space:
mode:
Diffstat (limited to 'fail2ban/server/failregex.py')
-rw-r--r--fail2ban/server/failregex.py246
1 files changed, 165 insertions, 81 deletions
diff --git a/fail2ban/server/failregex.py b/fail2ban/server/failregex.py
index 7c51ddb8..d5c9345f 100644
--- a/fail2ban/server/failregex.py
+++ b/fail2ban/server/failregex.py
@@ -27,6 +27,68 @@ import sys
from .ipdns import IPAddr
+
+FTAG_CRE = re.compile(r'</?[\w\-]+/?>')
+
+FCUSTNAME_CRE = re.compile(r'^(/?)F-([A-Z0-9_\-]+)$'); # currently uppercase only
+
+R_HOST = [
+ # separated ipv4:
+ r"""(?:::f{4,6}:)?(?P<ip4>%s)""" % (IPAddr.IP_4_RE,),
+ # separated ipv6:
+ r"""(?P<ip6>%s)""" % (IPAddr.IP_6_RE,),
+ # place-holder for ipv6 enclosed in optional [] (used in addr-, host-regex)
+ "",
+ # separated dns:
+ r"""(?P<dns>[\w\-.^_]*\w)""",
+ # place-holder for ADDR tag-replacement (joined):
+ "",
+ # place-holder for HOST tag replacement (joined):
+ ""
+]
+RI_IPV4 = 0
+RI_IPV6 = 1
+RI_IPV6BR = 2
+RI_DNS = 3
+RI_ADDR = 4
+RI_HOST = 5
+
+R_HOST[RI_IPV6BR] = r"""\[?%s\]?""" % (R_HOST[RI_IPV6],)
+R_HOST[RI_ADDR] = "(?:%s)" % ("|".join((R_HOST[RI_IPV4], R_HOST[RI_IPV6BR])),)
+R_HOST[RI_HOST] = "(?:%s)" % ("|".join((R_HOST[RI_IPV4], R_HOST[RI_IPV6BR], R_HOST[RI_DNS])),)
+
+RH4TAG = {
+ # separated ipv4 (self closed, closed):
+ "IP4": R_HOST[RI_IPV4],
+ "F-IP4/": R_HOST[RI_IPV4],
+ # separated ipv6 (self closed, closed):
+ "IP6": R_HOST[RI_IPV6],
+ "F-IP6/": R_HOST[RI_IPV6],
+ # 2 address groups instead of <ADDR> - in opposition to `<HOST>`,
+ # for separate usage of 2 address groups only (regardless of `usedns`), `ip4` and `ip6` together
+ "ADDR": R_HOST[RI_ADDR],
+ "F-ADDR/": R_HOST[RI_ADDR],
+ # separated dns (self closed, closed):
+ "DNS": R_HOST[RI_DNS],
+ "F-DNS/": R_HOST[RI_DNS],
+ # default failure-id as no space tag:
+ "F-ID/": r"""(?P<fid>\S+)""",
+ # default failure port, like 80 or http :
+ "F-PORT/": r"""(?P<fport>\w+)""",
+}
+
+# default failure groups map for customizable expressions (with different group-id):
+R_MAP = {
+ "ID": "fid",
+ "PORT": "fport",
+}
+
+def mapTag2Opt(tag):
+ try: # if should be mapped:
+ return R_MAP[tag]
+ except KeyError:
+ return tag.lower()
+
##
# Regular expression class.
#
@@ -41,20 +103,16 @@ class Regex:
# avoid construction of invalid object.
# @param value the regular expression
- def __init__(self, regex, **kwargs):
+ def __init__(self, regex, multiline=False, **kwargs):
self._matchCache = None
# Perform shortcuts expansions.
- # Resolve "<HOST>" tag using default regular expression for host:
+ # Replace standard f2b-tags (like "<HOST>", etc) using default regular expressions:
regex = Regex._resolveHostTag(regex, **kwargs)
- # Replace "<SKIPLINES>" with regular expression for multiple lines.
- regexSplit = regex.split("<SKIPLINES>")
- regex = regexSplit[0]
- for n, regexLine in enumerate(regexSplit[1:]):
- regex += "\n(?P<skiplines%i>(?:(.*\n)*?))" % n + regexLine
+ #
if regex.lstrip() == '':
raise RegexException("Cannot add empty regex")
try:
- self._regexObj = re.compile(regex, re.MULTILINE)
+ self._regexObj = re.compile(regex, re.MULTILINE if multiline else 0)
self._regex = regex
except sre_constants.error:
raise RegexException("Unable to compile regular expression '%s'" %
@@ -71,38 +129,52 @@ class Regex:
@staticmethod
def _resolveHostTag(regex, useDns="yes"):
- # separated ipv4:
- r_host = []
- r = r"""(?:::f{4,6}:)?(?P<ip4>%s)""" % (IPAddr.IP_4_RE,)
- regex = regex.replace("<IP4>", r); # self closed
- regex = regex.replace("<F-IP4/>", r); # closed
- r_host.append(r)
- # separated ipv6:
- r = r"""(?P<ip6>%s)""" % (IPAddr.IP_6_RE,)
- regex = regex.replace("<IP6>", r); # self closed
- regex = regex.replace("<F-IP6/>", r); # closed
- r_host.append(r"""\[?%s\]?""" % (r,)); # enclose ipv6 in optional [] in host-regex
- # 2 address groups instead of <ADDR> - in opposition to `<HOST>`,
- # for separate usage of 2 address groups only (regardless of `usedns`), `ip4` and `ip6` together
- regex = regex.replace("<ADDR>", "(?:%s)" % ("|".join(r_host),))
- # separated dns:
- r = r"""(?P<dns>[\w\-.^_]*\w)"""
- regex = regex.replace("<DNS>", r); # self closed
- regex = regex.replace("<F-DNS/>", r); # closed
- if useDns not in ("no",):
- r_host.append(r)
- # 3 groups instead of <HOST> - separated ipv4, ipv6 and host (dns)
- regex = regex.replace("<HOST>", "(?:%s)" % ("|".join(r_host),))
- # default failure-id as no space tag:
- regex = regex.replace("<F-ID/>", r"""(?P<fid>\S+)"""); # closed
- # default failure port, like 80 or http :
- regex = regex.replace("<F-PORT/>", r"""(?P<port>\w+)"""); # closed
- # default failure groups (begin / end tag) for customizable expressions:
- for o,r in (('IP4', 'ip4'), ('IP6', 'ip6'), ('DNS', 'dns'), ('ID', 'fid'), ('PORT', 'fport')):
- regex = regex.replace("<F-%s>" % o, "(?P<%s>" % r); # open tag
- regex = regex.replace("</F-%s>" % o, ")"); # close tag
-
- return regex
+
+ openTags = dict()
+ props = {
+ 'nl': 0, # new lines counter by <SKIPLINES> tag;
+ }
+ # tag interpolation callable:
+ def substTag(m):
+ tag = m.group()
+ tn = tag[1:-1]
+ # 3 groups instead of <HOST> - separated ipv4, ipv6 and host (dns)
+ if tn == "HOST":
+ return R_HOST[RI_HOST if useDns not in ("no",) else RI_ADDR]
+ # replace "<SKIPLINES>" with regular expression for multiple lines (by buffering with maxlines)
+ if tn == "SKIPLINES":
+ nl = props['nl']
+ props['nl'] = nl + 1
+ return r"\n(?P<skiplines%i>(?:(?:.*\n)*?))" % (nl,)
+ # static replacement from RH4TAG:
+ try:
+ return RH4TAG[tn]
+ except KeyError:
+ pass
+
+ # (begin / end tag) for customizable expressions, additionally used as
+ # user custom tags (match will be stored in ticket data, can be used in actions):
+ m = FCUSTNAME_CRE.match(tn)
+ if m: # match F-...
+ m = m.groups()
+ tn = m[1]
+ # close tag:
+ if m[0]:
+ # check it was already open:
+ if openTags.get(tn):
+ return ")"
+ return tag; # tag not opened, use original
+ # open tag:
+ openTags[tn] = 1
+ # if should be mapped:
+ tn = mapTag2Opt(tn)
+ return "(?P<%s>" % (tn,)
+
+ # original, no replacement:
+ return tag
+
+ # substitute tags:
+ return FTAG_CRE.sub(substTag, regex)
##
# Gets the regular expression.
@@ -121,40 +193,45 @@ class Regex:
# method of this object.
# @param a list of tupples. The tupples are ( prematch, datematch, postdatematch )
- def search(self, tupleLines):
+ def search(self, tupleLines, orgLines=None):
self._matchCache = self._regexObj.search(
"\n".join("".join(value[::2]) for value in tupleLines) + "\n")
- if self.hasMatched():
- # Find start of the first line where the match was found
- try:
- self._matchLineStart = self._matchCache.string.rindex(
- "\n", 0, self._matchCache.start() +1 ) + 1
- except ValueError:
- self._matchLineStart = 0
- # Find end of the last line where the match was found
- try:
- self._matchLineEnd = self._matchCache.string.index(
- "\n", self._matchCache.end() - 1) + 1
- except ValueError:
- self._matchLineEnd = len(self._matchCache.string)
-
- lineCount1 = self._matchCache.string.count(
- "\n", 0, self._matchLineStart)
- lineCount2 = self._matchCache.string.count(
- "\n", 0, self._matchLineEnd)
- self._matchedTupleLines = tupleLines[lineCount1:lineCount2]
- self._unmatchedTupleLines = tupleLines[:lineCount1]
-
- n = 0
- for skippedLine in self.getSkippedLines():
- for m, matchedTupleLine in enumerate(
- self._matchedTupleLines[n:]):
- if "".join(matchedTupleLine[::2]) == skippedLine:
- self._unmatchedTupleLines.append(
- self._matchedTupleLines.pop(n+m))
- n += m
- break
- self._unmatchedTupleLines.extend(tupleLines[lineCount2:])
+ if self._matchCache:
+ if orgLines is None: orgLines = tupleLines
+ # if single-line:
+ if len(orgLines) <= 1:
+ self._matchedTupleLines = orgLines
+ self._unmatchedTupleLines = []
+ else:
+ # Find start of the first line where the match was found
+ try:
+ matchLineStart = self._matchCache.string.rindex(
+ "\n", 0, self._matchCache.start() +1 ) + 1
+ except ValueError:
+ matchLineStart = 0
+ # Find end of the last line where the match was found
+ try:
+ matchLineEnd = self._matchCache.string.index(
+ "\n", self._matchCache.end() - 1) + 1
+ except ValueError:
+ matchLineEnd = len(self._matchCache.string)
+
+ lineCount1 = self._matchCache.string.count(
+ "\n", 0, matchLineStart)
+ lineCount2 = self._matchCache.string.count(
+ "\n", 0, matchLineEnd)
+ self._matchedTupleLines = orgLines[lineCount1:lineCount2]
+ self._unmatchedTupleLines = orgLines[:lineCount1]
+ n = 0
+ for skippedLine in self.getSkippedLines():
+ for m, matchedTupleLine in enumerate(
+ self._matchedTupleLines[n:]):
+ if "".join(matchedTupleLine[::2]) == skippedLine:
+ self._unmatchedTupleLines.append(
+ self._matchedTupleLines.pop(n+m))
+ n += m
+ break
+ self._unmatchedTupleLines.extend(orgLines[lineCount2:])
# Checks if the previous call to search() matched.
#
@@ -167,6 +244,13 @@ class Regex:
return False
##
+ # Returns all matched groups.
+ #
+
+ def getGroups(self):
+ return self._matchCache.groupdict()
+
+ ##
# Returns skipped lines.
#
# This returns skipped lines captured by the <SKIPLINES> tag.
@@ -243,6 +327,10 @@ class RegexException(Exception):
#
FAILURE_ID_GROPS = ("fid", "ip4", "ip6", "dns")
+# Additionally allows multi-line failure-id (used for wrapping e. g. conn-id to host)
+#
+FAILURE_ID_PRESENTS = FAILURE_ID_GROPS + ("mlfid",)
+
##
# Regular expression class.
#
@@ -257,21 +345,17 @@ class FailRegex(Regex):
# avoid construction of invalid object.
# @param value the regular expression
- def __init__(self, regex, **kwargs):
+ def __init__(self, regex, prefRegex=None, **kwargs):
# Initializes the parent.
Regex.__init__(self, regex, **kwargs)
# Check for group "dns", "ip4", "ip6", "fid"
- if not [grp for grp in FAILURE_ID_GROPS if grp in self._regexObj.groupindex]:
+ if (not [grp for grp in FAILURE_ID_PRESENTS if grp in self._regexObj.groupindex]
+ and (prefRegex is None or
+ not [grp for grp in FAILURE_ID_PRESENTS if grp in prefRegex._regexObj.groupindex])
+ ):
raise RegexException("No failure-id group in '%s'" % self._regex)
##
- # Returns all matched groups.
- #
-
- def getGroups(self):
- return self._matchCache.groupdict()
-
- ##
# Returns the matched failure id.
#
# This corresponds to the pattern matched by the named group from given groups.