diff options
Diffstat (limited to 'fail2ban/server/failregex.py')
-rw-r--r-- | fail2ban/server/failregex.py | 246 |
1 files changed, 165 insertions, 81 deletions
diff --git a/fail2ban/server/failregex.py b/fail2ban/server/failregex.py index 7c51ddb8..d5c9345f 100644 --- a/fail2ban/server/failregex.py +++ b/fail2ban/server/failregex.py @@ -27,6 +27,68 @@ import sys from .ipdns import IPAddr + +FTAG_CRE = re.compile(r'</?[\w\-]+/?>') + +FCUSTNAME_CRE = re.compile(r'^(/?)F-([A-Z0-9_\-]+)$'); # currently uppercase only + +R_HOST = [ + # separated ipv4: + r"""(?:::f{4,6}:)?(?P<ip4>%s)""" % (IPAddr.IP_4_RE,), + # separated ipv6: + r"""(?P<ip6>%s)""" % (IPAddr.IP_6_RE,), + # place-holder for ipv6 enclosed in optional [] (used in addr-, host-regex) + "", + # separated dns: + r"""(?P<dns>[\w\-.^_]*\w)""", + # place-holder for ADDR tag-replacement (joined): + "", + # place-holder for HOST tag replacement (joined): + "" +] +RI_IPV4 = 0 +RI_IPV6 = 1 +RI_IPV6BR = 2 +RI_DNS = 3 +RI_ADDR = 4 +RI_HOST = 5 + +R_HOST[RI_IPV6BR] = r"""\[?%s\]?""" % (R_HOST[RI_IPV6],) +R_HOST[RI_ADDR] = "(?:%s)" % ("|".join((R_HOST[RI_IPV4], R_HOST[RI_IPV6BR])),) +R_HOST[RI_HOST] = "(?:%s)" % ("|".join((R_HOST[RI_IPV4], R_HOST[RI_IPV6BR], R_HOST[RI_DNS])),) + +RH4TAG = { + # separated ipv4 (self closed, closed): + "IP4": R_HOST[RI_IPV4], + "F-IP4/": R_HOST[RI_IPV4], + # separated ipv6 (self closed, closed): + "IP6": R_HOST[RI_IPV6], + "F-IP6/": R_HOST[RI_IPV6], + # 2 address groups instead of <ADDR> - in opposition to `<HOST>`, + # for separate usage of 2 address groups only (regardless of `usedns`), `ip4` and `ip6` together + "ADDR": R_HOST[RI_ADDR], + "F-ADDR/": R_HOST[RI_ADDR], + # separated dns (self closed, closed): + "DNS": R_HOST[RI_DNS], + "F-DNS/": R_HOST[RI_DNS], + # default failure-id as no space tag: + "F-ID/": r"""(?P<fid>\S+)""", + # default failure port, like 80 or http : + "F-PORT/": r"""(?P<fport>\w+)""", +} + +# default failure groups map for customizable expressions (with different group-id): +R_MAP = { + "ID": "fid", + "PORT": "fport", +} + +def mapTag2Opt(tag): + try: # if should be mapped: + return R_MAP[tag] + except KeyError: + return tag.lower() + ## # Regular expression class. # @@ -41,20 +103,16 @@ class Regex: # avoid construction of invalid object. # @param value the regular expression - def __init__(self, regex, **kwargs): + def __init__(self, regex, multiline=False, **kwargs): self._matchCache = None # Perform shortcuts expansions. - # Resolve "<HOST>" tag using default regular expression for host: + # Replace standard f2b-tags (like "<HOST>", etc) using default regular expressions: regex = Regex._resolveHostTag(regex, **kwargs) - # Replace "<SKIPLINES>" with regular expression for multiple lines. - regexSplit = regex.split("<SKIPLINES>") - regex = regexSplit[0] - for n, regexLine in enumerate(regexSplit[1:]): - regex += "\n(?P<skiplines%i>(?:(.*\n)*?))" % n + regexLine + # if regex.lstrip() == '': raise RegexException("Cannot add empty regex") try: - self._regexObj = re.compile(regex, re.MULTILINE) + self._regexObj = re.compile(regex, re.MULTILINE if multiline else 0) self._regex = regex except sre_constants.error: raise RegexException("Unable to compile regular expression '%s'" % @@ -71,38 +129,52 @@ class Regex: @staticmethod def _resolveHostTag(regex, useDns="yes"): - # separated ipv4: - r_host = [] - r = r"""(?:::f{4,6}:)?(?P<ip4>%s)""" % (IPAddr.IP_4_RE,) - regex = regex.replace("<IP4>", r); # self closed - regex = regex.replace("<F-IP4/>", r); # closed - r_host.append(r) - # separated ipv6: - r = r"""(?P<ip6>%s)""" % (IPAddr.IP_6_RE,) - regex = regex.replace("<IP6>", r); # self closed - regex = regex.replace("<F-IP6/>", r); # closed - r_host.append(r"""\[?%s\]?""" % (r,)); # enclose ipv6 in optional [] in host-regex - # 2 address groups instead of <ADDR> - in opposition to `<HOST>`, - # for separate usage of 2 address groups only (regardless of `usedns`), `ip4` and `ip6` together - regex = regex.replace("<ADDR>", "(?:%s)" % ("|".join(r_host),)) - # separated dns: - r = r"""(?P<dns>[\w\-.^_]*\w)""" - regex = regex.replace("<DNS>", r); # self closed - regex = regex.replace("<F-DNS/>", r); # closed - if useDns not in ("no",): - r_host.append(r) - # 3 groups instead of <HOST> - separated ipv4, ipv6 and host (dns) - regex = regex.replace("<HOST>", "(?:%s)" % ("|".join(r_host),)) - # default failure-id as no space tag: - regex = regex.replace("<F-ID/>", r"""(?P<fid>\S+)"""); # closed - # default failure port, like 80 or http : - regex = regex.replace("<F-PORT/>", r"""(?P<port>\w+)"""); # closed - # default failure groups (begin / end tag) for customizable expressions: - for o,r in (('IP4', 'ip4'), ('IP6', 'ip6'), ('DNS', 'dns'), ('ID', 'fid'), ('PORT', 'fport')): - regex = regex.replace("<F-%s>" % o, "(?P<%s>" % r); # open tag - regex = regex.replace("</F-%s>" % o, ")"); # close tag - - return regex + + openTags = dict() + props = { + 'nl': 0, # new lines counter by <SKIPLINES> tag; + } + # tag interpolation callable: + def substTag(m): + tag = m.group() + tn = tag[1:-1] + # 3 groups instead of <HOST> - separated ipv4, ipv6 and host (dns) + if tn == "HOST": + return R_HOST[RI_HOST if useDns not in ("no",) else RI_ADDR] + # replace "<SKIPLINES>" with regular expression for multiple lines (by buffering with maxlines) + if tn == "SKIPLINES": + nl = props['nl'] + props['nl'] = nl + 1 + return r"\n(?P<skiplines%i>(?:(?:.*\n)*?))" % (nl,) + # static replacement from RH4TAG: + try: + return RH4TAG[tn] + except KeyError: + pass + + # (begin / end tag) for customizable expressions, additionally used as + # user custom tags (match will be stored in ticket data, can be used in actions): + m = FCUSTNAME_CRE.match(tn) + if m: # match F-... + m = m.groups() + tn = m[1] + # close tag: + if m[0]: + # check it was already open: + if openTags.get(tn): + return ")" + return tag; # tag not opened, use original + # open tag: + openTags[tn] = 1 + # if should be mapped: + tn = mapTag2Opt(tn) + return "(?P<%s>" % (tn,) + + # original, no replacement: + return tag + + # substitute tags: + return FTAG_CRE.sub(substTag, regex) ## # Gets the regular expression. @@ -121,40 +193,45 @@ class Regex: # method of this object. # @param a list of tupples. The tupples are ( prematch, datematch, postdatematch ) - def search(self, tupleLines): + def search(self, tupleLines, orgLines=None): self._matchCache = self._regexObj.search( "\n".join("".join(value[::2]) for value in tupleLines) + "\n") - if self.hasMatched(): - # Find start of the first line where the match was found - try: - self._matchLineStart = self._matchCache.string.rindex( - "\n", 0, self._matchCache.start() +1 ) + 1 - except ValueError: - self._matchLineStart = 0 - # Find end of the last line where the match was found - try: - self._matchLineEnd = self._matchCache.string.index( - "\n", self._matchCache.end() - 1) + 1 - except ValueError: - self._matchLineEnd = len(self._matchCache.string) - - lineCount1 = self._matchCache.string.count( - "\n", 0, self._matchLineStart) - lineCount2 = self._matchCache.string.count( - "\n", 0, self._matchLineEnd) - self._matchedTupleLines = tupleLines[lineCount1:lineCount2] - self._unmatchedTupleLines = tupleLines[:lineCount1] - - n = 0 - for skippedLine in self.getSkippedLines(): - for m, matchedTupleLine in enumerate( - self._matchedTupleLines[n:]): - if "".join(matchedTupleLine[::2]) == skippedLine: - self._unmatchedTupleLines.append( - self._matchedTupleLines.pop(n+m)) - n += m - break - self._unmatchedTupleLines.extend(tupleLines[lineCount2:]) + if self._matchCache: + if orgLines is None: orgLines = tupleLines + # if single-line: + if len(orgLines) <= 1: + self._matchedTupleLines = orgLines + self._unmatchedTupleLines = [] + else: + # Find start of the first line where the match was found + try: + matchLineStart = self._matchCache.string.rindex( + "\n", 0, self._matchCache.start() +1 ) + 1 + except ValueError: + matchLineStart = 0 + # Find end of the last line where the match was found + try: + matchLineEnd = self._matchCache.string.index( + "\n", self._matchCache.end() - 1) + 1 + except ValueError: + matchLineEnd = len(self._matchCache.string) + + lineCount1 = self._matchCache.string.count( + "\n", 0, matchLineStart) + lineCount2 = self._matchCache.string.count( + "\n", 0, matchLineEnd) + self._matchedTupleLines = orgLines[lineCount1:lineCount2] + self._unmatchedTupleLines = orgLines[:lineCount1] + n = 0 + for skippedLine in self.getSkippedLines(): + for m, matchedTupleLine in enumerate( + self._matchedTupleLines[n:]): + if "".join(matchedTupleLine[::2]) == skippedLine: + self._unmatchedTupleLines.append( + self._matchedTupleLines.pop(n+m)) + n += m + break + self._unmatchedTupleLines.extend(orgLines[lineCount2:]) # Checks if the previous call to search() matched. # @@ -167,6 +244,13 @@ class Regex: return False ## + # Returns all matched groups. + # + + def getGroups(self): + return self._matchCache.groupdict() + + ## # Returns skipped lines. # # This returns skipped lines captured by the <SKIPLINES> tag. @@ -243,6 +327,10 @@ class RegexException(Exception): # FAILURE_ID_GROPS = ("fid", "ip4", "ip6", "dns") +# Additionally allows multi-line failure-id (used for wrapping e. g. conn-id to host) +# +FAILURE_ID_PRESENTS = FAILURE_ID_GROPS + ("mlfid",) + ## # Regular expression class. # @@ -257,21 +345,17 @@ class FailRegex(Regex): # avoid construction of invalid object. # @param value the regular expression - def __init__(self, regex, **kwargs): + def __init__(self, regex, prefRegex=None, **kwargs): # Initializes the parent. Regex.__init__(self, regex, **kwargs) # Check for group "dns", "ip4", "ip6", "fid" - if not [grp for grp in FAILURE_ID_GROPS if grp in self._regexObj.groupindex]: + if (not [grp for grp in FAILURE_ID_PRESENTS if grp in self._regexObj.groupindex] + and (prefRegex is None or + not [grp for grp in FAILURE_ID_PRESENTS if grp in prefRegex._regexObj.groupindex]) + ): raise RegexException("No failure-id group in '%s'" % self._regex) ## - # Returns all matched groups. - # - - def getGroups(self): - return self._matchCache.groupdict() - - ## # Returns the matched failure id. # # This corresponds to the pattern matched by the named group from given groups. |