From 04c252c34b1a26f3da58fecfa02e5862d46df07d Mon Sep 17 00:00:00 2001 From: sebres Date: Wed, 2 Nov 2022 21:05:18 +0100 Subject: filtersystemd: code review, wait only if it is necessary - in operational mode and if no more entries retrieved (end of journal); attempt to fix gh-3396 - ensure we give enough time after journal.wait returns with INVALIDATE (due to rotation, vacuuming or journal files added/removed etc) and move cursor back and forth to avoid entering dead space --- fail2ban/server/filtersystemd.py | 53 ++++++++++++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 13 deletions(-) (limited to 'fail2ban') diff --git a/fail2ban/server/filtersystemd.py b/fail2ban/server/filtersystemd.py index bf6885ef..a83b7a13 100644 --- a/fail2ban/server/filtersystemd.py +++ b/fail2ban/server/filtersystemd.py @@ -312,20 +312,37 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover except OSError: pass # Reading failure, so safe to ignore + wcode = journal.NOP line = None while self.active: # wait for records (or for timeout in sleeptime seconds): try: - ## todo: find better method as wait_for to break (e.g. notify) journal.wait(self.sleeptime), - ## don't use `journal.close()` for it, because in some python/systemd implementation it may - ## cause abnormal program termination - #self.__journal.wait(self.sleeptime) != journal.NOP - ## - ## wait for entries without sleep in intervals, because "sleeping" in journal.wait: - if not logentry: - Utils.wait_for(lambda: not self.active or \ - self.__journal.wait(Utils.DEFAULT_SLEEP_INTERVAL) != journal.NOP, + ## wait for entries using journal.wait: + if wcode == journal.NOP and self.inOperation: + ## todo: find better method as wait_for to break (e.g. notify) journal.wait(self.sleeptime), + ## don't use `journal.close()` for it, because in some python/systemd implementation it may + ## cause abnormal program termination (e. g. segfault) + ## + ## wait for entries without sleep in intervals, because "sleeping" in journal.wait, + ## journal.NOP is 0, so we can wait for non zero (APPEND or INVALIDATE): + wcode = Utils.wait_for(lambda: not self.active and journal.APPEND or \ + self.__journal.wait(Utils.DEFAULT_SLEEP_INTERVAL), self.sleeptime, 0.00001) + ## if invalidate (due to rotation, vacuuming or journal files added/removed etc): + if self.active and wcode == journal.INVALIDATE: + if self.ticks: + logSys.log(logging.DEBUG, "[%s] Invalidate signaled, take a little break (rotation ends)", self.jailName) + time.sleep(self.sleeptime * 0.25) + Utils.wait_for(lambda: not self.active or \ + self.__journal.wait(Utils.DEFAULT_SLEEP_INTERVAL) != journal.INVALIDATE, + self.sleeptime * 3, 0.00001) + if self.ticks: + # move back and forth to ensure do not end up in dead space by rotation or vacuuming, + # if position beyond end of journal (gh-3396) + try: + if self.__journal.get_previous(): self.__journal.get_next() + except OSError: + pass if self.idle: # because journal.wait will returns immediatelly if we have records in journal, # just wait a little bit here for not idle, to prevent hi-load: @@ -360,11 +377,13 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover self.processLineAndAdd(line, tm) self.__modified += 1 if self.__modified >= 100: # todo: should be configurable + wcode = journal.APPEND; # don't need wait - there are still unprocessed entries break else: # "in operation" mode since we don't have messages anymore (reached end of journal): if not self.inOperation: self.inOperationMode() + wcode = journal.NOP; # enter wait - no more entries to process break self.__modified = 0 if self.ticks % 10 == 0: @@ -384,6 +403,7 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover except Exception as e: # pragma: no cover if not self.active: # if not active - error by stop... break + wcode = journal.NOP logSys.error("Caught unhandled exception in main cycle: %r", e, exc_info=logSys.getEffectiveLevel()<=logging.DEBUG) # incr common error counter: @@ -392,15 +412,20 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover logSys.debug("[%s] filter terminated", self.jailName) # close journal: + self.closeJournal() + + logSys.debug("[%s] filter exited (systemd)", self.jailName) + return True + + def closeJournal(self): try: - if self.__journal: - self.__journal.close() + jnl, self.__journal = self.__journal, None + if jnl: + jnl.close() except Exception as e: # pragma: no cover logSys.error("Close journal failed: %r", e, exc_info=logSys.getEffectiveLevel()<=logging.DEBUG) - logSys.debug("[%s] filter exited (systemd)", self.jailName) - return True def status(self, flavor="basic"): ret = super(FilterSystemd, self).status(flavor=flavor) @@ -422,6 +447,8 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover def onStop(self): """Stop monitoring of journal. Invoked after run method. """ + # close journal: + self.closeJournal() # ensure positions of pending logs are up-to-date: if self._pendDBUpdates and self.jail.database: self._updateDBPending() -- cgit v1.2.1