summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGustavo Niemeyer <gustavo@niemeyer.net>2007-10-14 21:49:22 -0300
committerGustavo Niemeyer <gustavo@niemeyer.net>2007-10-14 21:49:22 -0300
commit672b6a5b727c10a32c5274aee4e11a639f5ae27f (patch)
tree5a1e41ed8f1cc6f6941fb62057580dc41b102ba3
downloadmocker-672b6a5b727c10a32c5274aee4e11a639f5ae27f.tar.gz
Adding initial version of python-mocker to repository! It's alive!
-rw-r--r--mocker.py895
-rwxr-xr-xtest.py1841
2 files changed, 2736 insertions, 0 deletions
diff --git a/mocker.py b/mocker.py
new file mode 100644
index 0000000..1946dd9
--- /dev/null
+++ b/mocker.py
@@ -0,0 +1,895 @@
+import inspect
+import sys
+import gc
+
+
+__all__ = ["Mocker", "expect", "SAME", "CONTAINS", "ANY", "VARIOUS"]
+
+
+# --------------------------------------------------------------------
+# Exceptions
+
+class UnexpectedExprError(AssertionError):
+ """Raised when an unknown expression is seen in playback mode."""
+
+
+# --------------------------------------------------------------------
+# Helper for chained-style calling.
+
+class expect(object):
+ """This is a simple helper that allows a different call-style.
+
+ With this class one can comfortably do chaining of calls to the
+ mocker object responsible by the object being handler. For instance:
+
+ expect(obj.attr).result(3).count(1, 2)
+
+ Is the same as:
+
+ obj.attr
+ mocker.result(3)
+ mocker.count(1, 2)
+
+ """
+
+ def __init__(self, mock, attr=None):
+ self._mock = mock
+ self._attr = attr
+
+ def __getattr__(self, attr):
+ return self.__class__(self._mock, attr)
+
+ def __call__(self, *args, **kwargs):
+ getattr(self._mock.__mocker__, self._attr)(*args, **kwargs)
+ return self
+
+
+# --------------------------------------------------------------------
+# Mocker.
+
+class classinstancemethod(object):
+
+ def __init__(self, method):
+ self.method = method
+
+ def __get__(self, obj, cls=None):
+ def bound_method(*args, **kwargs):
+ return self.method(cls, obj, *args, **kwargs)
+ return bound_method
+
+
+class State(object):
+
+ def __init__(self, name):
+ self._name = name
+
+ def __repr__(self):
+ return self._name
+
+
+RECORD = State("RECORD")
+REPLAY = State("REPLAY")
+RESTORE = State("RESTORE")
+
+
+class MockerBase(object):
+ """This is the implementation of Mocker, without any recorders."""
+
+ _recorders = []
+
+ # For convenience only.
+ on = expect
+
+ class __metaclass__(type):
+ def __init__(self, name, bases, dict):
+ # Make independent lists on each subclass, inheriting from parent.
+ self._recorders = list(getattr(self, "_recorders", ()))
+
+ def __init__(self):
+ self._recorders = self._recorders[:]
+ self._events = []
+ self._state = RECORD
+ self._ordering = False
+ self._last_orderer = None
+
+ def get_state(self):
+ return self._state
+
+ def _set_state(self, state):
+ if state is not self._state:
+ self._state = state
+ for event in self._events:
+ event.set_state(state)
+
+ def replay(self):
+ self._set_state(REPLAY)
+
+ def record(self):
+ self._set_state(RECORD)
+
+ def restore(self):
+ self._set_state(RESTORE)
+
+ def is_ordering(self):
+ return self._ordering
+
+ def ordered(self):
+ self._ordering = True
+ return OrderedContext(self)
+
+ def unordered(self):
+ self._ordering = False
+ self._last_orderer = None
+
+ def get_events(self):
+ return self._events[:]
+
+ def add_event(self, event):
+ self._events.append(event)
+ if self._ordering:
+ orderer = event.add_task(Orderer())
+ if self._last_orderer:
+ orderer.add_dependency(self._last_orderer)
+ self._last_orderer = orderer
+ return event
+
+ def verify(self):
+ for event in self._events:
+ event.verify()
+
+ def obj(self, spec=None):
+ """Return a new mock object."""
+ return Mock(self, spec=spec)
+
+ def proxy(self, object, spec=None, name=None,
+ passthrough=True, install=False):
+ """Return a new mock object which proxies to the given object.
+
+ Unknown expressions are passed through to the proxied object
+ by default, unless passthrough is set to False. When set to
+ False, pass through only happens when explicitly requested.
+ """
+ mock = Mock(self, spec=spec, object=object,
+ name=name, passthrough=passthrough)
+ if install:
+ event = self.add_event(Event())
+ event.add_task(ProxyInstaller(mock))
+ return mock
+
+ def module(self, name, passthrough=True):
+ return self.proxy(__import__(name, {}, {}, [""]),
+ name=name, passthrough=passthrough, install=True)
+
+ def act(self, path):
+ """This is called by mock objects whenever something happens to them.
+ """
+ if self._state is RECORD:
+ event = self.add_event(Event(path))
+ for recorder in self._recorders:
+ recorder(self, event)
+ return Mock(self, path)
+ elif self._state is REPLAY:
+ for event in sorted(self._events, key=lambda e:e.satisfied()):
+ if event.matches(path):
+ return event.run(path)
+ raise UnexpectedExprError("[Mocker] Unexpected expression: %s"
+ % path)
+ # XXX Show the full state here.
+ else:
+ raise RuntimeError("Mocker isn't recording or replaying.")
+
+ @classinstancemethod
+ def get_recorders(cls, self):
+ return (self or cls)._recorders[:]
+
+ @classinstancemethod
+ def add_recorder(cls, self, recorder):
+ (self or cls)._recorders.append(recorder)
+ return recorder
+
+ @classinstancemethod
+ def remove_recorder(cls, self, recorder):
+ (self or cls)._recorders.remove(recorder)
+
+ def result(self, value):
+ """Last recorded event will return the given value."""
+ self.call(lambda *args, **kwargs: value)
+
+ def throw(self, exception):
+ """Last recorded event will raise the given exception."""
+ def raise_exception(*args, **kwargs):
+ raise exception
+ self.call(raise_exception)
+
+ def call(self, func):
+ """Last recorded event will cause the given function to be called.
+
+ The result of the function will be used as the event result.
+ """
+ self._events[-1].add_task(FunctionRunner(func))
+
+ def count(self, min, max=False):
+ """Last recorded event must happen between min and max times.
+
+ If the max argument isn't given, it's assumed to be the same
+ as min. If max is None, the event must just happen at least
+ min times.
+ """
+ event = self._events[-1]
+ for task in event.get_tasks():
+ if isinstance(task, RunCounter):
+ event.remove_task(task)
+ event.add_task(RunCounter(min, max))
+
+ def order(self, *path_holders):
+ """Ensure that events referred to by given objects happen in order.
+
+ Arguments passed to this method might be mock objects returned
+ from recorded operations.
+ """
+ last_orderer = None
+ for path_holder in path_holders:
+ if type(path_holder) is Path:
+ path = path_holder
+ else:
+ path = path_holder.__mocker_path__
+ for event in self._events:
+ if event.path is path:
+ for task in event.get_tasks():
+ if isinstance(task, Orderer):
+ orderer = task
+ break
+ else:
+ orderer = Orderer()
+ event.add_task(orderer)
+ if last_orderer:
+ orderer.add_dependency(last_orderer)
+ last_orderer = orderer
+ break
+
+ def after(self, *path_holders):
+ """Last recorded event must happen after events referred to.
+
+ Arguments passed to this method might be mock objects returned
+ from recorded operations.
+ """
+ last_path = self._events[-1].path
+ for path_holder in path_holders:
+ self.order(path_holder, last_path)
+
+ def before(self, *path_holders):
+ """Last recorded event must happen before events referred to.
+
+ Arguments passed to this method might be mock objects returned
+ from recorded operations.
+ """
+ last_path = self._events[-1].path
+ for path_holder in path_holders:
+ self.order(last_path, path_holder)
+
+ def nospec(self):
+ """Don't check method specification of real class on last event."""
+ event = self._events[-1]
+ for task in event.get_tasks():
+ if isinstance(task, SpecChecker):
+ event.remove_task(task)
+
+ def passthrough(self):
+ event = self._events[-1]
+ if not event.path.root_mock.__mocker_object__:
+ raise TypeError("Mock object isn't a proxy")
+ event.add_task(PathApplier())
+
+
+class OrderedContext(object):
+
+ def __init__(self, mocker):
+ self._mocker = mocker
+
+ def __enter__(self):
+ return None
+
+ def __exit__(self, type, value, traceback):
+ self._mocker.unordered()
+
+
+class Mocker(MockerBase):
+ pass
+
+# Decorator to add recorders on the standard Mocker class.
+recorder = Mocker.add_recorder
+
+
+# --------------------------------------------------------------------
+# Mock object.
+
+class Mock(object):
+
+ def __init__(self, mocker, path=None, name=None, spec=None,
+ object=None, passthrough=False):
+ self.__mocker__ = mocker
+ self.__mocker_path__ = path or Path(self)
+ self.__mocker_name__ = name
+ self.__mocker_spec__ = spec
+ self.__mocker_object__ = object
+ self.__mocker_passthrough__ = passthrough
+
+ def __mocker_act__(self, kind, *args, **kwargs):
+ if self.__mocker_name__ is None:
+ self.__mocker_name__ = find_object_name(self, 2)
+ action = Action(self.__mocker_path__, kind, args, kwargs)
+ path = self.__mocker_path__ + action
+ try:
+ return self.__mocker__.act(path)
+ except UnexpectedExprError:
+ root_mock = path.root_mock
+ if (root_mock.__mocker_passthrough__ and
+ root_mock.__mocker_object__ is not None):
+ return path.apply(root_mock.__mocker_object__)
+ raise
+
+ def __getattribute__(self, name):
+ if name.startswith("__mocker_"):
+ return super(Mock, self).__getattribute__(name)
+ return self.__mocker_act__("getattr", name)
+
+ def __call__(self, *args, **kwargs):
+ return self.__mocker_act__("call", *args, **kwargs)
+
+
+def find_object_name(obj, depth=0):
+ """Try to detect how the object is named on a previous scope."""
+ try:
+ frame = sys._getframe(depth+1)
+ except:
+ return None
+ for name, frame_obj in frame.f_locals.iteritems():
+ if frame_obj is obj:
+ return name
+ self = frame.f_locals.get("self")
+ if self is not None:
+ try:
+ items = list(self.__dict__.iteritems())
+ except:
+ pass
+ else:
+ for name, self_obj in items:
+ if self_obj is obj:
+ return name
+ return None
+
+
+# --------------------------------------------------------------------
+# Action and path.
+
+class Action(object):
+
+ def __init__(self, path, kind, args, kwargs):
+ self.path = path
+ self.kind = kind
+ self.args = args
+ self.kwargs = kwargs
+ self._apply_cache = {}
+
+ def apply(self, object):
+ # This caching scheme may fail if the object gets deallocated before
+ # the action, as the id might get reused. It's somewhat easy to fix
+ # that with a weakref callback. For our uses, though, the object
+ # should never get deallocated before the action itself, so we'll
+ # just keep it simple.
+ if id(object) in self._apply_cache:
+ return self._apply_cache[id(object)]
+ kind = self.kind
+ if kind == "getattr":
+ kind = "getattribute"
+ method = getattr(object, "__%s__" % kind)
+ result = method(*self.args, **self.kwargs)
+ self._apply_cache[id(object)] = result
+ return result
+
+
+class Path(object):
+
+ def __init__(self, root_mock, actions=()):
+ self.root_mock = root_mock
+ self.actions = tuple(actions)
+
+ @property
+ def parent_path(self):
+ if not self.actions:
+ return None
+ return self.actions[-1].path
+
+ def __add__(self, action):
+ """Return a new path which includes the given action at the end."""
+ return self.__class__(self.root_mock, self.actions + (action,))
+
+ def __eq__(self, other):
+ """Verify if the two paths are equal.
+
+ Two paths are equal if they refer to the same mock object, and
+ have the actions with equal kind, args and kwargs.
+ """
+ if (self.root_mock is not other.root_mock or
+ len(self.actions) != len(other.actions)):
+ return False
+ for action, other_action in zip(self.actions, other.actions):
+ if (action.kind != other_action.kind or
+ action.args != other_action.args or
+ action.kwargs != other_action.kwargs):
+ return False
+ return True
+
+ def matches(self, other):
+ """Verify if the two paths are equivalent.
+
+ Two paths are equal if they refer to the same mock object, and
+ have the same actions performed on them.
+ """
+ if (self.root_mock is not other.root_mock or
+ len(self.actions) != len(other.actions)):
+ return False
+ for action, other_action in zip(self.actions, other.actions):
+ if (action.kind != other_action.kind or
+ not match_params(action.args, action.kwargs,
+ other_action.args, other_action.kwargs)):
+ return False
+ return True
+
+ def apply(self, object):
+ """Apply all actions sequentially on object, and return result.
+ """
+ for action in self.actions:
+ object = action.apply(object)
+ return object
+
+ def __str__(self):
+ """Transform the path into a nice string such as obj.x.y('z')."""
+ attrs = [self.root_mock.__mocker_name__ or "<mock>"]
+ for action in self.actions:
+ if action.kind == "getattr":
+ attrs.append(action.args[0])
+ elif action.kind == "call":
+ args = [repr(x) for x in action.args]
+ for pair in sorted(action.kwargs.iteritems()):
+ args.append("%s=%r" % pair)
+ attrs[-1] += "(%s)" % ", ".join(args)
+ else:
+ raise RuntimeError("Don't know how to format kind %r" %
+ action.kind)
+ return ".".join(attrs)
+
+
+class SpecialArgument(object):
+ """Markup base for special arguments for matching parameters."""
+
+
+class ANY(SpecialArgument):
+ def __eq__(self, other):
+ return not isinstance(other, SpecialArgument) or self is other
+ def __repr__(self):
+ return "ANY"
+ANY = ANY()
+
+
+class VARIOUS(SpecialArgument):
+ def __repr__(self):
+ return "VARIOUS"
+ def __eq__(self, other):
+ return self is other
+VARIOUS = VARIOUS()
+
+
+class SAME(SpecialArgument):
+ def __init__(self, object):
+ self.object = object
+ def __repr__(self):
+ return "SAME(%r)" % (self.object,)
+ def __eq__(self, other):
+ if isinstance(other, SpecialArgument):
+ return type(other) == type(self) and self.object is other.object
+ return self.object is other
+
+
+class CONTAINS(SpecialArgument):
+ def __init__(self, object):
+ self.object = object
+ def __repr__(self):
+ return "CONTAINS(%r)" % (self.object,)
+ def __eq__(self, other):
+ if isinstance(other, SpecialArgument):
+ return type(other) == type(self) and self.object == other.object
+ return self.object in other
+
+
+def match_params(args1, kwargs1, args2, kwargs2):
+ """Match the two sets of parameters, considering the special VARIOUS."""
+
+ # If they are equal, we're done.
+ if args1 == args2 and kwargs1 == kwargs2:
+ return True
+
+ # Then, only if we have a VARIOUS argument we have a chance of matching.
+ if VARIOUS not in args1:
+ return False
+
+ # Any keyword requests should be honored, but unrequested keywords
+ # are also accepted, since the user is fine with whatever (VARIOUS!).
+ for key, value in kwargs1.iteritems():
+ if kwargs2.get(key) != value:
+ return False
+
+ # Easy choice. Keywords are matching, and anything on args are accepted.
+ if (VARIOUS,) == args1:
+ return True
+
+ # We have something different there. If we don't have positional
+ # arguments on the original call, it can't match.
+ if not args2:
+ # Unless we have just several VARIOUS (which is bizarre, but..).
+ for arg1 in args1:
+ if arg1 is not VARIOUS:
+ return False
+ return True
+
+ # Ok, all bets are lost. We have to actually do the more expensive
+ # matching. This is an algorithm based on the idea of the Levenshtein
+ # Distance between two strings, but heavily hacked for this purpose.
+ args2l = len(args2)
+ if args1[0] is VARIOUS:
+ args1 = args1[1:]
+ array = [0]*args2l
+ else:
+ array = [1]*args2l
+ for i in range(len(args1)):
+ last = array[0]
+ if args1[i] is VARIOUS:
+ for j in range(1, args2l):
+ last, array[j] = array[j], min(array[j-1], array[j], last)
+ else:
+ array[0] = i or int(args1[i] != args2[0])
+ for j in range(1, args2l):
+ last, array[j] = array[j], last or int(args1[i] != args2[j])
+ if 0 not in array:
+ return False
+ if array[-1] != 0:
+ return False
+ return True
+
+
+# --------------------------------------------------------------------
+# Event and task base.
+
+class Event(object):
+ """Aggregation of tasks that keep track of a recorded action.
+
+ An event represents something that may or may not happen while the
+ mocked environment is running, such as an attribute access, or a
+ method call. The event is composed of several tasks that are
+ orchestrated together to create a composed meaning for the event,
+ including for which actions it should be run, what happens when it
+ runs, and what's the expectations about the actions run.
+ """
+
+ def __init__(self, path=None):
+ self.path = path
+ self._tasks = []
+
+ def add_task(self, task):
+ """Add a new task to this taks."""
+ self._tasks.append(task)
+ return task
+
+ def remove_task(self, task):
+ self._tasks.remove(task)
+
+ def get_tasks(self):
+ return self._tasks[:]
+
+ def matches(self, path):
+ """Return true if *all* tasks match the given path."""
+ for task in self._tasks:
+ if not task.matches(path):
+ return False
+ return bool(self._tasks)
+
+ def run(self, path):
+ """Run all tasks with the given action.
+
+ Running an event means running all of its tasks individually and in
+ order. An event should only ever be run if all of its tasks claim to
+ match the given action.
+
+ The result of this method will be the last result of a task
+ which isn't None, or None if they're all None.
+ """
+ result = None
+ for task in self._tasks:
+ task_result = task.run(path)
+ if task_result is not None:
+ result = task_result
+ return result
+
+ def satisfied(self):
+ """Return true if all tasks are satisfied.
+
+ Being satisfied means that there are no unmet expectations.
+ """
+ try:
+ self.verify()
+ except AssertionError:
+ return False
+ return True
+
+ def verify(self):
+ """Run verify on all tasks.
+
+ The verify method is supposed to raise an AssertionError if the
+ task has unmet expectations, with a nice debugging message
+ explaining why it wasn't met.
+ """
+ for task in self._tasks:
+ task.verify()
+
+ def set_state(self, state):
+ """Change the task state of all tasks to reflect that of the mocker.
+
+ State is either REPLAY, RECORD, or RESTORE.
+ """
+ for task in self._tasks:
+ task.set_state(state)
+
+
+class Task(object):
+ """Minor item used for composition of a major task.
+
+ A task item is responsible for adding any kind of logic to a
+ task. Examples of that are counting the number of times the
+ task was made, verifying parameters if any, and so on.
+ """
+
+ def matches(self, path):
+ """Return true if the task is supposed to be run for the given path.
+ """
+ return True
+
+ def run(self, path):
+ """Perform the task item, considering that the given action happened.
+ """
+
+ def verify(self):
+ """Raise AssertionError if expectations for this item are unmet.
+
+ The exception should include a nice explanation about why this
+ item is unmet.
+ """
+
+ def set_state(self, state):
+ """Perform actions needed to reflect state of the mocker.
+
+ State is either REPLAY, RECORD, or RESTORE.
+ """
+
+# --------------------------------------------------------------------
+# Task implementations.
+
+class PathMatcher(Task):
+ """Match the action path against a given path."""
+
+ def __init__(self, path):
+ self.path = path
+
+ def matches(self, path):
+ return self.path.matches(path)
+
+@recorder
+def path_matcher_recorder(mocker, event):
+ event.add_task(PathMatcher(event.path))
+
+
+class RunCounter(Task):
+ """Task which verifies if the number of runs are within given boundaries.
+ """
+
+ def __init__(self, min, max=False):
+ self.min = min
+ if max is None:
+ self.max = sys.maxint
+ elif max is False:
+ self.max = min
+ else:
+ self.max = max
+ self._runs = 0
+
+ def run(self, path):
+ self._runs += 1
+ if self._runs > self.max:
+ self.verify()
+
+ def verify(self):
+ if not self.min <= self._runs <= self.max:
+ if self.max == sys.maxint:
+ raise AssertionError("Expected at least %d time(s), "
+ "seen %d time(s)."
+ % (self.min, self._runs))
+ if self.min == self.max:
+ raise AssertionError("Expected %d time(s), seen %d time(s)."
+ % (self.min, self._runs))
+ raise AssertionError("Expected %d to %d time(s), seen %d time(s)."
+ % (self.min, self.max, self._runs))
+
+class ImplicitRunCounter(RunCounter):
+ """RunCounter inserted by default on any event.
+
+ This is a way to differentiate explicitly added counters and
+ implicit ones.
+ """
+
+@recorder
+def run_counter_recorder(mocker, event):
+ """In principle, any event may be repeated once."""
+ event.add_task(ImplicitRunCounter(1))
+
+@recorder
+def run_counter_removal_recorder(mocker, event):
+ """
+ Events created by getattr actions which lead to other events
+ may be repeated any number of times. For that, we remove implicit
+ run counters of any getattr actions leading to current one.
+ """
+ parent_path = event.path.parent_path
+ for event in reversed(mocker.get_events()):
+ if (event.path is parent_path and
+ event.path.actions[-1].kind == "getattr"):
+ for task in event.get_tasks():
+ if type(task) is ImplicitRunCounter:
+ event.remove_task(task)
+
+
+class MockReturner(Task):
+ """Return a mock based on the action path."""
+
+ def __init__(self, mocker):
+ self.mocker = mocker
+
+ def run(self, path):
+ return Mock(self.mocker, path)
+
+@recorder
+def mock_returner_recorder(mocker, event):
+ """Events that lead to other events must return mock objects."""
+ parent_path = event.path.parent_path
+ for event in mocker.get_events():
+ if event.path is parent_path:
+ for task in event.get_tasks():
+ if isinstance(task, MockReturner):
+ break
+ else:
+ event.add_task(MockReturner(mocker))
+ break
+
+
+class FunctionRunner(Task):
+ """Task that runs a function everything it's run.
+
+ Arguments of the last action in the path are passed to the function,
+ and the function result is also returned.
+ """
+
+ def __init__(self, func):
+ self._func = func
+
+ def run(self, path):
+ action = path.actions[-1]
+ return self._func(*action.args, **action.kwargs)
+
+
+class PathApplier(Task):
+ """Task that applies a path in the real object, and returns the result."""
+
+ def run(self, path):
+ return path.apply(path.root_mock.__mocker_object__)
+
+
+class Orderer(Task):
+ """Task to establish an order relation between two events.
+
+ An orderer task will only match once all its dependencies have
+ been run.
+ """
+
+ def __init__(self):
+ self._run = False
+ self._dependencies = []
+
+ def run(self, path):
+ self._run = True
+
+ def has_run(self):
+ return self._run
+
+ def add_dependency(self, orderer):
+ self._dependencies.append(orderer)
+
+ def get_dependencies(self):
+ return self._dependencies
+
+ def matches(self, path):
+ for dependency in self._dependencies:
+ if not dependency.has_run():
+ return False
+ return True
+
+
+class SpecChecker(Task):
+ """Task to check if arguments of the last action conform to a real method.
+ """
+
+ def __init__(self, method):
+ self._method = method
+ if method:
+ self._args, self._varargs, self._varkwargs, self._defaults = \
+ inspect.getargspec(method)
+ if self._defaults is None:
+ self._defaults = ()
+ if type(method) is type(self.run):
+ self._args = self._args[1:]
+
+ def get_method(self):
+ return self._method
+
+ def run(self, path):
+ if not self._method:
+ raise AssertionError("method not existent in real class")
+ action = path.actions[-1]
+ obtained_len = len(action.args)
+ obtained_kwargs = action.kwargs.copy()
+ nodefaults_len = len(self._args) - len(self._defaults)
+ for i, name in enumerate(self._args):
+ if i < obtained_len and name in action.kwargs:
+ raise AssertionError("%r parameter provided twice" % name)
+ if (i >= obtained_len and i < nodefaults_len and
+ name not in action.kwargs):
+ raise AssertionError("%r parameter not provided" % name)
+ obtained_kwargs.pop(name, None)
+ if obtained_len > len(self._args) and not self._varargs:
+ raise AssertionError("maximum number of parameters exceeded")
+ if obtained_kwargs and not self._varkwargs:
+ raise AssertionError("unknown kwargs: %s" %
+ ", ".join(obtained_kwargs))
+
+@recorder
+def spec_checker_recorder(mocker, event):
+ cls = event.path.root_mock.__mocker_spec__
+ actions = event.path.actions
+ if (cls and len(actions) == 2 and
+ actions[0].kind == "getattr" and actions[1].kind == "call"):
+ method = getattr(cls, actions[0].args[0], None)
+ event.add_task(SpecChecker(method))
+
+
+class ProxyInstaller(Task):
+ """Task which installs and deinstalls proxy mocks.
+
+ This task will replace a real object by a mock in all dictionaries
+ found in the running interpreter via the garbage collecting system.
+ """
+
+ def __init__(self, mock):
+ self.mock = mock
+
+ def matches(self, path):
+ return False
+
+ def set_state(self, state):
+ if state is REPLAY:
+ install, remove = self.mock, self.mock.__mocker_object__
+ else:
+ install, remove = self.mock.__mocker_object__, self.mock
+ mock_dict = object.__getattribute__(self.mock, "__dict__")
+ protected = set((id(self.__dict__), id(mock_dict)))
+ for referrer in gc.get_referrers(remove):
+ if id(referrer) not in protected and type(referrer) is dict:
+ for key, value in referrer.iteritems():
+ if value is remove:
+ referrer[key] = install
diff --git a/test.py b/test.py
new file mode 100755
index 0000000..f51edb9
--- /dev/null
+++ b/test.py
@@ -0,0 +1,1841 @@
+#!/usr/bin/python
+import unittest
+import sys
+import gc
+
+from types import ModuleType
+
+from mocker import (
+ MockerBase, Mocker, Mock, Event, Task, Action, Path, recorder, expect,
+ PathMatcher, path_matcher_recorder, RunCounter, ImplicitRunCounter,
+ run_counter_recorder, run_counter_removal_recorder, MockReturner,
+ mock_returner_recorder, FunctionRunner, Orderer, SpecChecker,
+ spec_checker_recorder, match_params, ANY, VARIOUS, SAME, CONTAINS,
+ UnexpectedExprError, PathApplier, RECORD, REPLAY, RESTORE, ProxyInstaller)
+
+
+class CleanMocker(MockerBase):
+ pass
+
+
+class IntegrationTest(unittest.TestCase):
+
+ def setUp(self):
+ self.mocker = Mocker()
+
+ def test_count(self):
+ obj = self.mocker.obj()
+ obj.x
+ self.mocker.count(2, 3)
+
+ self.mocker.replay()
+ obj.x
+ self.assertRaises(AssertionError, self.mocker.verify)
+ obj.x
+ self.mocker.verify()
+ obj.x
+ self.mocker.verify()
+ self.assertRaises(AssertionError, getattr, obj, "x")
+
+ def test_ordered(self):
+ obj = self.mocker.obj()
+
+ with_manager = self.mocker.ordered()
+ with_manager.__enter__()
+ obj.x
+ obj.y
+ obj.z
+ with_manager.__exit__(None, None, None)
+
+ self.mocker.replay()
+
+ self.assertRaises(AssertionError, getattr, obj, "y")
+ self.assertRaises(AssertionError, getattr, obj, "z")
+ obj.x
+ self.assertRaises(AssertionError, getattr, obj, "z")
+ obj.y
+ obj.z
+
+ def test_spec(self):
+ class C(object):
+ def m(self, a): pass
+
+ obj = self.mocker.obj(C)
+
+ obj.m(1)
+ obj.m(a=1)
+ obj.m(1, 2)
+ obj.m(b=2)
+ obj.x()
+ obj.y()
+ self.mocker.nospec()
+ obj.z()
+
+ self.mocker.replay()
+
+ obj.m(1)
+ obj.m(a=1)
+ obj.y()
+ self.assertRaises(AssertionError, obj.m, 1, 2)
+ self.assertRaises(AssertionError, obj.m, b=2)
+ self.assertRaises(AssertionError, obj.x)
+ self.assertRaises(AssertionError, obj.z)
+
+ def test_result(self):
+ obj = self.mocker.obj()
+ obj.x
+ self.mocker.result(42)
+ self.mocker.replay()
+ self.assertEquals(obj.x, 42)
+
+ def test_throw(self):
+ obj = self.mocker.obj()
+ obj.x()
+ self.mocker.throw(ValueError)
+ self.mocker.replay()
+ self.assertRaises(ValueError, obj.x)
+
+ def test_call(self):
+ calls = []
+ def func(arg):
+ calls.append(arg)
+ return 42
+ obj = self.mocker.obj()
+ obj.x(24)
+ self.mocker.call(func)
+ self.mocker.replay()
+ self.assertEquals(obj.x(24), 42)
+ self.assertEquals(calls, [24])
+
+ def test_call_result(self):
+ calls = []
+ def func(arg):
+ calls.append(arg)
+ return arg
+ obj = self.mocker.obj()
+ obj.x(24)
+ self.mocker.call(func)
+ self.mocker.result(42)
+ self.mocker.replay()
+ self.assertEquals(obj.x(24), 42)
+ self.assertEquals(calls, [24])
+
+ def test_proxy(self):
+ class C(object):
+ def sum(self, *args):
+ return sum(args)
+
+ obj = self.mocker.proxy(C())
+ expect(obj.multiply(2, 3)).result(6)
+ expect(obj.sum(0, 0)).result(1)
+ expect(obj.sum(0, 0)).passthrough()
+
+ self.mocker.replay()
+
+ self.assertEquals(obj.multiply(2, 3), 6) # Mocked.
+ self.assertRaises(AttributeError, obj.multiply) # Passed through.
+
+ self.assertEquals(obj.sum(2, 3), 5) # Passed through.
+ self.assertEquals(obj.sum(0, 0), 1) # Mocked.
+ self.assertEquals(obj.sum(0, 0), 0) # Passed through explicitly.
+ self.assertRaises(AssertionError, obj.sum, 0, 0) # Seen twice.
+
+ def test_module_install_and_restore(self):
+ try:
+ module = self.mocker.module("calendar")
+ import calendar
+ self.assertTrue(calendar is not module)
+ self.mocker.replay()
+ import calendar
+ self.assertTrue(calendar is module)
+ self.mocker.restore()
+ import calendar
+ self.assertTrue(calendar is not module)
+ finally:
+ self.mocker.restore()
+
+ def test_module_os_path(self):
+ try:
+ path = self.mocker.module("os.path")
+ expect(path.join(VARIOUS)).call(lambda *args: "-".join(args))
+ expect(path.join("e", VARIOUS)).passthrough()
+ self.mocker.replay()
+ import os
+ self.assertEquals(os.path.join("a", "b", "c"), "a-b-c")
+ self.assertNotEquals(os.path.join("e", "f", "g"), "e-f-g")
+ finally:
+ self.mocker.restore()
+
+
+class ExpectTest(unittest.TestCase):
+
+ def setUp(self):
+ self.mocker = CleanMocker()
+
+ def test_calling_mocker(self):
+ obj = self.mocker.obj()
+ expect(obj.attr).result(123)
+ self.mocker.replay()
+ self.assertEquals(obj.attr, 123)
+
+ def test_chaining(self):
+ obj = self.mocker.obj()
+ expect(obj.attr).result(123).result(42)
+ self.mocker.replay()
+ self.assertEquals(obj.attr, 42)
+
+
+class MockerTest(unittest.TestCase):
+
+ def setUp(self):
+ self.recorded = []
+ self.mocker = CleanMocker()
+ @self.mocker.add_recorder
+ def recorder(mocker, event):
+ self.recorded.append((mocker, event))
+
+ self.action = Action(Path(Mock(self.mocker, name="mock")),
+ "getattr", ("attr",), {})
+ self.path = self.action.path + self.action
+
+ def test_default_state(self):
+ self.assertEquals(self.mocker.get_state(), RECORD)
+
+ def test_replay(self):
+ calls = []
+ event = self.mocker.add_event(Event())
+ task = event.add_task(Task())
+ task.set_state = lambda state: calls.append(state)
+ self.mocker.replay()
+ self.mocker.replay()
+ self.assertEquals(self.mocker.get_state(), REPLAY)
+ self.assertEquals(calls, [REPLAY])
+
+ def test_record(self):
+ calls = []
+ event = self.mocker.add_event(Event())
+ task = event.add_task(Task())
+ task.set_state = lambda state: calls.append(state)
+ self.mocker.replay()
+ self.mocker.record()
+ self.mocker.record()
+ self.assertEquals(self.mocker.get_state(), RECORD)
+ self.assertEquals(calls, [REPLAY, RECORD])
+
+ def test_restore(self):
+ calls = []
+ event = self.mocker.add_event(Event())
+ task = event.add_task(Task())
+ task.set_state = lambda state: calls.append(state)
+ self.mocker.restore()
+ self.mocker.restore()
+ self.assertEquals(self.mocker.get_state(), RESTORE)
+ self.assertEquals(calls, [RESTORE])
+
+ def test_verify(self):
+ calls = []
+ class MyEvent(object):
+ def __init__(self, name):
+ self.name = name
+ def verify(self):
+ calls.append(self.name)
+ self.mocker.add_event(MyEvent("1"))
+ self.mocker.add_event(MyEvent("2"))
+
+ self.mocker.verify()
+
+ self.assertEquals(calls, ["1", "2"])
+
+ def test_add_recorder_on_instance(self):
+ obj1 = object()
+ obj2 = object()
+ mocker = CleanMocker()
+ self.assertEquals(mocker.add_recorder(obj1), obj1)
+ self.assertEquals(mocker.add_recorder(obj2), obj2)
+ self.assertEquals(mocker.get_recorders(), [obj1, obj2])
+ mocker = CleanMocker()
+ self.assertEquals(mocker.add_recorder(obj1), obj1)
+ self.assertEquals(mocker.get_recorders(), [obj1])
+
+ def test_add_recorder_on_class(self):
+ class MyMocker(CleanMocker):
+ pass
+ obj1 = object()
+ obj2 = object()
+ self.assertEquals(MyMocker.add_recorder(obj1), obj1)
+ self.assertEquals(MyMocker.add_recorder(obj2), obj2)
+ mocker = MyMocker()
+ self.assertEquals(mocker.get_recorders(), [obj1, obj2])
+ mocker = MyMocker()
+ self.assertEquals(mocker.get_recorders(), [obj1, obj2])
+
+ def test_add_recorder_on_subclass(self):
+ class MyMocker1(CleanMocker):
+ pass
+ obj1 = object()
+ MyMocker1.add_recorder(obj1)
+ class MyMocker2(MyMocker1):
+ pass
+ obj2 = object()
+ MyMocker2.add_recorder(obj2)
+ self.assertEquals(MyMocker1.get_recorders(), [obj1])
+ self.assertEquals(MyMocker2.get_recorders(), [obj1, obj2])
+
+ def test_remove_recorder_on_instance(self):
+ obj1 = object()
+ obj2 = object()
+ obj3 = object()
+ class MyMocker(CleanMocker):
+ pass
+ MyMocker.add_recorder(obj1)
+ MyMocker.add_recorder(obj2)
+ MyMocker.add_recorder(obj3)
+ mocker = MyMocker()
+ mocker.remove_recorder(obj2)
+ self.assertEquals(mocker.get_recorders(), [obj1, obj3])
+ self.assertEquals(MyMocker.get_recorders(), [obj1, obj2, obj3])
+
+ def test_remove_recorder_on_class(self):
+ class MyMocker(CleanMocker):
+ pass
+ obj1 = object()
+ obj2 = object()
+ self.assertEquals(MyMocker.add_recorder(obj1), obj1)
+ self.assertEquals(MyMocker.add_recorder(obj2), obj2)
+ MyMocker.remove_recorder(obj1)
+ self.assertEquals(MyMocker.get_recorders(), [obj2])
+
+ def test_obj(self):
+ self.mocker = CleanMocker()
+ obj = self.mocker.obj()
+ self.assertEquals(type(obj), Mock)
+
+ def test_obj_with_spec(self):
+ class C(object): pass
+ self.mocker = CleanMocker()
+ obj = self.mocker.obj(C)
+ self.assertEquals(obj.__mocker_spec__, C)
+
+ def test_proxy(self):
+ original = object()
+ self.mocker = CleanMocker()
+ obj = self.mocker.proxy(original)
+ self.assertEquals(type(obj), Mock)
+ self.assertEquals(obj.__mocker_object__, original)
+
+ def test_proxy_with_spec(self):
+ original = object()
+ class C(object): pass
+ self.mocker = CleanMocker()
+ obj = self.mocker.proxy(original, C)
+ self.assertEquals(obj.__mocker_object__, original)
+ self.assertEquals(obj.__mocker_spec__, C)
+
+ def test_proxy_with_passthrough_false(self):
+ original = object()
+ class C(object): pass
+ self.mocker = CleanMocker()
+ obj = self.mocker.proxy(original, C, passthrough=False)
+ self.assertEquals(obj.__mocker_object__, original)
+ self.assertEquals(obj.__mocker_spec__, C)
+ self.assertEquals(obj.__mocker_passthrough__, False)
+
+ def test_proxy_install(self):
+ from os import path
+ obj = object()
+ proxy = self.mocker.proxy(obj, install=True)
+ self.assertEquals(type(proxy), Mock)
+ self.assertEquals(type(proxy.__mocker_object__), object)
+ self.assertEquals(proxy.__mocker_object__, obj)
+ (event,) = self.mocker.get_events()
+ (task,) = event.get_tasks()
+ self.assertEquals(type(task), ProxyInstaller)
+ self.assertTrue(task.mock is proxy)
+ self.assertTrue(task.mock.__mocker_object__ is obj)
+ self.assertTrue(proxy is not obj)
+
+ def test_module(self):
+ from os import path
+ module = self.mocker.module("os.path")
+ self.assertEquals(type(module), Mock)
+ self.assertEquals(type(module.__mocker_object__), ModuleType)
+ self.assertEquals(module.__mocker_name__, "os.path")
+ self.assertEquals(module.__mocker_object__, path)
+ (event,) = self.mocker.get_events()
+ (task,) = event.get_tasks()
+ self.assertEquals(type(task), ProxyInstaller)
+ self.assertTrue(task.mock is module)
+ self.assertTrue(task.mock.__mocker_object__ is path)
+ self.assertTrue(module is not path)
+
+ def test_module_with_passthrough_false(self):
+ module = self.mocker.module("calendar", passthrough=False)
+ self.assertEquals(module.__mocker_passthrough__, False)
+
+ def test_add_and_get_event(self):
+ self.mocker.add_event(41)
+ self.assertEquals(self.mocker.add_event(42), 42)
+ self.assertEquals(self.mocker.get_events(), [41, 42])
+
+ def test_recording(self):
+ obj = self.mocker.obj()
+ obj.attr()
+
+ self.assertEquals(len(self.recorded), 2)
+
+ action1 = Action(None, "getattr", ("attr",), {})
+ action2 = Action(None, "call", (), {})
+
+ mocker1, event1 = self.recorded[0]
+ self.assertEquals(mocker1, self.mocker)
+ self.assertEquals(type(event1), Event)
+ self.assertTrue(event1.path.matches(Path(obj, [action1])))
+
+ mocker2, event2 = self.recorded[1]
+ self.assertEquals(mocker2, self.mocker)
+ self.assertEquals(type(event2), Event)
+ self.assertTrue(event2.path.matches(Path(obj, [action1, action2])))
+
+ self.assertEquals(self.mocker.get_events(), [event1, event2])
+
+ def test_recording_result_path(self):
+ obj = self.mocker.obj()
+ result = obj.attr()
+ path = Path(obj, [Action(None, "getattr", ("attr",), {}),
+ Action(None, "call", (), {})])
+ self.assertTrue(result.__mocker_path__.matches(path))
+
+ def test_replaying_no_events(self):
+ self.mocker.replay()
+ try:
+ self.mocker.act(self.path)
+ except AssertionError, e:
+ pass
+ else:
+ self.fail("AssertionError not raised")
+ self.assertEquals(str(e), "[Mocker] Unexpected expression: mock.attr")
+
+ def test_replaying_matching(self):
+ calls = []
+ class MyTask(Task):
+ def matches(_, path):
+ calls.append("matches")
+ self.assertTrue(self.path.matches(path))
+ return True
+ def run(_, path):
+ calls.append("run")
+ self.assertTrue(self.path.matches(path))
+ return "result"
+ event = Event()
+ event.add_task(MyTask())
+ self.mocker.add_event(event)
+ self.mocker.replay()
+ self.assertEquals(self.mocker.act(self.path), "result")
+ self.assertEquals(calls, ["matches", "run"])
+
+ def test_replaying_none_matching(self):
+ calls = []
+ class MyTask(Task):
+ def matches(_, path):
+ self.assertTrue(self.path.matches(path))
+ calls.append("matches")
+ return False
+ event = Event()
+ event.add_task(MyTask())
+ self.mocker.add_event(event)
+ self.mocker.replay()
+ self.assertRaises(AssertionError, self.mocker.act, self.path)
+ self.assertEquals(calls, ["matches"])
+
+ def test_replaying_not_satisfied_first(self):
+ class MyTask1(Task):
+ def run(self, path):
+ return "result1"
+ class MyTask2(Task):
+ raised = False
+ def verify(self):
+ if not self.raised:
+ self.raised = True
+ raise AssertionError()
+ def run(self, path):
+ return "result2"
+ event1 = self.mocker.add_event(Event())
+ event1.add_task(MyTask1())
+ event2 = self.mocker.add_event(Event())
+ event2.add_task(MyTask2())
+ event3 = self.mocker.add_event(Event())
+ event3.add_task(MyTask1())
+ self.mocker.replay()
+ self.assertEquals(self.mocker.act(self.path), "result2")
+ self.assertEquals(self.mocker.act(self.path), "result1")
+
+ def test_recorder_decorator(self):
+ result = recorder(42)
+ try:
+ self.assertEquals(result, 42)
+ self.assertEquals(Mocker.get_recorders()[-1], 42)
+ self.assertEquals(MockerBase.get_recorders(), [])
+ finally:
+ Mocker.remove_recorder(42)
+
+ def test_result(self):
+ event1 = self.mocker.add_event(Event())
+ event2 = self.mocker.add_event(Event())
+ self.mocker.result(123)
+ self.assertEquals(event2.run(self.path), 123)
+
+ def test_throw(self):
+ class MyException(Exception): pass
+ event1 = self.mocker.add_event(Event())
+ event2 = self.mocker.add_event(Event())
+ self.mocker.throw(MyException)
+ self.assertRaises(MyException, event2.run, self.path)
+
+ def test_call(self):
+ event1 = self.mocker.add_event(Event())
+ event2 = self.mocker.add_event(Event())
+ self.mocker.call(lambda *args, **kwargs: 123)
+ self.assertEquals(event2.run(self.path), 123)
+
+ def test_count(self):
+ event1 = self.mocker.add_event(Event())
+ event2 = self.mocker.add_event(Event())
+ event2.add_task(ImplicitRunCounter(1))
+ self.mocker.count(2, 3)
+ self.assertEquals(len(event1.get_tasks()), 0)
+ (task,) = event2.get_tasks()
+ self.assertEquals(type(task), RunCounter)
+ self.assertEquals(task.min, 2)
+ self.assertEquals(task.max, 3)
+ self.mocker.count(4)
+ self.assertEquals(len(event1.get_tasks()), 0)
+ (task,) = event2.get_tasks()
+ self.assertEquals(type(task), RunCounter)
+ self.assertEquals(task.min, 4)
+ self.assertEquals(task.max, 4)
+
+ def test_order(self):
+ mock1 = self.mocker.obj()
+ mock2 = self.mocker.obj()
+ mock3 = self.mocker.obj()
+ mock4 = self.mocker.obj()
+ result1 = mock1.attr1(1)
+ result2 = mock2.attr2(2)
+ result3 = mock3.attr3(3)
+ result4 = mock4.attr4(4)
+
+ # Try to spoil the logic which decides which task to reuse.
+ other_task = Task()
+ for event in self.mocker.get_events():
+ event.add_task(other_task)
+
+ self.mocker.order(result1, result2, result3)
+ self.mocker.order(result1, result4)
+ self.mocker.order(result2, result4)
+ events = self.mocker.get_events()
+ self.assertEquals(len(events), 8)
+
+ self.assertEquals(events[0].get_tasks(), [other_task])
+ other_task_, task1 = events[1].get_tasks()
+ self.assertEquals(type(task1), Orderer)
+ self.assertEquals(task1.get_dependencies(), [])
+ self.assertEquals(other_task_, other_task)
+
+ self.assertEquals(events[2].get_tasks(), [other_task])
+ other_task_, task3 = events[3].get_tasks()
+ self.assertEquals(type(task3), Orderer)
+ self.assertEquals(task3.get_dependencies(), [task1])
+ self.assertEquals(other_task_, other_task)
+
+ self.assertEquals(events[4].get_tasks(), [other_task])
+ other_task_, task5 = events[5].get_tasks()
+ self.assertEquals(type(task5), Orderer)
+ self.assertEquals(task5.get_dependencies(), [task3])
+ self.assertEquals(other_task_, other_task)
+
+ self.assertEquals(events[6].get_tasks(), [other_task])
+ other_task_, task7 = events[7].get_tasks()
+ self.assertEquals(type(task7), Orderer)
+ self.assertEquals(task7.get_dependencies(), [task1, task3])
+ self.assertEquals(other_task_, other_task)
+
+ def test_after(self):
+ mock1 = self.mocker.obj()
+ mock2 = self.mocker.obj()
+ mock3 = self.mocker.obj()
+ result1 = mock1.attr1(1)
+ result2 = mock2.attr2(2)
+ result3 = mock3.attr3(3)
+
+ # Try to spoil the logic which decides which task to reuse.
+ other_task = Task()
+ for event in self.mocker.get_events():
+ event.add_task(other_task)
+
+ self.mocker.after(result1, result2)
+
+ events = self.mocker.get_events()
+ self.assertEquals(len(events), 6)
+
+ self.assertEquals(events[0].get_tasks(), [other_task])
+ other_task_, task1 = events[1].get_tasks()
+ self.assertEquals(type(task1), Orderer)
+ self.assertEquals(task1.get_dependencies(), [])
+ self.assertEquals(other_task_, other_task)
+
+ self.assertEquals(events[2].get_tasks(), [other_task])
+ other_task_, task3 = events[3].get_tasks()
+ self.assertEquals(type(task3), Orderer)
+ self.assertEquals(task3.get_dependencies(), [])
+ self.assertEquals(other_task_, other_task)
+
+ self.assertEquals(events[4].get_tasks(), [other_task])
+ other_task_, task5 = events[5].get_tasks()
+ self.assertEquals(type(task5), Orderer)
+ self.assertEquals(task5.get_dependencies(), [task1, task3])
+ self.assertEquals(other_task_, other_task)
+
+ def test_before(self):
+ mock1 = self.mocker.obj()
+ mock2 = self.mocker.obj()
+ mock3 = self.mocker.obj()
+ result1 = mock1.attr1(1)
+ result2 = mock2.attr2(2)
+ result3 = mock3.attr3(3)
+
+ # Try to spoil the logic which decides which task to reuse.
+ other_task = Task()
+ for event in self.mocker.get_events():
+ event.add_task(other_task)
+
+ self.mocker.before(result1, result2)
+
+ events = self.mocker.get_events()
+ self.assertEquals(len(events), 6)
+
+ self.assertEquals(events[4].get_tasks(), [other_task])
+ other_task_, task5 = events[5].get_tasks()
+ self.assertEquals(type(task5), Orderer)
+ self.assertEquals(task5.get_dependencies(), [])
+ self.assertEquals(other_task_, other_task)
+
+ self.assertEquals(events[0].get_tasks(), [other_task])
+ other_task_, task1 = events[1].get_tasks()
+ self.assertEquals(type(task1), Orderer)
+ self.assertEquals(task1.get_dependencies(), [task5])
+ self.assertEquals(other_task_, other_task)
+
+ self.assertEquals(events[2].get_tasks(), [other_task])
+ other_task_, task3 = events[3].get_tasks()
+ self.assertEquals(type(task3), Orderer)
+ self.assertEquals(task3.get_dependencies(), [task5])
+ self.assertEquals(other_task_, other_task)
+
+ def test_default_ordering(self):
+ self.assertEquals(self.mocker.is_ordering(), False)
+
+ def test_ordered(self):
+ self.mocker.ordered()
+ self.assertEquals(self.mocker.is_ordering(), True)
+
+ def test_ordered_enter_exit(self):
+ with_manager = self.mocker.ordered()
+ self.assertEquals(self.mocker.is_ordering(), True)
+ with_manager.__enter__()
+ self.assertEquals(self.mocker.is_ordering(), True)
+ with_manager.__exit__(None, None, None)
+ self.assertEquals(self.mocker.is_ordering(), False)
+
+ def test_unordered(self):
+ self.mocker.ordered()
+ self.mocker.unordered()
+ self.assertEquals(self.mocker.is_ordering(), False)
+
+ def test_ordered_events(self):
+ mock = self.mocker.obj()
+
+ # Ensure that the state is correctly reset between
+ # different ordered blocks.
+ self.mocker.ordered()
+
+ mock.a
+
+ self.mocker.unordered()
+ self.mocker.ordered()
+
+ mock.x.y.z
+
+ events = self.mocker.get_events()
+
+ (task1,) = events[1].get_tasks()
+ (task2,) = events[2].get_tasks()
+ (task3,) = events[3].get_tasks()
+
+ self.assertEquals(type(task1), Orderer)
+ self.assertEquals(type(task2), Orderer)
+ self.assertEquals(type(task3), Orderer)
+
+ self.assertEquals(task1.get_dependencies(), [])
+ self.assertEquals(task2.get_dependencies(), [task1])
+ self.assertEquals(task3.get_dependencies(), [task2])
+
+ def test_nospec(self):
+ event1 = self.mocker.add_event(Event())
+ event2 = self.mocker.add_event(Event())
+ task1 = event1.add_task(SpecChecker(None))
+ task2 = event2.add_task(Task())
+ task3 = event2.add_task(SpecChecker(None))
+ task4 = event2.add_task(Task())
+ self.mocker.nospec()
+ self.assertEquals(event1.get_tasks(), [task1])
+ self.assertEquals(event2.get_tasks(), [task2, task4])
+
+ def test_passthrough(self):
+ obj = self.mocker.proxy(object())
+ event1 = self.mocker.add_event(Event(Path(obj)))
+ event2 = self.mocker.add_event(Event(Path(obj)))
+ self.mocker.passthrough()
+ self.assertEquals(event1.get_tasks(), [])
+ (task,) = event2.get_tasks()
+ self.assertEquals(type(task), PathApplier)
+
+ def test_passthrough_fails_on_unproxied(self):
+ obj = self.mocker.obj()
+ event1 = self.mocker.add_event(Event(Path(obj)))
+ event2 = self.mocker.add_event(Event(Path(obj)))
+ self.assertRaises(TypeError, self.mocker.passthrough)
+
+ def test_on(self):
+ obj = self.mocker.obj()
+ self.mocker.on(obj.attr).result(123)
+ self.mocker.replay()
+ self.assertEquals(obj.attr, 123)
+
+
+class ActionTest(unittest.TestCase):
+
+ def setUp(self):
+ self.mock = Mock(None, name="mock")
+
+ def test_create(self):
+ objects = [object() for i in range(4)]
+ action = Action(*objects)
+ self.assertEquals(action.path, objects[0])
+ self.assertEquals(action.kind, objects[1])
+ self.assertEquals(action.args, objects[2])
+ self.assertEquals(action.kwargs, objects[3])
+
+ def test_apply_getattr(self):
+ class C(object):
+ pass
+ obj = C()
+ obj.x = C()
+ action = Action(None, "getattr", ("x",), {})
+ self.assertEquals(action.apply(obj), obj.x)
+
+ def test_apply_call(self):
+ obj = lambda a, b: a+b
+ action = Action(None, "call", (1,), {"b": 2})
+ self.assertEquals(action.apply(obj), 3)
+
+ def test_apply_caching(self):
+ values = iter(range(10))
+ obj = lambda: values.next()
+ action = Action(None, "call", (), {})
+ self.assertEquals(action.apply(obj), 0)
+ self.assertEquals(action.apply(obj), 0)
+ obj = lambda: values.next()
+ self.assertEquals(action.apply(obj), 1)
+
+
+class PathTest(unittest.TestCase):
+
+ def setUp(self):
+ class StubMocker(object):
+ @staticmethod
+ def act(path):
+ pass
+ self.mocker = StubMocker()
+ self.mock = Mock(self.mocker, name="obj")
+
+ def test_create(self):
+ mock = object()
+ path = Path(mock)
+ self.assertEquals(path.root_mock, mock)
+ self.assertEquals(path.actions, ())
+
+ def test_create_with_actions(self):
+ mock = object()
+ path = Path(mock, [1,2,3])
+ self.assertEquals(path.root_mock, mock)
+ self.assertEquals(path.actions, (1,2,3))
+
+ def test_add(self):
+ mock = object()
+ path = Path(mock, [1,2,3])
+ result = path + 4
+ self.assertTrue(result is not path)
+ self.assertEquals(result.root_mock, mock)
+ self.assertEquals(result.actions, (1,2,3,4))
+
+ def test_parent_path(self):
+ path1 = Path(self.mock)
+ path2 = path1 + Action(path1, "getattr", ("attr",), {})
+ path3 = path2 + Action(path2, "getattr", ("attr",), {})
+
+ self.assertEquals(path1.parent_path, None)
+ self.assertEquals(path2.parent_path, path1)
+ self.assertEquals(path3.parent_path, path2)
+
+ def test_equals(self):
+ mock = object()
+ obj1 = object()
+ obj2 = object()
+
+ # Not the *same* mock.
+ path1 = Path([], [])
+ path2 = Path([], [])
+ self.assertNotEquals(path1, path2)
+
+ path1 = Path(mock, [Action(obj1, "kind", (), {})])
+ path2 = Path(mock, [Action(obj2, "kind", (), {})])
+ self.assertEquals(path1, path2)
+
+ path1 = Path(mock, [Action(obj1, "kind", (), {})])
+ path2 = Path(mock, [Action(obj2, "dnik", (), {})])
+ self.assertNotEquals(path1, path2)
+
+ path1 = Path(mock, [Action(obj1, "kind", (), {})])
+ path2 = Path(object(), [Action(obj2, "kind", (), {})])
+ self.assertNotEquals(path1, path2)
+
+ path1 = Path(mock, [Action(obj1, "kind", (), {})])
+ path2 = Path(mock, [Action(obj2, "kind", (1,), {})])
+ self.assertNotEquals(path1, path2)
+
+ path1 = Path(mock, [Action(obj1, "kind", (), {})])
+ path2 = Path(mock, [Action(obj2, "kind", (), {"a": 1})])
+ self.assertNotEquals(path1, path2)
+
+ path1 = Path(mock, [Action(obj1, "kind", (), {})])
+ path2 = Path(mock, [])
+ self.assertNotEquals(path1, path2)
+
+ path1 = Path(mock, [Action(obj1, "kind", (ANY,), {})])
+ path2 = Path(mock, [Action(obj2, "kind", (1), {})])
+ self.assertNotEquals(path1, path2)
+
+ path1 = Path(mock, [Action(obj1, "kind", (CONTAINS(1),), {})])
+ path2 = Path(mock, [Action(obj2, "kind", (CONTAINS(1),), {})])
+ self.assertEquals(path1, path2)
+
+ def test_matches(self):
+ mock = object()
+ obj1 = object()
+ obj2 = object()
+
+ # Not the *same* mock.
+ path1 = Path([], [])
+ path2 = Path([], [])
+ self.assertFalse(path1.matches(path2))
+
+ path1 = Path(mock, [Action(obj1, "kind", (), {})])
+ path2 = Path(mock, [Action(obj2, "kind", (), {})])
+ self.assertTrue(path1.matches(path2))
+
+ path1 = Path(mock, [Action(obj1, "kind", (), {})])
+ path2 = Path(mock, [Action(obj2, "dnik", (), {})])
+ self.assertFalse(path1.matches(path2))
+
+ path1 = Path(mock, [Action(obj1, "kind", (), {})])
+ path2 = Path(object(), [Action(obj2, "kind", (), {})])
+ self.assertFalse(path1.matches(path2))
+
+ path1 = Path(mock, [Action(obj1, "kind", (), {})])
+ path2 = Path(mock, [Action(obj2, "kind", (1,), {})])
+ self.assertFalse(path1.matches(path2))
+
+ path1 = Path(mock, [Action(obj1, "kind", (), {})])
+ path2 = Path(mock, [Action(obj2, "kind", (), {"a": 1})])
+ self.assertFalse(path1.matches(path2))
+
+ path1 = Path(mock, [Action(obj1, "kind", (), {})])
+ path2 = Path(mock, [])
+ self.assertFalse(path1.matches(path2))
+
+ path1 = Path(mock, [Action(obj1, "kind", (VARIOUS,), {})])
+ path2 = Path(mock, [Action(obj2, "kind", (), {})])
+ self.assertTrue(path1.matches(path2))
+
+ path1 = Path(mock, [Action(obj1, "kind", (VARIOUS,), {"a": 1})])
+ path2 = Path(mock, [Action(obj2, "kind", (), {})])
+ self.assertFalse(path1.matches(path2))
+
+ def test_str(self):
+ path = Path(self.mock, [])
+ self.assertEquals(str(path), "obj")
+
+ def test_str_unnamed(self):
+ mock = Mock(self.mocker)
+ path = Path(mock, [])
+ self.assertEquals(str(path), "<mock>")
+
+ def test_str_auto_named(self):
+ named_mock = Mock(self.mocker)
+ named_mock.attr
+ path = Path(named_mock, [])
+ self.assertEquals(str(path), "named_mock")
+
+ def test_str_getattr(self):
+ path = Path(self.mock, [Action(None, "getattr", ("attr",), {})])
+ self.assertEquals(str(path), "obj.attr")
+
+ path += Action(None, "getattr", ("x",), {})
+ self.assertEquals(str(path), "obj.attr.x")
+
+ def test_str_call(self):
+ path = Path(self.mock, [Action(None, "call", (), {})])
+ self.assertEquals(str(path), "obj()")
+
+ path = Path(self.mock,
+ [Action(None, "call", (1, "2"), {"a":3,"b":"4"})])
+ self.assertEquals(str(path), "obj(1, '2', a=3, b='4')")
+
+ def test_str_getattr_call(self):
+ path = Path(self.mock, [Action(None, "getattr", ("x",), {}),
+ Action(None, "getattr", ("y",), {}),
+ Action(None, "call", ("z",), {})])
+ self.assertEquals(str(path), "obj.x.y('z')")
+
+ def test_str_raise_on_unknown(self):
+ path = Path(self.mock, [Action(None, "unknown", (), {})])
+ self.assertRaises(RuntimeError, str, path)
+
+ def test_apply(self):
+ class C(object):
+ pass
+ obj = C()
+ obj.x = C()
+ obj.x.y = lambda a, b: a+b
+ path = Path(self.mock, [Action(None, "getattr", ("x",), {}),
+ Action(None, "getattr", ("y",), {}),
+ Action(None, "call", (1,), {"b": 2})])
+ self.assertEquals(path.apply(obj), 3)
+
+
+class MatchParamsTest(unittest.TestCase):
+
+ def true(self, *args):
+ self.assertTrue(match_params(*args), repr(args))
+
+ def false(self, *args):
+ self.assertFalse(match_params(*args), repr(args))
+
+ def test_any_repr(self):
+ self.assertEquals(repr(ANY), "ANY")
+
+ def test_any_equals(self):
+ self.assertEquals(ANY, 1)
+ self.assertEquals(ANY, 42)
+ self.assertEquals(ANY, ANY)
+
+ def test_various_repr(self):
+ self.assertEquals(repr(VARIOUS), "VARIOUS")
+
+ def test_various_equals(self):
+ self.assertEquals(VARIOUS, VARIOUS)
+ self.assertNotEquals(VARIOUS, ANY)
+ self.assertNotEquals(ANY, VARIOUS)
+
+ def test_same_repr(self):
+ self.assertEquals(repr(SAME("obj")), "SAME('obj')")
+
+ def test_same_equals(self):
+ l1 = []
+ l2 = []
+ self.assertEquals(SAME(l1), l1)
+ self.assertNotEquals(SAME(l1), l2)
+
+ self.assertEquals(SAME(l1), SAME(l1))
+ self.assertNotEquals(SAME(l1), SAME(l2))
+
+ self.assertNotEquals(ANY, SAME(l1))
+ self.assertNotEquals(SAME(l1), ANY)
+
+ def test_contains_repr(self):
+ self.assertEquals(repr(CONTAINS("obj")), "CONTAINS('obj')")
+
+ def test_contains_equals(self):
+ self.assertEquals(CONTAINS(1), [1])
+ self.assertNotEquals(CONTAINS([1]), [1])
+
+ self.assertEquals(CONTAINS([1]), CONTAINS([1]))
+ self.assertNotEquals(CONTAINS(1), CONTAINS([1]))
+
+ self.assertNotEquals(ANY, CONTAINS(1))
+ self.assertNotEquals(CONTAINS(1), ANY)
+
+ def test_normal(self):
+ self.assertTrue(match_params((), {}, (), {}))
+ self.assertTrue(match_params((1, 2), {"a": 3}, (1, 2), {"a": 3}))
+ self.assertFalse(match_params((1,), {}, (), {}))
+ self.assertFalse(match_params((), {}, (1,), {}))
+ self.assertFalse(match_params((1, 2), {"a": 3}, (1, 2), {"a": 4}))
+ self.assertFalse(match_params((1, 2), {"a": 3}, (1, 3), {"a": 3}))
+
+ def test_any(self):
+ self.assertTrue(match_params((1, 2), {"a": ANY}, (1, 2), {"a": 4}))
+ self.assertTrue(match_params((1, ANY), {"a": 3}, (1, 3), {"a": 3}))
+ self.assertFalse(match_params((ANY,), {}, (), {}))
+
+ def test_various_alone(self):
+ self.true((VARIOUS,), {}, (), {})
+ self.true((VARIOUS,), {}, (1, 2), {})
+ self.true((VARIOUS,), {}, (1, 2), {"a": 2})
+ self.true((VARIOUS,), {}, (), {"a": 2})
+ self.true((VARIOUS,), {"a": 1}, (), {"a": 1})
+ self.true((VARIOUS,), {"a": 1}, (1, 2), {"a": 1})
+ self.true((VARIOUS,), {"a": 1}, (), {"a": 1, "b": 2})
+ self.true((VARIOUS,), {"a": 1}, (1, 2), {"a": 1, "b": 2})
+ self.false((VARIOUS,), {"a": 1}, (), {})
+
+ def test_various_at_start(self):
+ self.true((VARIOUS, 3, 4), {}, (3, 4), {})
+ self.true((VARIOUS, 3, 4), {}, (1, 2, 3, 4), {})
+ self.true((VARIOUS, 3, 4), {"a": 1}, (3, 4), {"a": 1})
+ self.true((VARIOUS, 3, 4), {"a": 1}, (1, 2, 3, 4), {"a": 1, "b": 2})
+ self.false((VARIOUS, 3, 4), {}, (), {})
+ self.false((VARIOUS, 3, 4), {}, (3, 5), {})
+ self.false((VARIOUS, 3, 4), {}, (5, 5), {})
+ self.false((VARIOUS, 3, 4), {"a": 1}, (), {})
+ self.false((VARIOUS, 3, 4), {"a": 1}, (3, 4), {})
+ self.false((VARIOUS, 3, 4), {"a": 1}, (3, 4), {"b": 2})
+
+ def test_various_at_end(self):
+ self.true((1, 2, VARIOUS), {}, (1, 2), {})
+ self.true((1, 2, VARIOUS), {}, (1, 2, 3, 4), {})
+ self.true((1, 2, VARIOUS), {"a": 1}, (1, 2), {"a": 1})
+ self.true((1, 2, VARIOUS), {"a": 1}, (1, 2, 3, 4), {"a": 1, "b": 2})
+ self.false((1, 2, VARIOUS), {}, (), {})
+ self.false((1, 2, VARIOUS), {}, (1, 3), {})
+ self.false((1, 2, VARIOUS), {}, (3, 3), {})
+ self.false((1, 2, VARIOUS), {"a": 1}, (), {})
+ self.false((1, 2, VARIOUS), {"a": 1}, (1, 2), {})
+ self.false((1, 2, VARIOUS), {"a": 1}, (1, 2), {"b": 2})
+
+ def test_various_at_middle(self):
+ self.true((1, VARIOUS, 4), {}, (1, 4), {})
+ self.true((1, VARIOUS, 4), {}, (1, 2, 3, 4), {})
+ self.true((1, VARIOUS, 4), {"a": 1}, (1, 4), {"a": 1})
+ self.true((1, VARIOUS, 4), {"a": 1}, (1, 2, 3, 4), {"a": 1, "b": 2})
+ self.false((1, VARIOUS, 4), {}, (), {})
+ self.false((1, VARIOUS, 4), {}, (1, 5), {})
+ self.false((1, VARIOUS, 4), {}, (5, 5), {})
+ self.false((1, VARIOUS, 4), {"a": 1}, (), {})
+ self.false((1, VARIOUS, 4), {"a": 1}, (1, 4), {})
+ self.false((1, VARIOUS, 4), {"a": 1}, (1, 4), {"b": 2})
+
+ def test_various_multiple(self):
+ self.true((VARIOUS, 3, VARIOUS, 6, VARIOUS), {},
+ (1, 2, 3, 4, 5, 6), {})
+ self.true((VARIOUS, VARIOUS, VARIOUS), {}, (1, 2, 3, 4, 5, 6), {})
+ self.true((VARIOUS, VARIOUS, VARIOUS), {}, (), {})
+ self.false((VARIOUS, 3, VARIOUS, 6, VARIOUS), {},
+ (1, 2, 3, 4, 5), {})
+ self.false((VARIOUS, 3, VARIOUS, 6, VARIOUS), {},
+ (1, 2, 4, 5, 6), {})
+
+
+class MockTest(unittest.TestCase):
+
+ def setUp(self):
+ self.paths = []
+ class StubMocker(object):
+ @staticmethod
+ def act(path):
+ self.paths.append(path)
+ return 42
+ self.mocker = StubMocker()
+ self.obj = Mock(self.mocker)
+
+ def test_mocker(self):
+ self.assertEquals(self.obj.__mocker__, self.mocker)
+ self.assertEquals(self.obj.__mocker_name__, None)
+
+ def test_default_path(self):
+ path = self.obj.__mocker_path__
+ self.assertEquals(path.root_mock, self.obj)
+ self.assertEquals(path.actions, ())
+
+ def test_path(self):
+ path = object()
+ self.assertEquals(Mock(self.mocker, path).__mocker_path__, path)
+
+ def test_object(self):
+ obj = Mock(self.mocker, object="foo")
+ self.assertEquals(obj.__mocker_object__, "foo")
+
+ def test_passthrough(self):
+ obj = Mock(self.mocker, object="foo", passthrough=True)
+ self.assertEquals(obj.__mocker_object__, "foo")
+ self.assertEquals(obj.__mocker_passthrough__, True)
+
+ def test_auto_naming(self):
+ named_obj = self.obj
+ named_obj.attr
+ another_name = named_obj
+ named_obj = None # Can't find this one anymore.
+ another_name.attr
+ self.assertEquals(another_name.__mocker_name__, "named_obj")
+
+ def test_auto_naming_on_self(self):
+ self.named_obj = self.obj
+ del self.obj
+ self.named_obj.attr
+ self.assertEquals(self.named_obj.__mocker_name__, "named_obj")
+
+ def test_auto_naming_on_bad_self(self):
+ self_ = self
+ self = object() # No __dict__
+ self_.named_obj = self_.obj
+ self_.named_obj.attr
+ self_.assertEquals(self_.named_obj.__mocker_name__, None)
+
+ def test_auto_naming_without_getframe(self):
+ getframe = sys._getframe
+ sys._getframe = None
+ try:
+ self.named_obj = self.obj
+ self.named_obj.attr
+ self.assertEquals(self.named_obj.__mocker_name__, None)
+ finally:
+ sys._getframe = getframe
+
+ def test_getattr(self):
+ self.assertEquals(self.obj.attr, 42)
+ (path,) = self.paths
+ self.assertEquals(type(path), Path)
+ self.assertTrue(path.parent_path is self.obj.__mocker_path__)
+ self.assertEquals(path, self.obj.__mocker_path__ +
+ Action(None, "getattr", ("attr",), {}))
+
+ def test_call(self):
+ self.obj(1, a=2)
+ (path,) = self.paths
+ self.assertEquals(type(path), Path)
+ self.assertTrue(path.parent_path is self.obj.__mocker_path__)
+ self.assertEquals(path, self.obj.__mocker_path__ +
+ Action(None, "call", (1,), {"a": 2}))
+
+ def test_passthrough_on_unexpected(self):
+ class StubMocker(object):
+ def act(self, path):
+ if path.actions[-1].args == ("x",):
+ raise UnexpectedExprError
+ return 42
+ class C(object):
+ x = 123
+ y = 321
+
+ obj = Mock(StubMocker(), object=C())
+ self.assertRaises(UnexpectedExprError, getattr, obj, "x", 42)
+ self.assertEquals(obj.y, 42)
+
+ obj = Mock(StubMocker(), object=C(), passthrough=True)
+ self.assertEquals(obj.x, 123)
+ self.assertEquals(obj.y, 42)
+
+
+class EventTest(unittest.TestCase):
+
+ def setUp(self):
+ self.event = Event()
+
+ def test_default_path(self):
+ self.assertEquals(self.event.path, None)
+
+ def test_path(self):
+ path = object()
+ event = Event(path)
+ self.assertEquals(event.path, path)
+
+ def test_add_and_get_tasks(self):
+ task1 = self.event.add_task(Task())
+ task2 = self.event.add_task(Task())
+ self.assertEquals(self.event.get_tasks(), [task1, task2])
+
+ def test_remove_task(self):
+ task1 = self.event.add_task(Task())
+ task2 = self.event.add_task(Task())
+ task3 = self.event.add_task(Task())
+ self.event.remove_task(task2)
+ self.assertEquals(self.event.get_tasks(), [task1, task3])
+
+ def test_default_matches(self):
+ self.assertEquals(self.event.matches(None), False)
+
+ def test_default_run(self):
+ self.assertEquals(self.event.run(None), None)
+
+ def test_default_satisfied(self):
+ self.assertEquals(self.event.satisfied(), True)
+
+ def test_default_verify(self):
+ self.assertEquals(self.event.verify(), None)
+
+ def test_default_restore(self):
+ self.assertEquals(self.event.set_state(None), None)
+
+ def test_matches_false(self):
+ task1 = self.event.add_task(Task())
+ task1.matches = lambda path: True
+ task2 = self.event.add_task(Task())
+ task2.matches = lambda path: False
+ task3 = self.event.add_task(Task())
+ task3.matches = lambda path: True
+ self.assertEquals(self.event.matches(None), False)
+
+ def test_matches_true(self):
+ task1 = self.event.add_task(Task())
+ task1.matches = lambda path: True
+ task2 = self.event.add_task(Task())
+ task2.matches = lambda path: True
+ self.assertEquals(self.event.matches(None), True)
+
+ def test_matches_argument(self):
+ calls = []
+ task = self.event.add_task(Task())
+ task.matches = lambda path: calls.append(path)
+ self.event.matches(42)
+ self.assertEquals(calls, [42])
+
+ def test_run(self):
+ calls = []
+ task1 = self.event.add_task(Task())
+ task1.run = lambda path: calls.append(path) or True
+ task2 = self.event.add_task(Task())
+ task2.run = lambda path: calls.append(path) or False
+ task3 = self.event.add_task(Task())
+ task3.run = lambda path: calls.append(path) or None
+ self.assertEquals(self.event.run(42), False)
+ self.assertEquals(calls, [42, 42, 42])
+
+ def test_satisfied_false(self):
+ def raise_error():
+ raise AssertionError
+ task1 = self.event.add_task(Task())
+ task2 = self.event.add_task(Task())
+ task2.verify = raise_error
+ task3 = self.event.add_task(Task())
+ self.assertEquals(self.event.satisfied(), False)
+
+ def test_satisfied_true(self):
+ task1 = self.event.add_task(Task())
+ task1.satisfied = lambda: True
+ task2 = self.event.add_task(Task())
+ task2.satisfied = lambda: True
+ self.assertEquals(self.event.satisfied(), True)
+
+ def test_verify(self):
+ calls = []
+ task1 = self.event.add_task(Task())
+ task1.verify = lambda: calls.append(1)
+ task2 = self.event.add_task(Task())
+ task2.verify = lambda: calls.append(2)
+ self.event.verify()
+ self.assertEquals(calls, [1, 2])
+
+ def test_set_state(self):
+ calls = []
+ task1 = self.event.add_task(Task())
+ task1.set_state = lambda state: calls.append(state+1)
+ task2 = self.event.add_task(Task())
+ task2.set_state = lambda state: calls.append(state+2)
+ self.event.set_state(3)
+ self.assertEquals(calls, [4, 5])
+
+
+class TaskTest(unittest.TestCase):
+
+ def setUp(self):
+ self.task = Task()
+
+ def test_default_matches(self):
+ self.assertEquals(self.task.matches(None), True)
+
+ def test_default_run(self):
+ self.assertEquals(self.task.run(None), None)
+
+ def test_default_verify(self):
+ self.assertEquals(self.task.verify(), None)
+
+ def test_default_set_state(self):
+ self.assertEquals(self.task.set_state(None), None)
+
+
+class PathMatcherTest(unittest.TestCase):
+
+ def setUp(self):
+ self.mocker = CleanMocker()
+ self.mock = self.mocker.obj()
+
+ def test_is_task(self):
+ self.assertTrue(isinstance(PathMatcher(None), Task))
+
+ def test_create(self):
+ path = object()
+ task = PathMatcher(path)
+ self.assertEquals(task.path, path)
+
+ def test_matches(self):
+ path = Path(self.mock, [Action(None, "getattr", ("attr1",), {})])
+ task = PathMatcher(path)
+ action = Action(Path(self.mock), "getattr", (), {})
+ self.assertFalse(task.matches(action.path + action))
+ action = Action(Path(self.mock), "getattr", ("attr1",), {})
+ self.assertTrue(task.matches(action.path + action))
+
+ def test_recorder(self):
+ path = Path(self.mock, [Action(None, "call", (), {})])
+ event = Event(path)
+ path_matcher_recorder(self.mocker, event)
+ (task,) = event.get_tasks()
+ self.assertEquals(type(task), PathMatcher)
+ self.assertTrue(task.path is path)
+
+ def test_is_standard_recorder(self):
+ self.assertTrue(path_matcher_recorder in Mocker.get_recorders())
+
+
+class RunCounterTest(unittest.TestCase):
+
+ def setUp(self):
+ self.mocker = CleanMocker()
+ self.mock = self.mocker.obj()
+ self.action = Action(Path(self.mock), "getattr", ("attr",), {})
+ self.path = Path(self.mock, [self.action])
+ self.event = Event(self.path)
+
+ def test_is_task(self):
+ self.assertTrue(isinstance(RunCounter(1), Task))
+
+ def test_create_one_argument(self):
+ task = RunCounter(2)
+ self.assertEquals(task.min, 2)
+ self.assertEquals(task.max, 2)
+
+ def test_create_min_max(self):
+ task = RunCounter(2, 3)
+ self.assertEquals(task.min, 2)
+ self.assertEquals(task.max, 3)
+
+ def test_create_unbounded(self):
+ task = RunCounter(2, None)
+ self.assertEquals(task.min, 2)
+ self.assertEquals(task.max, sys.maxint)
+
+ def test_run_one_argument(self):
+ task = RunCounter(2)
+ task.run(self.path)
+ task.run(self.path)
+ self.assertRaises(AssertionError, task.run, self.path)
+
+ def test_run_two_arguments(self):
+ task = RunCounter(1, 2)
+ task.run(self.path)
+ task.run(self.path)
+ self.assertRaises(AssertionError, task.run, self.path)
+
+ def test_verify(self):
+ task = RunCounter(2)
+ self.assertRaises(AssertionError, task.verify)
+ task.run(self.path)
+ self.assertRaises(AssertionError, task.verify)
+ task.run(self.path)
+ task.verify()
+ self.assertRaises(AssertionError, task.run, self.path)
+ self.assertRaises(AssertionError, task.verify)
+
+ def test_verify_two_arguments(self):
+ task = RunCounter(1, 2)
+ self.assertRaises(AssertionError, task.verify)
+ task.run(self.path)
+ task.verify()
+ task.run(self.path)
+ task.verify()
+ self.assertRaises(AssertionError, task.run, self.path)
+ self.assertRaises(AssertionError, task.verify)
+
+ def test_verify_unbound(self):
+ task = RunCounter(1, None)
+ self.assertRaises(AssertionError, task.verify)
+ task.run(self.path)
+ task.verify()
+ task.run(self.path)
+ task.verify()
+
+ def test_recorder(self):
+ run_counter_recorder(self.mocker, self.event)
+ (task,) = self.event.get_tasks()
+ self.assertEquals(type(task), ImplicitRunCounter)
+ self.assertTrue(task.min == 1)
+ self.assertTrue(task.max == 1)
+
+ def test_removal_recorder(self):
+ """
+ Events created by getattr actions which lead to other events
+ may be repeated any number of times.
+ """
+ path1 = Path(self.mock)
+ path2 = path1 + Action(path1, "getattr", ("attr",), {})
+ path3 = path2 + Action(path2, "getattr", ("attr",), {})
+ path4 = path3 + Action(path3, "call", (), {})
+ path5 = path4 + Action(path4, "call", (), {})
+
+ event3 = self.mocker.add_event(Event(path3))
+ event2 = self.mocker.add_event(Event(path2))
+ event5 = self.mocker.add_event(Event(path5))
+ event4 = self.mocker.add_event(Event(path4))
+
+ event2.add_task(RunCounter(1))
+ event2.add_task(ImplicitRunCounter(1))
+ event2.add_task(RunCounter(1))
+ event3.add_task(RunCounter(1))
+ event3.add_task(ImplicitRunCounter(1))
+ event3.add_task(RunCounter(1))
+ event4.add_task(RunCounter(1))
+ event4.add_task(ImplicitRunCounter(1))
+ event4.add_task(RunCounter(1))
+ event5.add_task(RunCounter(1))
+ event5.add_task(ImplicitRunCounter(1))
+ event5.add_task(RunCounter(1))
+
+ # First, when the previous event isn't a getattr.
+
+ run_counter_removal_recorder(self.mocker, event5)
+
+ self.assertEquals(len(event2.get_tasks()), 3)
+ self.assertEquals(len(event3.get_tasks()), 3)
+ self.assertEquals(len(event4.get_tasks()), 3)
+ self.assertEquals(len(event5.get_tasks()), 3)
+
+ # Now, for real.
+
+ run_counter_removal_recorder(self.mocker, event4)
+
+ self.assertEquals(len(event2.get_tasks()), 3)
+ self.assertEquals(len(event3.get_tasks()), 2)
+ self.assertEquals(len(event4.get_tasks()), 3)
+ self.assertEquals(len(event5.get_tasks()), 3)
+
+ task1, task2 = event3.get_tasks()
+ self.assertEquals(type(task1), RunCounter)
+ self.assertEquals(type(task2), RunCounter)
+
+ def test_removal_recorder_with_obj(self):
+
+ self.mocker.add_recorder(run_counter_recorder)
+ self.mocker.add_recorder(run_counter_removal_recorder)
+
+ obj = self.mocker.obj()
+
+ obj.x.y()()
+
+ events = self.mocker.get_events()
+ self.assertEquals(len(events), 4)
+ self.assertEquals(len(events[0].get_tasks()), 0)
+ self.assertEquals(len(events[1].get_tasks()), 0)
+ self.assertEquals(len(events[2].get_tasks()), 1)
+ self.assertEquals(len(events[3].get_tasks()), 1)
+
+ def test_is_standard_recorder(self):
+ self.assertTrue(run_counter_recorder in Mocker.get_recorders())
+ self.assertTrue(run_counter_removal_recorder in Mocker.get_recorders())
+
+
+class MockReturnerTest(unittest.TestCase):
+
+ def setUp(self):
+ self.mocker = CleanMocker()
+ self.mock = self.mocker.obj()
+ self.action = Action(Path(self.mock), "getattr", ("attr",), {})
+ self.path = Path(self.mock, [self.action])
+ self.event = Event(self.path)
+
+ def test_is_task(self):
+ self.assertTrue(isinstance(MockReturner(self.mocker), Task))
+
+ def test_create(self):
+ task = MockReturner(self.mocker)
+ mock = task.run(self.path)
+ self.assertTrue(isinstance(mock, Mock))
+ self.assertEquals(mock.__mocker__, self.mocker)
+ self.assertTrue(mock.__mocker_path__.matches(self.path))
+
+ def test_recorder(self):
+ path1 = Path(self.mock)
+ path2 = path1 + Action(path1, "getattr", ("attr",), {})
+ path3 = path2 + Action(path2, "getattr", ("attr",), {})
+ path4 = path3 + Action(path3, "call", (), {})
+
+ event2 = self.mocker.add_event(Event(path2))
+ event3 = self.mocker.add_event(Event(path3))
+ event4 = self.mocker.add_event(Event(path4))
+
+ self.assertEquals(len(event2.get_tasks()), 0)
+ self.assertEquals(len(event3.get_tasks()), 0)
+ self.assertEquals(len(event4.get_tasks()), 0)
+
+ # Calling on 4 should add it only to the parent.
+
+ mock_returner_recorder(self.mocker, event4)
+
+ self.assertEquals(len(event2.get_tasks()), 0)
+ self.assertEquals(len(event3.get_tasks()), 1)
+ self.assertEquals(len(event4.get_tasks()), 0)
+
+ (task,) = event3.get_tasks()
+ self.assertEquals(type(task), MockReturner)
+ self.assertEquals(task.mocker, self.mocker)
+
+ # Calling on it again shouldn't do anything.
+
+ mock_returner_recorder(self.mocker, event4)
+
+ self.assertEquals(len(event2.get_tasks()), 0)
+ self.assertEquals(len(event3.get_tasks()), 1)
+ self.assertEquals(len(event4.get_tasks()), 0)
+
+ def test_is_standard_recorder(self):
+ self.assertTrue(mock_returner_recorder in Mocker.get_recorders())
+
+
+class FunctionRunnerTest(unittest.TestCase):
+
+ def setUp(self):
+ self.mocker = CleanMocker()
+ self.mock = self.mocker.obj()
+ self.action = Action(Path(self.mock), "call", (1, 2), {"c": 3})
+ self.path = Path(self.mock, [self.action])
+ self.event = Event(self.path)
+
+ def test_is_task(self):
+ self.assertTrue(isinstance(FunctionRunner(None), Task))
+
+ def test_run(self):
+ task = FunctionRunner(lambda *args, **kwargs: repr((args, kwargs)))
+ result = task.run(self.path)
+ self.assertEquals(result, "((1, 2), {'c': 3})")
+
+
+class PathApplierTest(unittest.TestCase):
+
+ def setUp(self):
+ self.mocker = CleanMocker()
+
+ def test_is_task(self):
+ self.assertTrue(isinstance(PathApplier(), Task))
+
+ def test_run(self):
+ class C(object):
+ pass
+ obj = C()
+ obj.x = C()
+ obj.x.y = lambda a, b: a+b
+
+ proxy = self.mocker.proxy(obj)
+
+ path = Path(proxy, [Action(None, "getattr", ("x",), {}),
+ Action(None, "getattr", ("y",), {}),
+ Action(None, "call", (1,), {"b": 2})])
+
+ task = PathApplier()
+ self.assertEquals(task.run(path), 3)
+
+
+class OrdererTest(unittest.TestCase):
+
+ def setUp(self):
+ self.mocker = CleanMocker()
+ self.mock = self.mocker.obj()
+ self.action = Action(Path(self.mock), "call", (1, 2), {"c": 3})
+ self.path = Path(self.mock, [self.action])
+
+ def test_is_task(self):
+ self.assertTrue(isinstance(Orderer(), Task))
+
+ def test_has_run(self):
+ orderer = Orderer()
+ self.assertFalse(orderer.has_run())
+ orderer.run(self.path)
+ self.assertTrue(orderer.has_run())
+
+ def test_add_dependency_and_match(self):
+ orderer1 = Orderer()
+ orderer2 = Orderer()
+ orderer2.add_dependency(orderer1)
+ self.assertFalse(orderer2.matches(None))
+ self.assertTrue(orderer1.matches(None))
+ orderer1.run(self.path)
+ self.assertTrue(orderer2.matches(None))
+
+ def test_get_dependencies(self):
+ orderer = Orderer()
+ orderer.add_dependency(1)
+ orderer.add_dependency(2)
+ self.assertEquals(orderer.get_dependencies(), [1, 2])
+
+
+class SpecCheckerTest(unittest.TestCase):
+
+ def setUp(self):
+ class C(object):
+ def normal(self, a, b, c=3): pass
+ def varargs(self, a, b, c=3, *args): pass
+ def varkwargs(self, a, b, c=3, **kwargs): pass
+ def varargskwargs(self, a, b, c=3, *args, **kwargs): pass
+ @classmethod
+ def klass(cls, a, b, c=3): pass
+ @staticmethod
+ def static(a, b, c=3): pass
+ def noargs(self): pass
+ @classmethod
+ def klassnoargs(cls): pass
+ @staticmethod
+ def staticnoargs(): pass
+ self.cls = C
+ self.mocker = CleanMocker()
+ self.mock = self.mocker.obj(self.cls)
+
+ def path(self, *args, **kwargs):
+ action = Action(Path(self.mock), "call", args, kwargs)
+ return action.path + action
+
+ def good(self, method_names, args_expr):
+ if type(method_names) is not list:
+ method_names = [method_names]
+ for method_name in method_names:
+ task = SpecChecker(getattr(self.cls, method_name, None))
+ path = eval("self.path(%s)" % args_expr)
+ try:
+ task.run(path)
+ except AssertionError:
+ self.fail("AssertionError raised with self.cls.%s(%s)"
+ % (method_name, args_expr))
+
+ def bad(self, method_names, args_expr):
+ if type(method_names) is not list:
+ method_names = [method_names]
+ for method_name in method_names:
+ task = SpecChecker(getattr(self.cls, method_name, None))
+ path = eval("self.path(%s)" % args_expr)
+ try:
+ task.run(path)
+ except AssertionError:
+ pass
+ else:
+ self.fail("AssertionError not raised with self.cls.%s(%s)"
+ % (method_name, args_expr))
+
+ def test_get_method(self):
+ task = SpecChecker(self.cls.noargs)
+ self.assertEquals(task.get_method(), self.cls.noargs)
+
+ def test_is_standard_recorder(self):
+ self.assertTrue(spec_checker_recorder in Mocker.get_recorders())
+
+ def test_is_task(self):
+ self.assertTrue(isinstance(SpecChecker(self.cls.normal), Task))
+
+ def test_recorder(self):
+ self.mocker.add_recorder(spec_checker_recorder)
+ obj = self.mocker.obj(self.cls)
+ obj.noargs()
+ getattr, call = self.mocker.get_events()
+ self.assertEquals(getattr.get_tasks(), [])
+ (task,) = call.get_tasks()
+ self.assertEquals(type(task), SpecChecker)
+ self.assertEquals(task.get_method(), self.cls.noargs)
+
+ def test_recorder_with_unexistent_method(self):
+ self.mocker.add_recorder(spec_checker_recorder)
+ obj = self.mocker.obj(self.cls)
+ obj.unexistent()
+ getattr, call = self.mocker.get_events()
+ self.assertEquals(getattr.get_tasks(), [])
+ (task,) = call.get_tasks()
+ self.assertEquals(type(task), SpecChecker)
+ self.assertEquals(task.get_method(), None)
+
+ def test_recorder_second_action_isnt_call(self):
+ self.mocker.add_recorder(spec_checker_recorder)
+ obj = self.mocker.obj(self.cls)
+ obj.noargs.x
+ event1, event2 = self.mocker.get_events()
+ self.assertEquals(event1.get_tasks(), [])
+ self.assertEquals(event2.get_tasks(), [])
+
+ def test_recorder_first_action_isnt_getattr(self):
+ self.mocker.add_recorder(spec_checker_recorder)
+ obj = self.mocker.obj(self.cls)
+ obj("noargs").x
+ event1, event2 = self.mocker.get_events()
+ self.assertEquals(event1.get_tasks(), [])
+ self.assertEquals(event2.get_tasks(), [])
+
+ def test_recorder_more_than_two_actions(self):
+ self.mocker.add_recorder(spec_checker_recorder)
+ obj = self.mocker.obj(self.cls)
+ obj.noargs().x
+ event1, event2, event3 = self.mocker.get_events()
+ self.assertEquals(len(event1.get_tasks()), 0)
+ self.assertEquals(len(event2.get_tasks()), 1)
+ self.assertEquals(len(event3.get_tasks()), 0)
+
+ def test_noargs(self):
+ methods = ["noargs", "klassnoargs", "staticnoargs"]
+ self.good(methods, "")
+ self.bad(methods, "1")
+ self.bad(methods, "a=1")
+
+ def test_args_and_kwargs(self):
+ methods = ["normal", "varargs", "varkwargs", "varargskwargs",
+ "static", "klass"]
+ self.good(methods, "1, 2")
+ self.good(methods, "1, 2, 3")
+ self.good(methods, "1, b=2")
+ self.good(methods, "1, b=2, c=3")
+ self.good(methods, "a=1, b=2")
+ self.good(methods, "a=1, b=2, c=3")
+
+ def test_too_much(self):
+ methods = ["normal", "static", "klass"]
+ self.bad(methods, "1, 2, 3, 4")
+ self.bad(methods, "1, 2, d=4")
+
+ def test_missing(self):
+ methods = ["normal", "varargs", "varkwargs", "varargskwargs",
+ "static", "klass"]
+ self.bad(methods, "")
+ self.bad(methods, "1")
+ self.bad(methods, "c=3")
+ self.bad(methods, "a=1")
+ self.bad(methods, "b=2, c=3")
+
+ def test_duplicated_argument(self):
+ methods = ["normal", "varargs", "varkwargs", "varargskwargs",
+ "static", "klass"]
+ self.bad(methods, "1, 2, b=2")
+
+ def test_varargs(self):
+ self.good("varargs", "1, 2, 3, 4")
+ self.bad("varargs", "1, 2, 3, 4, d=3")
+
+ def test_varkwargs(self):
+ self.good("varkwargs", "1, 2, d=3")
+ self.bad("varkwargs", "1, 2, 3, 4, d=3")
+
+ def test_varargskwargs(self):
+ self.good("varargskwargs", "1, 2, 3, 4, d=3")
+
+ def test_unexistent(self):
+ self.bad("unexistent", "")
+
+
+class ProxyInstallerTest(unittest.TestCase):
+
+ def setUp(self):
+ self.mocker = CleanMocker()
+ import calendar
+ self.mock = Mock(self.mocker, object=calendar)
+ self.task = ProxyInstaller(self.mock)
+
+ def tearDown(self):
+ self.task.set_state(RESTORE)
+
+ def test_is_task(self):
+ self.assertTrue(isinstance(ProxyInstaller(None), Task))
+
+ def test_mock(self):
+ mock = object()
+ task = ProxyInstaller(mock)
+ self.assertEquals(task.mock, mock)
+
+ def test_matches_nothing(self):
+ self.assertFalse(self.task.matches(None))
+
+ def test_defaults_to_not_installed(self):
+ import calendar
+ self.assertEquals(type(calendar), ModuleType)
+
+ def test_install(self):
+ self.task.set_state(REPLAY)
+ import calendar
+ self.assertEquals(type(calendar), Mock)
+ self.assertTrue(calendar is self.mock)
+
+ def test_install_protects_mock(self):
+ self.task.set_state(REPLAY)
+ self.assertEquals(type(self.mock.__mocker_object__), ModuleType)
+
+ def test_deinstall_protects_task(self):
+ self.task.set_state(REPLAY)
+ self.task.set_state(RESTORE)
+ self.assertEquals(type(self.task.mock), Mock)
+
+ def test_install_on_object(self):
+ class C(object):
+ def __init__(self):
+ import calendar
+ self.calendar = calendar
+ obj = C()
+ self.task.set_state(REPLAY)
+ self.assertEquals(type(obj.calendar), Mock)
+ self.assertTrue(obj.calendar is self.mock)
+
+ def test_install_on_submodule(self):
+ from os import path
+ mock = Mock(self.mocker, object=path)
+ task = ProxyInstaller(mock)
+ task.set_state(REPLAY)
+ import os
+ self.assertEquals(type(os.path), Mock)
+ self.assertTrue(os.path is mock)
+
+ def test_deinstall_on_restore(self):
+ self.task.set_state(REPLAY)
+ self.task.set_state(RESTORE)
+ import calendar
+ self.assertEquals(type(calendar), ModuleType)
+ self.assertEquals(calendar.__name__, "calendar")
+
+ def test_deinstall_on_record(self):
+ self.task.set_state(REPLAY)
+ self.task.set_state(RECORD)
+ import calendar
+ self.assertEquals(type(calendar), ModuleType)
+ self.assertEquals(calendar.__name__, "calendar")
+
+ def test_deinstall_from_object(self):
+ class C(object):
+ def __init__(self):
+ import calendar
+ self.calendar = calendar
+ obj = C()
+ self.task.set_state(REPLAY)
+ self.task.set_state(RESTORE)
+ self.assertEquals(type(obj.calendar), ModuleType)
+ self.assertEquals(obj.calendar.__name__, "calendar")
+
+ def test_deinstall_from_submodule(self):
+ from os import path
+ mock = Mock(self.mocker, object=path)
+ task = ProxyInstaller(mock)
+ task.set_state(REPLAY)
+ task.set_state(RESTORE)
+ import os
+ self.assertEquals(type(os.path), ModuleType)
+
+
+if __name__ == "__main__":
+ unittest.main()