diff options
Diffstat (limited to 'Lib/test/test_yield_from.py')
-rw-r--r-- | Lib/test/test_yield_from.py | 1052 |
1 files changed, 1052 insertions, 0 deletions
diff --git a/Lib/test/test_yield_from.py b/Lib/test/test_yield_from.py new file mode 100644 index 0000000000..7e9711eaf5 --- /dev/null +++ b/Lib/test/test_yield_from.py @@ -0,0 +1,1052 @@ +# -*- coding: utf-8 -*- + +""" +Test suite for PEP 380 implementation + +adapted from original tests written by Greg Ewing +see <http://www.cosc.canterbury.ac.nz/greg.ewing/python/yield-from/YieldFrom-Python3.1.2-rev5.zip> +""" + +import unittest +import io +import sys +import inspect +import parser + +from test.support import captured_stderr, disable_gc, gc_collect + +class TestPEP380Operation(unittest.TestCase): + """ + Test semantics. + """ + + def test_delegation_of_initial_next_to_subgenerator(self): + """ + Test delegation of initial next() call to subgenerator + """ + trace = [] + def g1(): + trace.append("Starting g1") + yield from g2() + trace.append("Finishing g1") + def g2(): + trace.append("Starting g2") + yield 42 + trace.append("Finishing g2") + for x in g1(): + trace.append("Yielded %s" % (x,)) + self.assertEqual(trace,[ + "Starting g1", + "Starting g2", + "Yielded 42", + "Finishing g2", + "Finishing g1", + ]) + + def test_raising_exception_in_initial_next_call(self): + """ + Test raising exception in initial next() call + """ + trace = [] + def g1(): + try: + trace.append("Starting g1") + yield from g2() + finally: + trace.append("Finishing g1") + def g2(): + try: + trace.append("Starting g2") + raise ValueError("spanish inquisition occurred") + finally: + trace.append("Finishing g2") + try: + for x in g1(): + trace.append("Yielded %s" % (x,)) + except ValueError as e: + self.assertEqual(e.args[0], "spanish inquisition occurred") + else: + self.fail("subgenerator failed to raise ValueError") + self.assertEqual(trace,[ + "Starting g1", + "Starting g2", + "Finishing g2", + "Finishing g1", + ]) + + def test_delegation_of_next_call_to_subgenerator(self): + """ + Test delegation of next() call to subgenerator + """ + trace = [] + def g1(): + trace.append("Starting g1") + yield "g1 ham" + yield from g2() + yield "g1 eggs" + trace.append("Finishing g1") + def g2(): + trace.append("Starting g2") + yield "g2 spam" + yield "g2 more spam" + trace.append("Finishing g2") + for x in g1(): + trace.append("Yielded %s" % (x,)) + self.assertEqual(trace,[ + "Starting g1", + "Yielded g1 ham", + "Starting g2", + "Yielded g2 spam", + "Yielded g2 more spam", + "Finishing g2", + "Yielded g1 eggs", + "Finishing g1", + ]) + + def test_raising_exception_in_delegated_next_call(self): + """ + Test raising exception in delegated next() call + """ + trace = [] + def g1(): + try: + trace.append("Starting g1") + yield "g1 ham" + yield from g2() + yield "g1 eggs" + finally: + trace.append("Finishing g1") + def g2(): + try: + trace.append("Starting g2") + yield "g2 spam" + raise ValueError("hovercraft is full of eels") + yield "g2 more spam" + finally: + trace.append("Finishing g2") + try: + for x in g1(): + trace.append("Yielded %s" % (x,)) + except ValueError as e: + self.assertEqual(e.args[0], "hovercraft is full of eels") + else: + self.fail("subgenerator failed to raise ValueError") + self.assertEqual(trace,[ + "Starting g1", + "Yielded g1 ham", + "Starting g2", + "Yielded g2 spam", + "Finishing g2", + "Finishing g1", + ]) + + def test_delegation_of_send(self): + """ + Test delegation of send() + """ + trace = [] + def g1(): + trace.append("Starting g1") + x = yield "g1 ham" + trace.append("g1 received %s" % (x,)) + yield from g2() + x = yield "g1 eggs" + trace.append("g1 received %s" % (x,)) + trace.append("Finishing g1") + def g2(): + trace.append("Starting g2") + x = yield "g2 spam" + trace.append("g2 received %s" % (x,)) + x = yield "g2 more spam" + trace.append("g2 received %s" % (x,)) + trace.append("Finishing g2") + g = g1() + y = next(g) + x = 1 + try: + while 1: + y = g.send(x) + trace.append("Yielded %s" % (y,)) + x += 1 + except StopIteration: + pass + self.assertEqual(trace,[ + "Starting g1", + "g1 received 1", + "Starting g2", + "Yielded g2 spam", + "g2 received 2", + "Yielded g2 more spam", + "g2 received 3", + "Finishing g2", + "Yielded g1 eggs", + "g1 received 4", + "Finishing g1", + ]) + + def test_handling_exception_while_delegating_send(self): + """ + Test handling exception while delegating 'send' + """ + trace = [] + def g1(): + trace.append("Starting g1") + x = yield "g1 ham" + trace.append("g1 received %s" % (x,)) + yield from g2() + x = yield "g1 eggs" + trace.append("g1 received %s" % (x,)) + trace.append("Finishing g1") + def g2(): + trace.append("Starting g2") + x = yield "g2 spam" + trace.append("g2 received %s" % (x,)) + raise ValueError("hovercraft is full of eels") + x = yield "g2 more spam" + trace.append("g2 received %s" % (x,)) + trace.append("Finishing g2") + def run(): + g = g1() + y = next(g) + x = 1 + try: + while 1: + y = g.send(x) + trace.append("Yielded %s" % (y,)) + x += 1 + except StopIteration: + trace.append("StopIteration") + self.assertRaises(ValueError,run) + self.assertEqual(trace,[ + "Starting g1", + "g1 received 1", + "Starting g2", + "Yielded g2 spam", + "g2 received 2", + ]) + + def test_delegating_close(self): + """ + Test delegating 'close' + """ + trace = [] + def g1(): + try: + trace.append("Starting g1") + yield "g1 ham" + yield from g2() + yield "g1 eggs" + finally: + trace.append("Finishing g1") + def g2(): + try: + trace.append("Starting g2") + yield "g2 spam" + yield "g2 more spam" + finally: + trace.append("Finishing g2") + g = g1() + for i in range(2): + x = next(g) + trace.append("Yielded %s" % (x,)) + g.close() + self.assertEqual(trace,[ + "Starting g1", + "Yielded g1 ham", + "Starting g2", + "Yielded g2 spam", + "Finishing g2", + "Finishing g1" + ]) + + def test_handing_exception_while_delegating_close(self): + """ + Test handling exception while delegating 'close' + """ + trace = [] + def g1(): + try: + trace.append("Starting g1") + yield "g1 ham" + yield from g2() + yield "g1 eggs" + finally: + trace.append("Finishing g1") + def g2(): + try: + trace.append("Starting g2") + yield "g2 spam" + yield "g2 more spam" + finally: + trace.append("Finishing g2") + raise ValueError("nybbles have exploded with delight") + try: + g = g1() + for i in range(2): + x = next(g) + trace.append("Yielded %s" % (x,)) + g.close() + except ValueError as e: + self.assertEqual(e.args[0], "nybbles have exploded with delight") + self.assertIsInstance(e.__context__, GeneratorExit) + else: + self.fail("subgenerator failed to raise ValueError") + self.assertEqual(trace,[ + "Starting g1", + "Yielded g1 ham", + "Starting g2", + "Yielded g2 spam", + "Finishing g2", + "Finishing g1", + ]) + + def test_delegating_throw(self): + """ + Test delegating 'throw' + """ + trace = [] + def g1(): + try: + trace.append("Starting g1") + yield "g1 ham" + yield from g2() + yield "g1 eggs" + finally: + trace.append("Finishing g1") + def g2(): + try: + trace.append("Starting g2") + yield "g2 spam" + yield "g2 more spam" + finally: + trace.append("Finishing g2") + try: + g = g1() + for i in range(2): + x = next(g) + trace.append("Yielded %s" % (x,)) + e = ValueError("tomato ejected") + g.throw(e) + except ValueError as e: + self.assertEqual(e.args[0], "tomato ejected") + else: + self.fail("subgenerator failed to raise ValueError") + self.assertEqual(trace,[ + "Starting g1", + "Yielded g1 ham", + "Starting g2", + "Yielded g2 spam", + "Finishing g2", + "Finishing g1", + ]) + + def test_value_attribute_of_StopIteration_exception(self): + """ + Test 'value' attribute of StopIteration exception + """ + trace = [] + def pex(e): + trace.append("%s: %s" % (e.__class__.__name__, e)) + trace.append("value = %s" % (e.value,)) + e = StopIteration() + pex(e) + e = StopIteration("spam") + pex(e) + e.value = "eggs" + pex(e) + self.assertEqual(trace,[ + "StopIteration: ", + "value = None", + "StopIteration: spam", + "value = spam", + "StopIteration: spam", + "value = eggs", + ]) + + + def test_exception_value_crash(self): + # There used to be a refcount error when the return value + # stored in the StopIteration has a refcount of 1. + def g1(): + yield from g2() + def g2(): + yield "g2" + return [42] + self.assertEqual(list(g1()), ["g2"]) + + + def test_generator_return_value(self): + """ + Test generator return value + """ + trace = [] + def g1(): + trace.append("Starting g1") + yield "g1 ham" + ret = yield from g2() + trace.append("g2 returned %r" % (ret,)) + for v in 1, (2,), StopIteration(3): + ret = yield from g2(v) + trace.append("g2 returned %r" % (ret,)) + yield "g1 eggs" + trace.append("Finishing g1") + def g2(v = None): + trace.append("Starting g2") + yield "g2 spam" + yield "g2 more spam" + trace.append("Finishing g2") + if v: + return v + for x in g1(): + trace.append("Yielded %s" % (x,)) + self.assertEqual(trace,[ + "Starting g1", + "Yielded g1 ham", + "Starting g2", + "Yielded g2 spam", + "Yielded g2 more spam", + "Finishing g2", + "g2 returned None", + "Starting g2", + "Yielded g2 spam", + "Yielded g2 more spam", + "Finishing g2", + "g2 returned 1", + "Starting g2", + "Yielded g2 spam", + "Yielded g2 more spam", + "Finishing g2", + "g2 returned (2,)", + "Starting g2", + "Yielded g2 spam", + "Yielded g2 more spam", + "Finishing g2", + "g2 returned StopIteration(3,)", + "Yielded g1 eggs", + "Finishing g1", + ]) + + def test_delegation_of_next_to_non_generator(self): + """ + Test delegation of next() to non-generator + """ + trace = [] + def g(): + yield from range(3) + for x in g(): + trace.append("Yielded %s" % (x,)) + self.assertEqual(trace,[ + "Yielded 0", + "Yielded 1", + "Yielded 2", + ]) + + + def test_conversion_of_sendNone_to_next(self): + """ + Test conversion of send(None) to next() + """ + trace = [] + def g(): + yield from range(3) + gi = g() + for x in range(3): + y = gi.send(None) + trace.append("Yielded: %s" % (y,)) + self.assertEqual(trace,[ + "Yielded: 0", + "Yielded: 1", + "Yielded: 2", + ]) + + def test_delegation_of_close_to_non_generator(self): + """ + Test delegation of close() to non-generator + """ + trace = [] + def g(): + try: + trace.append("starting g") + yield from range(3) + trace.append("g should not be here") + finally: + trace.append("finishing g") + gi = g() + next(gi) + with captured_stderr() as output: + gi.close() + self.assertEqual(output.getvalue(), '') + self.assertEqual(trace,[ + "starting g", + "finishing g", + ]) + + def test_delegating_throw_to_non_generator(self): + """ + Test delegating 'throw' to non-generator + """ + trace = [] + def g(): + try: + trace.append("Starting g") + yield from range(10) + finally: + trace.append("Finishing g") + try: + gi = g() + for i in range(5): + x = next(gi) + trace.append("Yielded %s" % (x,)) + e = ValueError("tomato ejected") + gi.throw(e) + except ValueError as e: + self.assertEqual(e.args[0],"tomato ejected") + else: + self.fail("subgenerator failed to raise ValueError") + self.assertEqual(trace,[ + "Starting g", + "Yielded 0", + "Yielded 1", + "Yielded 2", + "Yielded 3", + "Yielded 4", + "Finishing g", + ]) + + def test_attempting_to_send_to_non_generator(self): + """ + Test attempting to send to non-generator + """ + trace = [] + def g(): + try: + trace.append("starting g") + yield from range(3) + trace.append("g should not be here") + finally: + trace.append("finishing g") + try: + gi = g() + next(gi) + for x in range(3): + y = gi.send(42) + trace.append("Should not have yielded: %s" % (y,)) + except AttributeError as e: + self.assertIn("send", e.args[0]) + else: + self.fail("was able to send into non-generator") + self.assertEqual(trace,[ + "starting g", + "finishing g", + ]) + + def test_broken_getattr_handling(self): + """ + Test subiterator with a broken getattr implementation + """ + class Broken: + def __iter__(self): + return self + def __next__(self): + return 1 + def __getattr__(self, attr): + 1/0 + + def g(): + yield from Broken() + + with self.assertRaises(ZeroDivisionError): + gi = g() + self.assertEqual(next(gi), 1) + gi.send(1) + + with self.assertRaises(ZeroDivisionError): + gi = g() + self.assertEqual(next(gi), 1) + gi.throw(AttributeError) + + with captured_stderr() as output: + gi = g() + self.assertEqual(next(gi), 1) + gi.close() + self.assertIn('ZeroDivisionError', output.getvalue()) + + def test_exception_in_initial_next_call(self): + """ + Test exception in initial next() call + """ + trace = [] + def g1(): + trace.append("g1 about to yield from g2") + yield from g2() + trace.append("g1 should not be here") + def g2(): + yield 1/0 + def run(): + gi = g1() + next(gi) + self.assertRaises(ZeroDivisionError,run) + self.assertEqual(trace,[ + "g1 about to yield from g2" + ]) + + def test_attempted_yield_from_loop(self): + """ + Test attempted yield-from loop + """ + trace = [] + def g1(): + trace.append("g1: starting") + yield "y1" + trace.append("g1: about to yield from g2") + yield from g2() + trace.append("g1 should not be here") + + def g2(): + trace.append("g2: starting") + yield "y2" + trace.append("g2: about to yield from g1") + yield from gi + trace.append("g2 should not be here") + try: + gi = g1() + for y in gi: + trace.append("Yielded: %s" % (y,)) + except ValueError as e: + self.assertEqual(e.args[0],"generator already executing") + else: + self.fail("subgenerator didn't raise ValueError") + self.assertEqual(trace,[ + "g1: starting", + "Yielded: y1", + "g1: about to yield from g2", + "g2: starting", + "Yielded: y2", + "g2: about to yield from g1", + ]) + + def test_returning_value_from_delegated_throw(self): + """ + Test returning value from delegated 'throw' + """ + trace = [] + def g1(): + try: + trace.append("Starting g1") + yield "g1 ham" + yield from g2() + yield "g1 eggs" + finally: + trace.append("Finishing g1") + def g2(): + try: + trace.append("Starting g2") + yield "g2 spam" + yield "g2 more spam" + except LunchError: + trace.append("Caught LunchError in g2") + yield "g2 lunch saved" + yield "g2 yet more spam" + class LunchError(Exception): + pass + g = g1() + for i in range(2): + x = next(g) + trace.append("Yielded %s" % (x,)) + e = LunchError("tomato ejected") + g.throw(e) + for x in g: + trace.append("Yielded %s" % (x,)) + self.assertEqual(trace,[ + "Starting g1", + "Yielded g1 ham", + "Starting g2", + "Yielded g2 spam", + "Caught LunchError in g2", + "Yielded g2 yet more spam", + "Yielded g1 eggs", + "Finishing g1", + ]) + + def test_next_and_return_with_value(self): + """ + Test next and return with value + """ + trace = [] + def f(r): + gi = g(r) + next(gi) + try: + trace.append("f resuming g") + next(gi) + trace.append("f SHOULD NOT BE HERE") + except StopIteration as e: + trace.append("f caught %r" % (e,)) + def g(r): + trace.append("g starting") + yield + trace.append("g returning %r" % (r,)) + return r + f(None) + f(1) + f((2,)) + f(StopIteration(3)) + self.assertEqual(trace,[ + "g starting", + "f resuming g", + "g returning None", + "f caught StopIteration()", + "g starting", + "f resuming g", + "g returning 1", + "f caught StopIteration(1,)", + "g starting", + "f resuming g", + "g returning (2,)", + "f caught StopIteration((2,),)", + "g starting", + "f resuming g", + "g returning StopIteration(3,)", + "f caught StopIteration(StopIteration(3,),)", + ]) + + def test_send_and_return_with_value(self): + """ + Test send and return with value + """ + trace = [] + def f(r): + gi = g(r) + next(gi) + try: + trace.append("f sending spam to g") + gi.send("spam") + trace.append("f SHOULD NOT BE HERE") + except StopIteration as e: + trace.append("f caught %r" % (e,)) + def g(r): + trace.append("g starting") + x = yield + trace.append("g received %r" % (x,)) + trace.append("g returning %r" % (r,)) + return r + f(None) + f(1) + f((2,)) + f(StopIteration(3)) + self.assertEqual(trace, [ + "g starting", + "f sending spam to g", + "g received 'spam'", + "g returning None", + "f caught StopIteration()", + "g starting", + "f sending spam to g", + "g received 'spam'", + "g returning 1", + 'f caught StopIteration(1,)', + 'g starting', + 'f sending spam to g', + "g received 'spam'", + 'g returning (2,)', + 'f caught StopIteration((2,),)', + 'g starting', + 'f sending spam to g', + "g received 'spam'", + 'g returning StopIteration(3,)', + 'f caught StopIteration(StopIteration(3,),)' + ]) + + def test_catching_exception_from_subgen_and_returning(self): + """ + Test catching an exception thrown into a + subgenerator and returning a value + """ + def inner(): + try: + yield 1 + except ValueError: + trace.append("inner caught ValueError") + return value + + def outer(): + v = yield from inner() + trace.append("inner returned %r to outer" % (v,)) + yield v + + for value in 2, (2,), StopIteration(2): + trace = [] + g = outer() + trace.append(next(g)) + trace.append(repr(g.throw(ValueError))) + self.assertEqual(trace, [ + 1, + "inner caught ValueError", + "inner returned %r to outer" % (value,), + repr(value), + ]) + + def test_throwing_GeneratorExit_into_subgen_that_returns(self): + """ + Test throwing GeneratorExit into a subgenerator that + catches it and returns normally. + """ + trace = [] + def f(): + try: + trace.append("Enter f") + yield + trace.append("Exit f") + except GeneratorExit: + return + def g(): + trace.append("Enter g") + yield from f() + trace.append("Exit g") + try: + gi = g() + next(gi) + gi.throw(GeneratorExit) + except GeneratorExit: + pass + else: + self.fail("subgenerator failed to raise GeneratorExit") + self.assertEqual(trace,[ + "Enter g", + "Enter f", + ]) + + def test_throwing_GeneratorExit_into_subgenerator_that_yields(self): + """ + Test throwing GeneratorExit into a subgenerator that + catches it and yields. + """ + trace = [] + def f(): + try: + trace.append("Enter f") + yield + trace.append("Exit f") + except GeneratorExit: + yield + def g(): + trace.append("Enter g") + yield from f() + trace.append("Exit g") + try: + gi = g() + next(gi) + gi.throw(GeneratorExit) + except RuntimeError as e: + self.assertEqual(e.args[0], "generator ignored GeneratorExit") + else: + self.fail("subgenerator failed to raise GeneratorExit") + self.assertEqual(trace,[ + "Enter g", + "Enter f", + ]) + + def test_throwing_GeneratorExit_into_subgen_that_raises(self): + """ + Test throwing GeneratorExit into a subgenerator that + catches it and raises a different exception. + """ + trace = [] + def f(): + try: + trace.append("Enter f") + yield + trace.append("Exit f") + except GeneratorExit: + raise ValueError("Vorpal bunny encountered") + def g(): + trace.append("Enter g") + yield from f() + trace.append("Exit g") + try: + gi = g() + next(gi) + gi.throw(GeneratorExit) + except ValueError as e: + self.assertEqual(e.args[0], "Vorpal bunny encountered") + self.assertIsInstance(e.__context__, GeneratorExit) + else: + self.fail("subgenerator failed to raise ValueError") + self.assertEqual(trace,[ + "Enter g", + "Enter f", + ]) + + def test_yield_from_empty(self): + def g(): + yield from () + self.assertRaises(StopIteration, next, g()) + + def test_delegating_generators_claim_to_be_running(self): + # Check with basic iteration + def one(): + yield 0 + yield from two() + yield 3 + def two(): + yield 1 + try: + yield from g1 + except ValueError: + pass + yield 2 + g1 = one() + self.assertEqual(list(g1), [0, 1, 2, 3]) + # Check with send + g1 = one() + res = [next(g1)] + try: + while True: + res.append(g1.send(42)) + except StopIteration: + pass + self.assertEqual(res, [0, 1, 2, 3]) + # Check with throw + class MyErr(Exception): + pass + def one(): + try: + yield 0 + except MyErr: + pass + yield from two() + try: + yield 3 + except MyErr: + pass + def two(): + try: + yield 1 + except MyErr: + pass + try: + yield from g1 + except ValueError: + pass + try: + yield 2 + except MyErr: + pass + g1 = one() + res = [next(g1)] + try: + while True: + res.append(g1.throw(MyErr)) + except StopIteration: + pass + # Check with close + class MyIt(object): + def __iter__(self): + return self + def __next__(self): + return 42 + def close(self_): + self.assertTrue(g1.gi_running) + self.assertRaises(ValueError, next, g1) + def one(): + yield from MyIt() + g1 = one() + next(g1) + g1.close() + + def test_delegator_is_visible_to_debugger(self): + def call_stack(): + return [f[3] for f in inspect.stack()] + + def gen(): + yield call_stack() + yield call_stack() + yield call_stack() + + def spam(g): + yield from g + + def eggs(g): + yield from g + + for stack in spam(gen()): + self.assertTrue('spam' in stack) + + for stack in spam(eggs(gen())): + self.assertTrue('spam' in stack and 'eggs' in stack) + + def test_custom_iterator_return(self): + # See issue #15568 + class MyIter: + def __iter__(self): + return self + def __next__(self): + raise StopIteration(42) + def gen(): + nonlocal ret + ret = yield from MyIter() + ret = None + list(gen()) + self.assertEqual(ret, 42) + + def test_close_with_cleared_frame(self): + # See issue #17669. + # + # Create a stack of generators: outer() delegating to inner() + # delegating to innermost(). The key point is that the instance of + # inner is created first: this ensures that its frame appears before + # the instance of outer in the GC linked list. + # + # At the gc.collect call: + # - frame_clear is called on the inner_gen frame. + # - gen_dealloc is called on the outer_gen generator (the only + # reference is in the frame's locals). + # - gen_close is called on the outer_gen generator. + # - gen_close_iter is called to close the inner_gen generator, which + # in turn calls gen_close, and gen_yf. + # + # Previously, gen_yf would crash since inner_gen's frame had been + # cleared (and in particular f_stacktop was NULL). + + def innermost(): + yield + def inner(): + outer_gen = yield + yield from innermost() + def outer(): + inner_gen = yield + yield from inner_gen + + with disable_gc(): + inner_gen = inner() + outer_gen = outer() + outer_gen.send(None) + outer_gen.send(inner_gen) + outer_gen.send(outer_gen) + + del outer_gen + del inner_gen + gc_collect() + + def test_send_tuple_with_custom_generator(self): + # See issue #21209. + class MyGen: + def __iter__(self): + return self + def __next__(self): + return 42 + def send(self, what): + nonlocal v + v = what + return None + def outer(): + v = yield from MyGen() + g = outer() + next(g) + v = None + g.send((1, 2, 3, 4)) + self.assertEqual(v, (1, 2, 3, 4)) + + +if __name__ == '__main__': + unittest.main() |