summaryrefslogtreecommitdiff
path: root/fail2ban
diff options
context:
space:
mode:
authorsebres <serg.brester@sebres.de>2022-11-02 21:06:46 +0100
committersebres <serg.brester@sebres.de>2022-11-02 21:06:46 +0100
commit6d19d2e80013efddbb549c33e305f1cd45736d52 (patch)
treec2ca8701de02537af1ef4777b369956dc9498486 /fail2ban
parent94dac78afebc2a4a676727527d99e70d970b061a (diff)
parent04c252c34b1a26f3da58fecfa02e5862d46df07d (diff)
downloadfail2ban-6d19d2e80013efddbb549c33e305f1cd45736d52.tar.gz
Merge branch '0.10' into 0.11
Diffstat (limited to 'fail2ban')
-rw-r--r--fail2ban/server/filtersystemd.py53
1 files changed, 40 insertions, 13 deletions
diff --git a/fail2ban/server/filtersystemd.py b/fail2ban/server/filtersystemd.py
index bf6885ef..a83b7a13 100644
--- a/fail2ban/server/filtersystemd.py
+++ b/fail2ban/server/filtersystemd.py
@@ -312,20 +312,37 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
except OSError:
pass # Reading failure, so safe to ignore
+ wcode = journal.NOP
line = None
while self.active:
# wait for records (or for timeout in sleeptime seconds):
try:
- ## todo: find better method as wait_for to break (e.g. notify) journal.wait(self.sleeptime),
- ## don't use `journal.close()` for it, because in some python/systemd implementation it may
- ## cause abnormal program termination
- #self.__journal.wait(self.sleeptime) != journal.NOP
- ##
- ## wait for entries without sleep in intervals, because "sleeping" in journal.wait:
- if not logentry:
- Utils.wait_for(lambda: not self.active or \
- self.__journal.wait(Utils.DEFAULT_SLEEP_INTERVAL) != journal.NOP,
+ ## wait for entries using journal.wait:
+ if wcode == journal.NOP and self.inOperation:
+ ## todo: find better method as wait_for to break (e.g. notify) journal.wait(self.sleeptime),
+ ## don't use `journal.close()` for it, because in some python/systemd implementation it may
+ ## cause abnormal program termination (e. g. segfault)
+ ##
+ ## wait for entries without sleep in intervals, because "sleeping" in journal.wait,
+ ## journal.NOP is 0, so we can wait for non zero (APPEND or INVALIDATE):
+ wcode = Utils.wait_for(lambda: not self.active and journal.APPEND or \
+ self.__journal.wait(Utils.DEFAULT_SLEEP_INTERVAL),
self.sleeptime, 0.00001)
+ ## if invalidate (due to rotation, vacuuming or journal files added/removed etc):
+ if self.active and wcode == journal.INVALIDATE:
+ if self.ticks:
+ logSys.log(logging.DEBUG, "[%s] Invalidate signaled, take a little break (rotation ends)", self.jailName)
+ time.sleep(self.sleeptime * 0.25)
+ Utils.wait_for(lambda: not self.active or \
+ self.__journal.wait(Utils.DEFAULT_SLEEP_INTERVAL) != journal.INVALIDATE,
+ self.sleeptime * 3, 0.00001)
+ if self.ticks:
+ # move back and forth to ensure do not end up in dead space by rotation or vacuuming,
+ # if position beyond end of journal (gh-3396)
+ try:
+ if self.__journal.get_previous(): self.__journal.get_next()
+ except OSError:
+ pass
if self.idle:
# because journal.wait will returns immediatelly if we have records in journal,
# just wait a little bit here for not idle, to prevent hi-load:
@@ -360,11 +377,13 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
self.processLineAndAdd(line, tm)
self.__modified += 1
if self.__modified >= 100: # todo: should be configurable
+ wcode = journal.APPEND; # don't need wait - there are still unprocessed entries
break
else:
# "in operation" mode since we don't have messages anymore (reached end of journal):
if not self.inOperation:
self.inOperationMode()
+ wcode = journal.NOP; # enter wait - no more entries to process
break
self.__modified = 0
if self.ticks % 10 == 0:
@@ -384,6 +403,7 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
except Exception as e: # pragma: no cover
if not self.active: # if not active - error by stop...
break
+ wcode = journal.NOP
logSys.error("Caught unhandled exception in main cycle: %r", e,
exc_info=logSys.getEffectiveLevel()<=logging.DEBUG)
# incr common error counter:
@@ -392,15 +412,20 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
logSys.debug("[%s] filter terminated", self.jailName)
# close journal:
+ self.closeJournal()
+
+ logSys.debug("[%s] filter exited (systemd)", self.jailName)
+ return True
+
+ def closeJournal(self):
try:
- if self.__journal:
- self.__journal.close()
+ jnl, self.__journal = self.__journal, None
+ if jnl:
+ jnl.close()
except Exception as e: # pragma: no cover
logSys.error("Close journal failed: %r", e,
exc_info=logSys.getEffectiveLevel()<=logging.DEBUG)
- logSys.debug("[%s] filter exited (systemd)", self.jailName)
- return True
def status(self, flavor="basic"):
ret = super(FilterSystemd, self).status(flavor=flavor)
@@ -422,6 +447,8 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
def onStop(self):
"""Stop monitoring of journal. Invoked after run method.
"""
+ # close journal:
+ self.closeJournal()
# ensure positions of pending logs are up-to-date:
if self._pendDBUpdates and self.jail.database:
self._updateDBPending()