summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsebres <info@sebres.de>2023-04-26 17:10:39 +0200
committersebres <info@sebres.de>2023-04-26 17:10:39 +0200
commitca4af85cd7743f8989fab7b1890d5cb73c61bde0 (patch)
tree5506872bfcbfff845c7466fb75e9ac1de5ac6f63
parentde0ed85fb88e8019efc3b754c0e75723c3f2878e (diff)
downloadfail2ban-ca4af85cd7743f8989fab7b1890d5cb73c61bde0.tar.gz
avoid confusion of path as failure ID with IP/CIDR notation, improve IP/CIDR parsing;
wrong CIDR notation or invalid plen always causes a fallback to raw string now; fixes recognition of `::` and `::/32`
-rw-r--r--fail2ban/server/filter.py2
-rw-r--r--fail2ban/server/ipdns.py22
-rw-r--r--fail2ban/tests/fail2banregextestcase.py7
-rw-r--r--fail2ban/tests/filtertestcase.py32
4 files changed, 53 insertions, 10 deletions
diff --git a/fail2ban/server/filter.py b/fail2ban/server/filter.py
index 68968284..7c1c0df1 100644
--- a/fail2ban/server/filter.py
+++ b/fail2ban/server/filter.py
@@ -944,7 +944,7 @@ class Filter(JailThread):
ip = fid
raw = True
# if mlfid case (not failure):
- if ip is None:
+ if fid is None and ip is None:
if ll <= 7: logSys.log(7, "No failure-id by mlfid %r in regex %s: %s",
mlfid, failRegexIndex, fail.get('mlfforget', "waiting for identifier"))
fail['mlfpending'] = 1; # mark failure is pending
diff --git a/fail2ban/server/ipdns.py b/fail2ban/server/ipdns.py
index b435c6df..6938d37d 100644
--- a/fail2ban/server/ipdns.py
+++ b/fail2ban/server/ipdns.py
@@ -304,9 +304,11 @@ class IPAddr(object):
"""
IP_4_RE = r"""(?:\d{1,3}\.){3}\d{1,3}"""
- IP_6_RE = r"""(?:[0-9a-fA-F]{1,4}::?|::){1,7}(?:[0-9a-fA-F]{1,4}|(?<=:):)"""
+ IP_6_RE = r"""(?:[0-9a-fA-F]{1,4}::?|:){1,7}(?:[0-9a-fA-F]{1,4}|(?<=:):)"""
IP_4_6_CRE = re.compile(
r"""^(?:(?P<IPv4>%s)|\[?(?P<IPv6>%s)\]?)$""" % (IP_4_RE, IP_6_RE))
+ IP_W_CIDR_CRE = re.compile(
+ r"""^(%s|%s)/(?:(\d+)|(%s|%s))$""" % (IP_4_RE, IP_6_RE, IP_4_RE, IP_6_RE))
# An IPv4 compatible IPv6 to be reused (see below)
IP6_4COMPAT = None
@@ -360,13 +362,17 @@ class IPAddr(object):
# test mask:
if "/" not in ipstr:
return ipstr, IPAddr.CIDR_UNSPEC
- s = ipstr.split('/', 1)
- # IP address without CIDR mask
- if len(s) > 2:
- raise ValueError("invalid ipstr %r, too many plen representation" % (ipstr,))
- if "." in s[1] or ":" in s[1]: # 255.255.255.0 resp. ffff:: style mask
- s[1] = IPAddr.masktoplen(s[1])
- s[1] = long(s[1])
+ s = IPAddr.IP_W_CIDR_CRE.match(ipstr)
+ if s is None:
+ return ipstr, IPAddr.CIDR_UNSPEC
+ s = list(s.groups())
+ if s[2]: # 255.255.255.0 resp. ffff:: style mask
+ s[1] = IPAddr.masktoplen(s[2])
+ del s[2]
+ try:
+ s[1] = long(s[1])
+ except ValueError:
+ return ipstr, IPAddr.CIDR_UNSPEC
return s
def __init(self, ipstr, cidr=CIDR_UNSPEC):
diff --git a/fail2ban/tests/fail2banregextestcase.py b/fail2ban/tests/fail2banregextestcase.py
index 4b11ef9a..fe45bf4b 100644
--- a/fail2ban/tests/fail2banregextestcase.py
+++ b/fail2ban/tests/fail2banregextestcase.py
@@ -225,6 +225,13 @@ class Fail2banRegexTest(LogCaptureTestCase):
))
self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed', all=True)
self.assertNotLogged('Unable to find a corresponding IP address')
+ # no confusion to IP/CIDR
+ self.pruneLog()
+ self.assertTrue(_test_exec(
+ "-d", "^Epoch", "-o", "id",
+ "1490349000 test this/is/some/path/32", "^\s*test <F-ID>\S+</F-ID>"
+ ))
+ self.assertLogged('this/is/some/path/32', all=True)
def testDirectRE_2(self):
self.assertTrue(_test_exec(
diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py
index 9fea84af..a4d542ce 100644
--- a/fail2ban/tests/filtertestcase.py
+++ b/fail2ban/tests/filtertestcase.py
@@ -2105,6 +2105,20 @@ class DNSUtilsNetworkTests(unittest.TestCase):
self.assertTrue(ip6.isSingle)
self.assertTrue(asip('192.0.2.1').isIPv4)
self.assertTrue(id(asip(ip4)) == id(ip4))
+ # ::
+ ip6 = IPAddr('::')
+ self.assertTrue(ip6.isIPv6)
+ self.assertTrue(ip6.isSingle)
+ # ::/32
+ ip6 = IPAddr('::/32')
+ self.assertTrue(ip6.isIPv6)
+ self.assertFalse(ip6.isSingle)
+ # path as ID, conversion as unspecified, fallback to raw (cover confusion with the /CIDR):
+ for s in ('some/path/as/id', 'other-path/24', '1.2.3.4/path'):
+ r = IPAddr(s, IPAddr.CIDR_UNSPEC)
+ self.assertEqual(r.raw, s)
+ self.assertFalse(r.isIPv4)
+ self.assertFalse(r.isIPv6)
def test_IPAddr_Raw(self):
# raw string:
@@ -2243,6 +2257,18 @@ class DNSUtilsNetworkTests(unittest.TestCase):
self.assertFalse(IPAddr('2606:2800:220:1:248:1893:25c8:1', IPAddr.CIDR_RAW).isInNet(ip6net))
# invalid not in net:
self.assertFalse(IPAddr('xxx').isInNet(ip4net))
+ # different forms in ::/32:
+ ip6net = IPAddr('::/32')
+ self.assertTrue(IPAddr('::').isInNet(ip6net))
+ self.assertTrue(IPAddr('::1').isInNet(ip6net))
+ self.assertTrue(IPAddr('0000::').isInNet(ip6net))
+ self.assertTrue(IPAddr('0000::0000').isInNet(ip6net))
+ self.assertTrue(IPAddr('0000:0000:7777::').isInNet(ip6net))
+ self.assertTrue(IPAddr('0000::7777:7777:7777:7777:7777:7777').isInNet(ip6net))
+ self.assertTrue(IPAddr('0000:0000:ffff::').isInNet(ip6net))
+ self.assertTrue(IPAddr('0000::ffff:ffff:ffff:ffff:ffff:ffff').isInNet(ip6net))
+ self.assertFalse(IPAddr('0000:0001:ffff::').isInNet(ip6net))
+ self.assertFalse(IPAddr('1::').isInNet(ip6net))
def testIPAddr_Compare(self):
ip4 = [
@@ -2315,7 +2341,11 @@ class DNSUtilsNetworkTests(unittest.TestCase):
def testIPAddr_CIDR_Wrong(self):
# too many plen representations:
- self.assertRaises(ValueError, IPAddr, '2606:28ff:220:1:248:1893:25c8::/ffff::/::1')
+ s = '2606:28ff:220:1:248:1893:25c8::/ffff::/::1'
+ r = IPAddr(s)
+ self.assertEqual(r.raw, s)
+ self.assertFalse(r.isIPv4)
+ self.assertFalse(r.isIPv6)
def testIPAddr_CIDR_Repr(self):
self.assertEqual(["127.0.0.0/8", "::/32", "2001:db8::/32"],