summaryrefslogtreecommitdiff
path: root/fail2ban
diff options
context:
space:
mode:
authorsebres <info@sebres.de>2021-09-20 04:07:55 +0200
committersebres <serg.brester@sebres.de>2021-11-03 15:41:40 +0100
commit96661f25ab558b92b42d01c87d383664e69dd18b (patch)
tree9deb0a09b137c2fd06d77fa2f8b6d159c63d194e /fail2ban
parent7678f5982733f1d20bbec8d04006ac78cded10db (diff)
downloadfail2ban-96661f25ab558b92b42d01c87d383664e69dd18b.tar.gz
filtersystemd.py: fixes wrong time point of "in operation" mode
todo: need more tests to cover any step of switch to inOperationMode (all branches)
Diffstat (limited to 'fail2ban')
-rw-r--r--fail2ban/server/filtersystemd.py78
-rw-r--r--fail2ban/tests/filtertestcase.py23
2 files changed, 82 insertions, 19 deletions
diff --git a/fail2ban/server/filtersystemd.py b/fail2ban/server/filtersystemd.py
index 88f8c292..a305b6cf 100644
--- a/fail2ban/server/filtersystemd.py
+++ b/fail2ban/server/filtersystemd.py
@@ -92,8 +92,8 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
try:
args['flags'] = int(kwargs.pop('journalflags'))
except KeyError:
- # be sure all journal types will be opened if files specified (don't set flags):
- if 'files' not in args or not len(args['files']):
+ # be sure all journal types will be opened if files/path specified (don't set flags):
+ if ('files' not in args or not len(args['files'])) and ('path' not in args or not args['path']):
args['flags'] = 4
try:
@@ -258,6 +258,10 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
date = datetime.datetime.fromtimestamp(date)
self.__journal.seek_realtime(date)
+ def inOperationMode(self):
+ self.inOperation = True
+ logSys.info("[%s] Jail is in operation now (process new journal entries)", self.jailName)
+
##
# Main loop.
#
@@ -268,23 +272,44 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
if not self.getJournalMatch():
logSys.notice(
- "Jail started without 'journalmatch' set. "
+ "[%s] Jail started without 'journalmatch' set. "
"Jail regexs will be checked against all journal entries, "
- "which is not advised for performance reasons.")
-
- # Try to obtain the last known time (position of journal)
- start_time = 0
- if self.jail.database is not None:
- start_time = self.jail.database.getJournalPos(self.jail, 'systemd-journal') or 0
- # Seek to max(last_known_time, now - findtime) in journal
- start_time = max( start_time, MyTime.time() - int(self.getFindTime()) )
- self.seekToTime(start_time)
- # Move back one entry to ensure do not end up in dead space
- # if start time beyond end of journal
+ "which is not advised for performance reasons.", self.jailName)
+
+ # Save current cursor position (to recognize in operation mode):
+ logentry = None
try:
- self.__journal.get_previous()
+ self.__journal.seek_tail()
+ logentry = self.__journal.get_previous()
+ self.__journal.get_next()
except OSError:
- pass # Reading failure, so safe to ignore
+ logentry = None # Reading failure, so safe to ignore
+ if logentry:
+ # Try to obtain the last known time (position of journal)
+ startTime = 0
+ if self.jail.database is not None:
+ startTime = self.jail.database.getJournalPos(self.jail, 'systemd-journal') or 0
+ # Seek to max(last_known_time, now - findtime) in journal
+ startTime = max( startTime, MyTime.time() - int(self.getFindTime()) )
+ self.seekToTime(startTime)
+ # Not in operation while we'll read old messages ...
+ self.inOperation = False
+ # Save current time in order to check time to switch "in operation" mode
+ startTime = (1, MyTime.time(), logentry.get('__CURSOR'))
+ # Move back one entry to ensure do not end up in dead space
+ # if start time beyond end of journal
+ try:
+ self.__journal.get_previous()
+ except OSError:
+ pass # Reading failure, so safe to ignore
+ else:
+ # empty journal or no entries for current filter:
+ self.inOperationMode()
+ # seek_tail() seems to have a bug by no entries (could bypass some entries hereafter), so seek to now instead:
+ startTime = MyTime.time()
+ self.seekToTime(startTime)
+ # for possible future switches of in-operation mode:
+ startTime = (0, startTime)
line = None
while self.active:
@@ -317,12 +342,27 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
e, exc_info=logSys.getEffectiveLevel() <= logging.DEBUG)
self.ticks += 1
if logentry:
- line = self.formatJournalEntry(logentry)
- self.processLineAndAdd(*line)
+ line, tm = self.formatJournalEntry(logentry)
+ # switch "in operation" mode if we'll find start entry (+ some delta):
+ if not self.inOperation:
+ if tm >= MyTime.time() - 1: # reached now (approximated):
+ self.inOperationMode()
+ elif startTime[0] == 1:
+ # if it reached start entry (or get read time larger than start time)
+ if logentry.get('__CURSOR') == startTime[2] or tm > startTime[1]:
+ # give the filter same time it needed to reach the start entry:
+ startTime = (0, MyTime.time()*2 - startTime[1])
+ elif tm > startTime[1]: # reached start time (approximated):
+ self.inOperationMode()
+ # process line
+ self.processLineAndAdd(line, tm)
self.__modified += 1
if self.__modified >= 100: # todo: should be configurable
break
else:
+ # "in operation" mode since we don't have messages anymore (reached end of journal):
+ if not self.inOperation:
+ self.inOperationMode()
break
self.__modified = 0
if self.ticks % 10 == 0:
@@ -334,7 +374,7 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
or not self.active
)
):
- self.jail.database.updateJournal(self.jail, 'systemd-journal', line[1], line[0][1])
+ self.jail.database.updateJournal(self.jail, 'systemd-journal', tm, line[1])
self.__nextUpdateTM = MyTime.time() + Utils.DEFAULT_SLEEP_TIME * 5
line = None
except Exception as e: # pragma: no cover
diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py
index 343cfe04..32dcca5c 100644
--- a/fail2ban/tests/filtertestcase.py
+++ b/fail2ban/tests/filtertestcase.py
@@ -1537,6 +1537,29 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
_gen_falure("192.0.2.6")
self.assertFalse(self.jail.getFailTicket())
+ # now reset DB, so we'd find all messages before filter entering in operation mode:
+ self.filter.stop()
+ self.filter.join()
+ self.jail.database.updateJournal(self.jail, 'systemd-journal', MyTime.time()-10000, 'TEST')
+ self._initFilter()
+ self.filter.setMaxRetry(1)
+ states = []
+ def _state(*args):
+ self.assertNotIn("** in operation", states)
+ self.assertFalse(self.filter.inOperation)
+ states.append("** process line: %r" % (args,))
+ self.filter.processLineAndAdd = _state
+ def _inoper():
+ self.assertNotIn("** in operation", states)
+ self.assertEqual(len(states), 11)
+ states.append("** in operation")
+ self.filter.__class__.inOperationMode(self.filter)
+ self.filter.inOperationMode = _inoper
+ self.filter.start()
+ self.waitForTicks(12)
+ self.assertTrue(Utils.wait_for(lambda: len(states) == 12, _maxWaitTime(10)))
+ self.assertEqual(states[-1], "** in operation")
+
def test_delJournalMatch(self):
self._initFilter()
self.filter.start()