diff options
author | sebres <info@sebres.de> | 2021-09-20 04:07:55 +0200 |
---|---|---|
committer | sebres <serg.brester@sebres.de> | 2021-11-03 15:41:40 +0100 |
commit | 96661f25ab558b92b42d01c87d383664e69dd18b (patch) | |
tree | 9deb0a09b137c2fd06d77fa2f8b6d159c63d194e /fail2ban | |
parent | 7678f5982733f1d20bbec8d04006ac78cded10db (diff) | |
download | fail2ban-96661f25ab558b92b42d01c87d383664e69dd18b.tar.gz |
filtersystemd.py: fixes wrong time point of "in operation" mode
todo: need more tests to cover any step of switch to inOperationMode (all branches)
Diffstat (limited to 'fail2ban')
-rw-r--r-- | fail2ban/server/filtersystemd.py | 78 | ||||
-rw-r--r-- | fail2ban/tests/filtertestcase.py | 23 |
2 files changed, 82 insertions, 19 deletions
diff --git a/fail2ban/server/filtersystemd.py b/fail2ban/server/filtersystemd.py index 88f8c292..a305b6cf 100644 --- a/fail2ban/server/filtersystemd.py +++ b/fail2ban/server/filtersystemd.py @@ -92,8 +92,8 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover try: args['flags'] = int(kwargs.pop('journalflags')) except KeyError: - # be sure all journal types will be opened if files specified (don't set flags): - if 'files' not in args or not len(args['files']): + # be sure all journal types will be opened if files/path specified (don't set flags): + if ('files' not in args or not len(args['files'])) and ('path' not in args or not args['path']): args['flags'] = 4 try: @@ -258,6 +258,10 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover date = datetime.datetime.fromtimestamp(date) self.__journal.seek_realtime(date) + def inOperationMode(self): + self.inOperation = True + logSys.info("[%s] Jail is in operation now (process new journal entries)", self.jailName) + ## # Main loop. # @@ -268,23 +272,44 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover if not self.getJournalMatch(): logSys.notice( - "Jail started without 'journalmatch' set. " + "[%s] Jail started without 'journalmatch' set. " "Jail regexs will be checked against all journal entries, " - "which is not advised for performance reasons.") - - # Try to obtain the last known time (position of journal) - start_time = 0 - if self.jail.database is not None: - start_time = self.jail.database.getJournalPos(self.jail, 'systemd-journal') or 0 - # Seek to max(last_known_time, now - findtime) in journal - start_time = max( start_time, MyTime.time() - int(self.getFindTime()) ) - self.seekToTime(start_time) - # Move back one entry to ensure do not end up in dead space - # if start time beyond end of journal + "which is not advised for performance reasons.", self.jailName) + + # Save current cursor position (to recognize in operation mode): + logentry = None try: - self.__journal.get_previous() + self.__journal.seek_tail() + logentry = self.__journal.get_previous() + self.__journal.get_next() except OSError: - pass # Reading failure, so safe to ignore + logentry = None # Reading failure, so safe to ignore + if logentry: + # Try to obtain the last known time (position of journal) + startTime = 0 + if self.jail.database is not None: + startTime = self.jail.database.getJournalPos(self.jail, 'systemd-journal') or 0 + # Seek to max(last_known_time, now - findtime) in journal + startTime = max( startTime, MyTime.time() - int(self.getFindTime()) ) + self.seekToTime(startTime) + # Not in operation while we'll read old messages ... + self.inOperation = False + # Save current time in order to check time to switch "in operation" mode + startTime = (1, MyTime.time(), logentry.get('__CURSOR')) + # Move back one entry to ensure do not end up in dead space + # if start time beyond end of journal + try: + self.__journal.get_previous() + except OSError: + pass # Reading failure, so safe to ignore + else: + # empty journal or no entries for current filter: + self.inOperationMode() + # seek_tail() seems to have a bug by no entries (could bypass some entries hereafter), so seek to now instead: + startTime = MyTime.time() + self.seekToTime(startTime) + # for possible future switches of in-operation mode: + startTime = (0, startTime) line = None while self.active: @@ -317,12 +342,27 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover e, exc_info=logSys.getEffectiveLevel() <= logging.DEBUG) self.ticks += 1 if logentry: - line = self.formatJournalEntry(logentry) - self.processLineAndAdd(*line) + line, tm = self.formatJournalEntry(logentry) + # switch "in operation" mode if we'll find start entry (+ some delta): + if not self.inOperation: + if tm >= MyTime.time() - 1: # reached now (approximated): + self.inOperationMode() + elif startTime[0] == 1: + # if it reached start entry (or get read time larger than start time) + if logentry.get('__CURSOR') == startTime[2] or tm > startTime[1]: + # give the filter same time it needed to reach the start entry: + startTime = (0, MyTime.time()*2 - startTime[1]) + elif tm > startTime[1]: # reached start time (approximated): + self.inOperationMode() + # process line + self.processLineAndAdd(line, tm) self.__modified += 1 if self.__modified >= 100: # todo: should be configurable break else: + # "in operation" mode since we don't have messages anymore (reached end of journal): + if not self.inOperation: + self.inOperationMode() break self.__modified = 0 if self.ticks % 10 == 0: @@ -334,7 +374,7 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover or not self.active ) ): - self.jail.database.updateJournal(self.jail, 'systemd-journal', line[1], line[0][1]) + self.jail.database.updateJournal(self.jail, 'systemd-journal', tm, line[1]) self.__nextUpdateTM = MyTime.time() + Utils.DEFAULT_SLEEP_TIME * 5 line = None except Exception as e: # pragma: no cover diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py index 343cfe04..32dcca5c 100644 --- a/fail2ban/tests/filtertestcase.py +++ b/fail2ban/tests/filtertestcase.py @@ -1537,6 +1537,29 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover _gen_falure("192.0.2.6") self.assertFalse(self.jail.getFailTicket()) + # now reset DB, so we'd find all messages before filter entering in operation mode: + self.filter.stop() + self.filter.join() + self.jail.database.updateJournal(self.jail, 'systemd-journal', MyTime.time()-10000, 'TEST') + self._initFilter() + self.filter.setMaxRetry(1) + states = [] + def _state(*args): + self.assertNotIn("** in operation", states) + self.assertFalse(self.filter.inOperation) + states.append("** process line: %r" % (args,)) + self.filter.processLineAndAdd = _state + def _inoper(): + self.assertNotIn("** in operation", states) + self.assertEqual(len(states), 11) + states.append("** in operation") + self.filter.__class__.inOperationMode(self.filter) + self.filter.inOperationMode = _inoper + self.filter.start() + self.waitForTicks(12) + self.assertTrue(Utils.wait_for(lambda: len(states) == 12, _maxWaitTime(10))) + self.assertEqual(states[-1], "** in operation") + def test_delJournalMatch(self): self._initFilter() self.filter.start() |