summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/test/profiling.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/test/profiling.py')
-rw-r--r--lib/sqlalchemy/test/profiling.py221
1 files changed, 0 insertions, 221 deletions
diff --git a/lib/sqlalchemy/test/profiling.py b/lib/sqlalchemy/test/profiling.py
deleted file mode 100644
index 6f839897d..000000000
--- a/lib/sqlalchemy/test/profiling.py
+++ /dev/null
@@ -1,221 +0,0 @@
-"""Profiling support for unit and performance tests.
-
-These are special purpose profiling methods which operate
-in a more fine-grained way than nose's profiling plugin.
-
-"""
-
-import os, sys
-from sqlalchemy.test.util import function_named, gc_collect
-from nose import SkipTest
-
-__all__ = 'profiled', 'function_call_count', 'conditional_call_count'
-
-all_targets = set()
-profile_config = { 'targets': set(),
- 'report': True,
- 'sort': ('time', 'calls'),
- 'limit': None }
-profiler = None
-
-def profiled(target=None, **target_opts):
- """Optional function profiling.
-
- @profiled('label')
- or
- @profiled('label', report=True, sort=('calls',), limit=20)
-
- Enables profiling for a function when 'label' is targetted for
- profiling. Report options can be supplied, and override the global
- configuration and command-line options.
- """
-
- # manual or automatic namespacing by module would remove conflict issues
- if target is None:
- target = 'anonymous_target'
- elif target in all_targets:
- print "Warning: redefining profile target '%s'" % target
- all_targets.add(target)
-
- filename = "%s.prof" % target
-
- def decorator(fn):
- def profiled(*args, **kw):
- if (target not in profile_config['targets'] and
- not target_opts.get('always', None)):
- return fn(*args, **kw)
-
- elapsed, load_stats, result = _profile(
- filename, fn, *args, **kw)
-
- report = target_opts.get('report', profile_config['report'])
- if report:
- sort_ = target_opts.get('sort', profile_config['sort'])
- limit = target_opts.get('limit', profile_config['limit'])
- print "Profile report for target '%s' (%s)" % (
- target, filename)
-
- stats = load_stats()
- stats.sort_stats(*sort_)
- if limit:
- stats.print_stats(limit)
- else:
- stats.print_stats()
- #stats.print_callers()
- os.unlink(filename)
- return result
- return function_named(profiled, fn.__name__)
- return decorator
-
-def function_call_count(count=None, versions={}, variance=0.05):
- """Assert a target for a test case's function call count.
-
- count
- Optional, general target function call count.
-
- versions
- Optional, a dictionary of Python version strings to counts,
- for example::
-
- { '2.5.1': 110,
- '2.5': 100,
- '2.4': 150 }
-
- The best match for the current running python will be used.
- If none match, 'count' will be used as the fallback.
-
- variance
- An +/- deviation percentage, defaults to 5%.
- """
-
- # this could easily dump the profile report if --verbose is in effect
-
- version_info = list(sys.version_info)
- py_version = '.'.join([str(v) for v in sys.version_info])
- try:
- from sqlalchemy.cprocessors import to_float
- cextension = True
- except ImportError:
- cextension = False
-
- while version_info:
- version = '.'.join([str(v) for v in version_info])
- if cextension:
- version += "+cextension"
- if version in versions:
- count = versions[version]
- break
- version_info.pop()
-
- if count is None:
- return lambda fn: fn
-
- def decorator(fn):
- def counted(*args, **kw):
- try:
- filename = "%s.prof" % fn.__name__
-
- elapsed, stat_loader, result = _profile(
- filename, fn, *args, **kw)
-
- stats = stat_loader()
- calls = stats.total_calls
-
- stats.sort_stats('calls', 'cumulative')
- stats.print_stats()
- #stats.print_callers()
- deviance = int(count * variance)
- if (calls < (count - deviance) or
- calls > (count + deviance)):
- raise AssertionError(
- "Function call count %s not within %s%% "
- "of expected %s. (Python version %s)" % (
- calls, (variance * 100), count, py_version))
-
- return result
- finally:
- if os.path.exists(filename):
- os.unlink(filename)
- return function_named(counted, fn.__name__)
- return decorator
-
-def conditional_call_count(discriminator, categories):
- """Apply a function call count conditionally at runtime.
-
- Takes two arguments, a callable that returns a key value, and a dict
- mapping key values to a tuple of arguments to function_call_count.
-
- The callable is not evaluated until the decorated function is actually
- invoked. If the `discriminator` returns a key not present in the
- `categories` dictionary, no call count assertion is applied.
-
- Useful for integration tests, where running a named test in isolation may
- have a function count penalty not seen in the full suite, due to lazy
- initialization in the DB-API, SA, etc.
- """
-
- def decorator(fn):
- def at_runtime(*args, **kw):
- criteria = categories.get(discriminator(), None)
- if criteria is None:
- return fn(*args, **kw)
-
- rewrapped = function_call_count(*criteria)(fn)
- return rewrapped(*args, **kw)
- return function_named(at_runtime, fn.__name__)
- return decorator
-
-
-def _profile(filename, fn, *args, **kw):
- global profiler
- if not profiler:
- if sys.version_info > (2, 5):
- try:
- import cProfile
- profiler = 'cProfile'
- except ImportError:
- pass
- if not profiler:
- try:
- import hotshot
- profiler = 'hotshot'
- except ImportError:
- profiler = 'skip'
-
- if profiler == 'skip':
- raise SkipTest('Profiling not supported on this platform')
- elif profiler == 'cProfile':
- return _profile_cProfile(filename, fn, *args, **kw)
- else:
- return _profile_hotshot(filename, fn, *args, **kw)
-
-def _profile_cProfile(filename, fn, *args, **kw):
- import cProfile, gc, pstats, time
-
- load_stats = lambda: pstats.Stats(filename)
- gc_collect()
-
- began = time.time()
- cProfile.runctx('result = fn(*args, **kw)', globals(), locals(),
- filename=filename)
- ended = time.time()
-
- return ended - began, load_stats, locals()['result']
-
-def _profile_hotshot(filename, fn, *args, **kw):
- import gc, hotshot, hotshot.stats, time
- load_stats = lambda: hotshot.stats.load(filename)
-
- gc_collect()
- prof = hotshot.Profile(filename)
- began = time.time()
- prof.start()
- try:
- result = fn(*args, **kw)
- finally:
- prof.stop()
- ended = time.time()
- prof.close()
-
- return ended - began, load_stats, result
-