# -*- coding: utf-8 -*- """ Test suite for PEP 380 implementation adapted from original tests written by Greg Ewing see """ import unittest import inspect from test.support import captured_stderr, disable_gc, gc_collect class TestPEP380Operation(unittest.TestCase): """ Test semantics. """ def test_delegation_of_initial_next_to_subgenerator(self): """ Test delegation of initial next() call to subgenerator """ trace = [] def g1(): trace.append("Starting g1") yield from g2() trace.append("Finishing g1") def g2(): trace.append("Starting g2") yield 42 trace.append("Finishing g2") for x in g1(): trace.append("Yielded %s" % (x,)) self.assertEqual(trace,[ "Starting g1", "Starting g2", "Yielded 42", "Finishing g2", "Finishing g1", ]) def test_raising_exception_in_initial_next_call(self): """ Test raising exception in initial next() call """ trace = [] def g1(): try: trace.append("Starting g1") yield from g2() finally: trace.append("Finishing g1") def g2(): try: trace.append("Starting g2") raise ValueError("spanish inquisition occurred") finally: trace.append("Finishing g2") try: for x in g1(): trace.append("Yielded %s" % (x,)) except ValueError as e: self.assertEqual(e.args[0], "spanish inquisition occurred") else: self.fail("subgenerator failed to raise ValueError") self.assertEqual(trace,[ "Starting g1", "Starting g2", "Finishing g2", "Finishing g1", ]) def test_delegation_of_next_call_to_subgenerator(self): """ Test delegation of next() call to subgenerator """ trace = [] def g1(): trace.append("Starting g1") yield "g1 ham" yield from g2() yield "g1 eggs" trace.append("Finishing g1") def g2(): trace.append("Starting g2") yield "g2 spam" yield "g2 more spam" trace.append("Finishing g2") for x in g1(): trace.append("Yielded %s" % (x,)) self.assertEqual(trace,[ "Starting g1", "Yielded g1 ham", "Starting g2", "Yielded g2 spam", "Yielded g2 more spam", "Finishing g2", "Yielded g1 eggs", "Finishing g1", ]) def test_raising_exception_in_delegated_next_call(self): """ Test raising exception in delegated next() call """ trace = [] def g1(): try: trace.append("Starting g1") yield "g1 ham" yield from g2() yield "g1 eggs" finally: trace.append("Finishing g1") def g2(): try: trace.append("Starting g2") yield "g2 spam" raise ValueError("hovercraft is full of eels") yield "g2 more spam" finally: trace.append("Finishing g2") try: for x in g1(): trace.append("Yielded %s" % (x,)) except ValueError as e: self.assertEqual(e.args[0], "hovercraft is full of eels") else: self.fail("subgenerator failed to raise ValueError") self.assertEqual(trace,[ "Starting g1", "Yielded g1 ham", "Starting g2", "Yielded g2 spam", "Finishing g2", "Finishing g1", ]) def test_delegation_of_send(self): """ Test delegation of send() """ trace = [] def g1(): trace.append("Starting g1") x = yield "g1 ham" trace.append("g1 received %s" % (x,)) yield from g2() x = yield "g1 eggs" trace.append("g1 received %s" % (x,)) trace.append("Finishing g1") def g2(): trace.append("Starting g2") x = yield "g2 spam" trace.append("g2 received %s" % (x,)) x = yield "g2 more spam" trace.append("g2 received %s" % (x,)) trace.append("Finishing g2") g = g1() y = next(g) x = 1 try: while 1: y = g.send(x) trace.append("Yielded %s" % (y,)) x += 1 except StopIteration: pass self.assertEqual(trace,[ "Starting g1", "g1 received 1", "Starting g2", "Yielded g2 spam", "g2 received 2", "Yielded g2 more spam", "g2 received 3", "Finishing g2", "Yielded g1 eggs", "g1 received 4", "Finishing g1", ]) def test_handling_exception_while_delegating_send(self): """ Test handling exception while delegating 'send' """ trace = [] def g1(): trace.append("Starting g1") x = yield "g1 ham" trace.append("g1 received %s" % (x,)) yield from g2() x = yield "g1 eggs" trace.append("g1 received %s" % (x,)) trace.append("Finishing g1") def g2(): trace.append("Starting g2") x = yield "g2 spam" trace.append("g2 received %s" % (x,)) raise ValueError("hovercraft is full of eels") x = yield "g2 more spam" trace.append("g2 received %s" % (x,)) trace.append("Finishing g2") def run(): g = g1() y = next(g) x = 1 try: while 1: y = g.send(x) trace.append("Yielded %s" % (y,)) x += 1 except StopIteration: trace.append("StopIteration") self.assertRaises(ValueError,run) self.assertEqual(trace,[ "Starting g1", "g1 received 1", "Starting g2", "Yielded g2 spam", "g2 received 2", ]) def test_delegating_close(self): """ Test delegating 'close' """ trace = [] def g1(): try: trace.append("Starting g1") yield "g1 ham" yield from g2() yield "g1 eggs" finally: trace.append("Finishing g1") def g2(): try: trace.append("Starting g2") yield "g2 spam" yield "g2 more spam" finally: trace.append("Finishing g2") g = g1() for i in range(2): x = next(g) trace.append("Yielded %s" % (x,)) g.close() self.assertEqual(trace,[ "Starting g1", "Yielded g1 ham", "Starting g2", "Yielded g2 spam", "Finishing g2", "Finishing g1" ]) def test_handing_exception_while_delegating_close(self): """ Test handling exception while delegating 'close' """ trace = [] def g1(): try: trace.append("Starting g1") yield "g1 ham" yield from g2() yield "g1 eggs" finally: trace.append("Finishing g1") def g2(): try: trace.append("Starting g2") yield "g2 spam" yield "g2 more spam" finally: trace.append("Finishing g2") raise ValueError("nybbles have exploded with delight") try: g = g1() for i in range(2): x = next(g) trace.append("Yielded %s" % (x,)) g.close() except ValueError as e: self.assertEqual(e.args[0], "nybbles have exploded with delight") self.assertIsInstance(e.__context__, GeneratorExit) else: self.fail("subgenerator failed to raise ValueError") self.assertEqual(trace,[ "Starting g1", "Yielded g1 ham", "Starting g2", "Yielded g2 spam", "Finishing g2", "Finishing g1", ]) def test_delegating_throw(self): """ Test delegating 'throw' """ trace = [] def g1(): try: trace.append("Starting g1") yield "g1 ham" yield from g2() yield "g1 eggs" finally: trace.append("Finishing g1") def g2(): try: trace.append("Starting g2") yield "g2 spam" yield "g2 more spam" finally: trace.append("Finishing g2") try: g = g1() for i in range(2): x = next(g) trace.append("Yielded %s" % (x,)) e = ValueError("tomato ejected") g.throw(e) except ValueError as e: self.assertEqual(e.args[0], "tomato ejected") else: self.fail("subgenerator failed to raise ValueError") self.assertEqual(trace,[ "Starting g1", "Yielded g1 ham", "Starting g2", "Yielded g2 spam", "Finishing g2", "Finishing g1", ]) def test_value_attribute_of_StopIteration_exception(self): """ Test 'value' attribute of StopIteration exception """ trace = [] def pex(e): trace.append("%s: %s" % (e.__class__.__name__, e)) trace.append("value = %s" % (e.value,)) e = StopIteration() pex(e) e = StopIteration("spam") pex(e) e.value = "eggs" pex(e) self.assertEqual(trace,[ "StopIteration: ", "value = None", "StopIteration: spam", "value = spam", "StopIteration: spam", "value = eggs", ]) def test_exception_value_crash(self): # There used to be a refcount error when the return value # stored in the StopIteration has a refcount of 1. def g1(): yield from g2() def g2(): yield "g2" return [42] self.assertEqual(list(g1()), ["g2"]) def test_generator_return_value(self): """ Test generator return value """ trace = [] def g1(): trace.append("Starting g1") yield "g1 ham" ret = yield from g2() trace.append("g2 returned %r" % (ret,)) for v in 1, (2,), StopIteration(3): ret = yield from g2(v) trace.append("g2 returned %r" % (ret,)) yield "g1 eggs" trace.append("Finishing g1") def g2(v = None): trace.append("Starting g2") yield "g2 spam" yield "g2 more spam" trace.append("Finishing g2") if v: return v for x in g1(): trace.append("Yielded %s" % (x,)) self.assertEqual(trace,[ "Starting g1", "Yielded g1 ham", "Starting g2", "Yielded g2 spam", "Yielded g2 more spam", "Finishing g2", "g2 returned None", "Starting g2", "Yielded g2 spam", "Yielded g2 more spam", "Finishing g2", "g2 returned 1", "Starting g2", "Yielded g2 spam", "Yielded g2 more spam", "Finishing g2", "g2 returned (2,)", "Starting g2", "Yielded g2 spam", "Yielded g2 more spam", "Finishing g2", "g2 returned StopIteration(3,)", "Yielded g1 eggs", "Finishing g1", ]) def test_delegation_of_next_to_non_generator(self): """ Test delegation of next() to non-generator """ trace = [] def g(): yield from range(3) for x in g(): trace.append("Yielded %s" % (x,)) self.assertEqual(trace,[ "Yielded 0", "Yielded 1", "Yielded 2", ]) def test_conversion_of_sendNone_to_next(self): """ Test conversion of send(None) to next() """ trace = [] def g(): yield from range(3) gi = g() for x in range(3): y = gi.send(None) trace.append("Yielded: %s" % (y,)) self.assertEqual(trace,[ "Yielded: 0", "Yielded: 1", "Yielded: 2", ]) def test_delegation_of_close_to_non_generator(self): """ Test delegation of close() to non-generator """ trace = [] def g(): try: trace.append("starting g") yield from range(3) trace.append("g should not be here") finally: trace.append("finishing g") gi = g() next(gi) with captured_stderr() as output: gi.close() self.assertEqual(output.getvalue(), '') self.assertEqual(trace,[ "starting g", "finishing g", ]) def test_delegating_throw_to_non_generator(self): """ Test delegating 'throw' to non-generator """ trace = [] def g(): try: trace.append("Starting g") yield from range(10) finally: trace.append("Finishing g") try: gi = g() for i in range(5): x = next(gi) trace.append("Yielded %s" % (x,)) e = ValueError("tomato ejected") gi.throw(e) except ValueError as e: self.assertEqual(e.args[0],"tomato ejected") else: self.fail("subgenerator failed to raise ValueError") self.assertEqual(trace,[ "Starting g", "Yielded 0", "Yielded 1", "Yielded 2", "Yielded 3", "Yielded 4", "Finishing g", ]) def test_attempting_to_send_to_non_generator(self): """ Test attempting to send to non-generator """ trace = [] def g(): try: trace.append("starting g") yield from range(3) trace.append("g should not be here") finally: trace.append("finishing g") try: gi = g() next(gi) for x in range(3): y = gi.send(42) trace.append("Should not have yielded: %s" % (y,)) except AttributeError as e: self.assertIn("send", e.args[0]) else: self.fail("was able to send into non-generator") self.assertEqual(trace,[ "starting g", "finishing g", ]) def test_broken_getattr_handling(self): """ Test subiterator with a broken getattr implementation """ class Broken: def __iter__(self): return self def __next__(self): return 1 def __getattr__(self, attr): 1/0 def g(): yield from Broken() with self.assertRaises(ZeroDivisionError): gi = g() self.assertEqual(next(gi), 1) gi.send(1) with self.assertRaises(ZeroDivisionError): gi = g() self.assertEqual(next(gi), 1) gi.throw(AttributeError) with captured_stderr() as output: gi = g() self.assertEqual(next(gi), 1) gi.close() self.assertIn('ZeroDivisionError', output.getvalue()) def test_exception_in_initial_next_call(self): """ Test exception in initial next() call """ trace = [] def g1(): trace.append("g1 about to yield from g2") yield from g2() trace.append("g1 should not be here") def g2(): yield 1/0 def run(): gi = g1() next(gi) self.assertRaises(ZeroDivisionError,run) self.assertEqual(trace,[ "g1 about to yield from g2" ]) def test_attempted_yield_from_loop(self): """ Test attempted yield-from loop """ trace = [] def g1(): trace.append("g1: starting") yield "y1" trace.append("g1: about to yield from g2") yield from g2() trace.append("g1 should not be here") def g2(): trace.append("g2: starting") yield "y2" trace.append("g2: about to yield from g1") yield from gi trace.append("g2 should not be here") try: gi = g1() for y in gi: trace.append("Yielded: %s" % (y,)) except ValueError as e: self.assertEqual(e.args[0],"generator already executing") else: self.fail("subgenerator didn't raise ValueError") self.assertEqual(trace,[ "g1: starting", "Yielded: y1", "g1: about to yield from g2", "g2: starting", "Yielded: y2", "g2: about to yield from g1", ]) def test_returning_value_from_delegated_throw(self): """ Test returning value from delegated 'throw' """ trace = [] def g1(): try: trace.append("Starting g1") yield "g1 ham" yield from g2() yield "g1 eggs" finally: trace.append("Finishing g1") def g2(): try: trace.append("Starting g2") yield "g2 spam" yield "g2 more spam" except LunchError: trace.append("Caught LunchError in g2") yield "g2 lunch saved" yield "g2 yet more spam" class LunchError(Exception): pass g = g1() for i in range(2): x = next(g) trace.append("Yielded %s" % (x,)) e = LunchError("tomato ejected") g.throw(e) for x in g: trace.append("Yielded %s" % (x,)) self.assertEqual(trace,[ "Starting g1", "Yielded g1 ham", "Starting g2", "Yielded g2 spam", "Caught LunchError in g2", "Yielded g2 yet more spam", "Yielded g1 eggs", "Finishing g1", ]) def test_next_and_return_with_value(self): """ Test next and return with value """ trace = [] def f(r): gi = g(r) next(gi) try: trace.append("f resuming g") next(gi) trace.append("f SHOULD NOT BE HERE") except StopIteration as e: trace.append("f caught %r" % (e,)) def g(r): trace.append("g starting") yield trace.append("g returning %r" % (r,)) return r f(None) f(1) f((2,)) f(StopIteration(3)) self.assertEqual(trace,[ "g starting", "f resuming g", "g returning None", "f caught StopIteration()", "g starting", "f resuming g", "g returning 1", "f caught StopIteration(1,)", "g starting", "f resuming g", "g returning (2,)", "f caught StopIteration((2,),)", "g starting", "f resuming g", "g returning StopIteration(3,)", "f caught StopIteration(StopIteration(3,),)", ]) def test_send_and_return_with_value(self): """ Test send and return with value """ trace = [] def f(r): gi = g(r) next(gi) try: trace.append("f sending spam to g") gi.send("spam") trace.append("f SHOULD NOT BE HERE") except StopIteration as e: trace.append("f caught %r" % (e,)) def g(r): trace.append("g starting") x = yield trace.append("g received %r" % (x,)) trace.append("g returning %r" % (r,)) return r f(None) f(1) f((2,)) f(StopIteration(3)) self.assertEqual(trace, [ "g starting", "f sending spam to g", "g received 'spam'", "g returning None", "f caught StopIteration()", "g starting", "f sending spam to g", "g received 'spam'", "g returning 1", 'f caught StopIteration(1,)', 'g starting', 'f sending spam to g', "g received 'spam'", 'g returning (2,)', 'f caught StopIteration((2,),)', 'g starting', 'f sending spam to g', "g received 'spam'", 'g returning StopIteration(3,)', 'f caught StopIteration(StopIteration(3,),)' ]) def test_catching_exception_from_subgen_and_returning(self): """ Test catching an exception thrown into a subgenerator and returning a value """ def inner(): try: yield 1 except ValueError: trace.append("inner caught ValueError") return value def outer(): v = yield from inner() trace.append("inner returned %r to outer" % (v,)) yield v for value in 2, (2,), StopIteration(2): trace = [] g = outer() trace.append(next(g)) trace.append(repr(g.throw(ValueError))) self.assertEqual(trace, [ 1, "inner caught ValueError", "inner returned %r to outer" % (value,), repr(value), ]) def test_throwing_GeneratorExit_into_subgen_that_returns(self): """ Test throwing GeneratorExit into a subgenerator that catches it and returns normally. """ trace = [] def f(): try: trace.append("Enter f") yield trace.append("Exit f") except GeneratorExit: return def g(): trace.append("Enter g") yield from f() trace.append("Exit g") try: gi = g() next(gi) gi.throw(GeneratorExit) except GeneratorExit: pass else: self.fail("subgenerator failed to raise GeneratorExit") self.assertEqual(trace,[ "Enter g", "Enter f", ]) def test_throwing_GeneratorExit_into_subgenerator_that_yields(self): """ Test throwing GeneratorExit into a subgenerator that catches it and yields. """ trace = [] def f(): try: trace.append("Enter f") yield trace.append("Exit f") except GeneratorExit: yield def g(): trace.append("Enter g") yield from f() trace.append("Exit g") try: gi = g() next(gi) gi.throw(GeneratorExit) except RuntimeError as e: self.assertEqual(e.args[0], "generator ignored GeneratorExit") else: self.fail("subgenerator failed to raise GeneratorExit") self.assertEqual(trace,[ "Enter g", "Enter f", ]) def test_throwing_GeneratorExit_into_subgen_that_raises(self): """ Test throwing GeneratorExit into a subgenerator that catches it and raises a different exception. """ trace = [] def f(): try: trace.append("Enter f") yield trace.append("Exit f") except GeneratorExit: raise ValueError("Vorpal bunny encountered") def g(): trace.append("Enter g") yield from f() trace.append("Exit g") try: gi = g() next(gi) gi.throw(GeneratorExit) except ValueError as e: self.assertEqual(e.args[0], "Vorpal bunny encountered") self.assertIsInstance(e.__context__, GeneratorExit) else: self.fail("subgenerator failed to raise ValueError") self.assertEqual(trace,[ "Enter g", "Enter f", ]) def test_yield_from_empty(self): def g(): yield from () self.assertRaises(StopIteration, next, g()) def test_delegating_generators_claim_to_be_running(self): # Check with basic iteration def one(): yield 0 yield from two() yield 3 def two(): yield 1 try: yield from g1 except ValueError: pass yield 2 g1 = one() self.assertEqual(list(g1), [0, 1, 2, 3]) # Check with send g1 = one() res = [next(g1)] try: while True: res.append(g1.send(42)) except StopIteration: pass self.assertEqual(res, [0, 1, 2, 3]) # Check with throw class MyErr(Exception): pass def one(): try: yield 0 except MyErr: pass yield from two() try: yield 3 except MyErr: pass def two(): try: yield 1 except MyErr: pass try: yield from g1 except ValueError: pass try: yield 2 except MyErr: pass g1 = one() res = [next(g1)] try: while True: res.append(g1.throw(MyErr)) except StopIteration: pass # Check with close class MyIt(object): def __iter__(self): return self def __next__(self): return 42 def close(self_): self.assertTrue(g1.gi_running) self.assertRaises(ValueError, next, g1) def one(): yield from MyIt() g1 = one() next(g1) g1.close() def test_delegator_is_visible_to_debugger(self): def call_stack(): return [f[3] for f in inspect.stack()] def gen(): yield call_stack() yield call_stack() yield call_stack() def spam(g): yield from g def eggs(g): yield from g for stack in spam(gen()): self.assertTrue('spam' in stack) for stack in spam(eggs(gen())): self.assertTrue('spam' in stack and 'eggs' in stack) def test_custom_iterator_return(self): # See issue #15568 class MyIter: def __iter__(self): return self def __next__(self): raise StopIteration(42) def gen(): nonlocal ret ret = yield from MyIter() ret = None list(gen()) self.assertEqual(ret, 42) def test_close_with_cleared_frame(self): # See issue #17669. # # Create a stack of generators: outer() delegating to inner() # delegating to innermost(). The key point is that the instance of # inner is created first: this ensures that its frame appears before # the instance of outer in the GC linked list. # # At the gc.collect call: # - frame_clear is called on the inner_gen frame. # - gen_dealloc is called on the outer_gen generator (the only # reference is in the frame's locals). # - gen_close is called on the outer_gen generator. # - gen_close_iter is called to close the inner_gen generator, which # in turn calls gen_close, and gen_yf. # # Previously, gen_yf would crash since inner_gen's frame had been # cleared (and in particular f_stacktop was NULL). def innermost(): yield def inner(): outer_gen = yield yield from innermost() def outer(): inner_gen = yield yield from inner_gen with disable_gc(): inner_gen = inner() outer_gen = outer() outer_gen.send(None) outer_gen.send(inner_gen) outer_gen.send(outer_gen) del outer_gen del inner_gen gc_collect() def test_send_tuple_with_custom_generator(self): # See issue #21209. class MyGen: def __iter__(self): return self def __next__(self): return 42 def send(self, what): nonlocal v v = what return None def outer(): v = yield from MyGen() g = outer() next(g) v = None g.send((1, 2, 3, 4)) self.assertEqual(v, (1, 2, 3, 4)) if __name__ == '__main__': unittest.main()