summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYaroslav Halchenko <debian@onerussian.com>2014-05-07 09:09:01 -0400
committerYaroslav Halchenko <debian@onerussian.com>2014-05-07 09:09:01 -0400
commit3471f13a84dd38d63439b61a5bc51b492f4ecf7c (patch)
treeaf7e4b9dae58b23c8f9352e71c53ee3ed6aef453
parent3eabf4a7bd310dab132a4c9d15970c082cfaef22 (diff)
parent6a740f684a52f4ccc310e75d17b18ec610c70bb0 (diff)
downloadfail2ban-3471f13a84dd38d63439b61a5bc51b492f4ecf7c.tar.gz
Merge pull request #700 from kwirk/format-traceback-to-helpers
ENH: Move traceback formatter to from tests.utils to helpers
-rwxr-xr-xbin/fail2ban-regex2
-rwxr-xr-xbin/fail2ban-testcases3
-rw-r--r--fail2ban/helpers.py83
-rw-r--r--fail2ban/tests/misctestcase.py3
-rw-r--r--fail2ban/tests/utils.py84
5 files changed, 91 insertions, 84 deletions
diff --git a/bin/fail2ban-regex b/bin/fail2ban-regex
index 974566bc..3a887867 100755
--- a/bin/fail2ban-regex
+++ b/bin/fail2ban-regex
@@ -45,7 +45,7 @@ from fail2ban.client.filterreader import FilterReader
from fail2ban.server.filter import Filter
from fail2ban.server.failregex import RegexException
-from fail2ban.tests.utils import FormatterWithTraceBack
+from fail2ban.helpers import FormatterWithTraceBack
# Gets the instance of the logger.
logSys = logging.getLogger("fail2ban")
diff --git a/bin/fail2ban-testcases b/bin/fail2ban-testcases
index b3bddf1c..0e2fdb4b 100755
--- a/bin/fail2ban-testcases
+++ b/bin/fail2ban-testcases
@@ -34,7 +34,8 @@ if os.path.exists("fail2ban/__init__.py"):
sys.path.insert(0, ".")
from fail2ban.version import version
-from fail2ban.tests.utils import FormatterWithTraceBack, gatherTests
+from fail2ban.tests.utils import gatherTests
+from fail2ban.helpers import FormatterWithTraceBack
from fail2ban.server.mytime import MyTime
from optparse import OptionParser, Option
diff --git a/fail2ban/helpers.py b/fail2ban/helpers.py
index 74ea7a7a..2579381d 100644
--- a/fail2ban/helpers.py
+++ b/fail2ban/helpers.py
@@ -20,9 +20,90 @@
__author__ = "Cyril Jaquier, Arturo 'Buanzo' Busleiman, Yaroslav Halchenko"
__license__ = "GPL"
+import sys
+import os
+import traceback
+import re
+import logging
def formatExceptionInfo():
""" Consistently format exception information """
- import sys
cla, exc = sys.exc_info()[:2]
return (cla.__name__, str(exc))
+
+#
+# Following "traceback" functions are adopted from PyMVPA distributed
+# under MIT/Expat and copyright by PyMVPA developers (i.e. me and
+# Michael). Hereby I re-license derivative work on these pieces under GPL
+# to stay in line with the main Fail2Ban license
+#
+def mbasename(s):
+ """Custom function to include directory name if filename is too common
+
+ Also strip .py at the end
+ """
+ base = os.path.basename(s)
+ if base.endswith('.py'):
+ base = base[:-3]
+ if base in set(['base', '__init__']):
+ base = os.path.basename(os.path.dirname(s)) + '.' + base
+ return base
+
+class TraceBack(object):
+ """Customized traceback to be included in debug messages
+ """
+
+ def __init__(self, compress=False):
+ """Initialize TrackBack metric
+
+ Parameters
+ ----------
+ compress : bool
+ if True then prefix common with previous invocation gets
+ replaced with ...
+ """
+ self.__prev = ""
+ self.__compress = compress
+
+ def __call__(self):
+ ftb = traceback.extract_stack(limit=100)[:-2]
+ entries = [
+ [mbasename(x[0]), os.path.dirname(x[0]), str(x[1])] for x in ftb]
+ entries = [ [e[0], e[2]] for e in entries
+ if not (e[0] in ['unittest', 'logging.__init__']
+ or e[1].endswith('/unittest'))]
+
+ # lets make it more concise
+ entries_out = [entries[0]]
+ for entry in entries[1:]:
+ if entry[0] == entries_out[-1][0]:
+ entries_out[-1][1] += ',%s' % entry[1]
+ else:
+ entries_out.append(entry)
+ sftb = '>'.join(['%s:%s' % (mbasename(x[0]),
+ x[1]) for x in entries_out])
+ if self.__compress:
+ # lets remove part which is common with previous invocation
+ prev_next = sftb
+ common_prefix = os.path.commonprefix((self.__prev, sftb))
+ common_prefix2 = re.sub('>[^>]*$', '', common_prefix)
+
+ if common_prefix2 != "":
+ sftb = '...' + sftb[len(common_prefix2):]
+ self.__prev = prev_next
+
+ return sftb
+
+class FormatterWithTraceBack(logging.Formatter):
+ """Custom formatter which expands %(tb) and %(tbc) with tracebacks
+
+ TODO: might need locking in case of compressed tracebacks
+ """
+ def __init__(self, fmt, *args, **kwargs):
+ logging.Formatter.__init__(self, fmt=fmt, *args, **kwargs)
+ compress = '%(tbc)s' in fmt
+ self._tb = TraceBack(compress=compress)
+
+ def format(self, record):
+ record.tbc = record.tb = self._tb()
+ return logging.Formatter.format(self, record)
diff --git a/fail2ban/tests/misctestcase.py b/fail2ban/tests/misctestcase.py
index 284b684b..ca84eba7 100644
--- a/fail2ban/tests/misctestcase.py
+++ b/fail2ban/tests/misctestcase.py
@@ -32,8 +32,7 @@ import datetime
from glob import glob
from StringIO import StringIO
-from .utils import mbasename, TraceBack, FormatterWithTraceBack
-from ..helpers import formatExceptionInfo
+from ..helpers import formatExceptionInfo, mbasename, TraceBack, FormatterWithTraceBack
from ..server.datetemplate import DatePatternRegex
diff --git a/fail2ban/tests/utils.py b/fail2ban/tests/utils.py
index 85c1d929..7727632e 100644
--- a/fail2ban/tests/utils.py
+++ b/fail2ban/tests/utils.py
@@ -22,90 +22,17 @@ __author__ = "Yaroslav Halchenko"
__copyright__ = "Copyright (c) 2013 Yaroslav Halchenko"
__license__ = "GPL"
-import logging, os, re, traceback, time, unittest
-from os.path import basename, dirname
+import logging
+import os
+import re
+import time
+import unittest
from StringIO import StringIO
from ..server.mytime import MyTime
logSys = logging.getLogger(__name__)
-#
-# Following "traceback" functions are adopted from PyMVPA distributed
-# under MIT/Expat and copyright by PyMVPA developers (i.e. me and
-# Michael). Hereby I re-license derivative work on these pieces under GPL
-# to stay in line with the main Fail2Ban license
-#
-def mbasename(s):
- """Custom function to include directory name if filename is too common
-
- Also strip .py at the end
- """
- base = basename(s)
- if base.endswith('.py'):
- base = base[:-3]
- if base in set(['base', '__init__']):
- base = basename(dirname(s)) + '.' + base
- return base
-
-class TraceBack(object):
- """Customized traceback to be included in debug messages
- """
-
- def __init__(self, compress=False):
- """Initialize TrackBack metric
-
- Parameters
- ----------
- compress : bool
- if True then prefix common with previous invocation gets
- replaced with ...
- """
- self.__prev = ""
- self.__compress = compress
-
- def __call__(self):
- ftb = traceback.extract_stack(limit=100)[:-2]
- entries = [[mbasename(x[0]), dirname(x[0]), str(x[1])] for x in ftb]
- entries = [ [e[0], e[2]] for e in entries
- if not (e[0] in ['unittest', 'logging.__init__']
- or e[1].endswith('/unittest'))]
-
- # lets make it more concise
- entries_out = [entries[0]]
- for entry in entries[1:]:
- if entry[0] == entries_out[-1][0]:
- entries_out[-1][1] += ',%s' % entry[1]
- else:
- entries_out.append(entry)
- sftb = '>'.join(['%s:%s' % (mbasename(x[0]),
- x[1]) for x in entries_out])
- if self.__compress:
- # lets remove part which is common with previous invocation
- prev_next = sftb
- common_prefix = os.path.commonprefix((self.__prev, sftb))
- common_prefix2 = re.sub('>[^>]*$', '', common_prefix)
-
- if common_prefix2 != "":
- sftb = '...' + sftb[len(common_prefix2):]
- self.__prev = prev_next
-
- return sftb
-
-class FormatterWithTraceBack(logging.Formatter):
- """Custom formatter which expands %(tb) and %(tbc) with tracebacks
-
- TODO: might need locking in case of compressed tracebacks
- """
- def __init__(self, fmt, *args, **kwargs):
- logging.Formatter.__init__(self, fmt=fmt, *args, **kwargs)
- compress = '%(tbc)s' in fmt
- self._tb = TraceBack(compress=compress)
-
- def format(self, record):
- record.tbc = record.tb = self._tb()
- return logging.Formatter.format(self, record)
-
def mtimesleep():
# no sleep now should be necessary since polling tracks now not only
# mtime but also ino and size
@@ -146,7 +73,6 @@ def gatherTests(regexps=None, no_network=False):
if not regexps: # pragma: no cover
tests = unittest.TestSuite()
else: # pragma: no cover
- import re
class FilteredTestSuite(unittest.TestSuite):
_regexps = [re.compile(r) for r in regexps]
def addTest(self, suite):