# -*- coding: utf-8 -*- # Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import string from taskflow import test from taskflow.utils import iter_utils def forever_it(): i = 0 while True: yield i i += 1 class IterUtilsTest(test.TestCase): def test_fill_empty(self): self.assertEqual([], list(iter_utils.fill([1, 2, 3], 0))) def test_bad_unique_seen(self): iters = [ ['a', 'b'], 2, None, object(), ] self.assertRaises(ValueError, iter_utils.unique_seen, iters) def test_generate_delays(self): it = iter_utils.generate_delays(1, 60) self.assertEqual(1, next(it)) self.assertEqual(2, next(it)) self.assertEqual(4, next(it)) self.assertEqual(8, next(it)) self.assertEqual(16, next(it)) self.assertEqual(32, next(it)) self.assertEqual(60, next(it)) self.assertEqual(60, next(it)) def test_generate_delays_custom_multiplier(self): it = iter_utils.generate_delays(1, 60, multiplier=4) self.assertEqual(1, next(it)) self.assertEqual(4, next(it)) self.assertEqual(16, next(it)) self.assertEqual(60, next(it)) self.assertEqual(60, next(it)) def test_generate_delays_bad(self): self.assertRaises(ValueError, iter_utils.generate_delays, -1, -1) self.assertRaises(ValueError, iter_utils.generate_delays, -1, 2) self.assertRaises(ValueError, iter_utils.generate_delays, 2, -1) self.assertRaises(ValueError, iter_utils.generate_delays, 1, 1, multiplier=0.5) def test_unique_seen(self): iters = [ ['a', 'b'], ['a', 'c', 'd'], ['a', 'e', 'f'], ['f', 'm', 'n'], ] self.assertEqual(['a', 'b', 'c', 'd', 'e', 'f', 'm', 'n'], list(iter_utils.unique_seen(iters))) def test_unique_seen_empty(self): iters = [] self.assertEqual([], list(iter_utils.unique_seen(iters))) def test_unique_seen_selector(self): iters = [ [(1, 'a'), (1, 'a')], [(2, 'b')], [(3, 'c')], [(1, 'a'), (3, 'c')], ] it = iter_utils.unique_seen(iters, seen_selector=lambda value: value[0]) self.assertEqual([(1, 'a'), (2, 'b'), (3, 'c')], list(it)) def test_bad_fill(self): self.assertRaises(ValueError, iter_utils.fill, 2, 2) def test_fill_many_empty(self): result = list(iter_utils.fill(range(0, 50), 500)) self.assertEqual(450, sum(1 for x in result if x is None)) self.assertEqual(50, sum(1 for x in result if x is not None)) def test_fill_custom_filler(self): self.assertEqual("abcd", "".join(iter_utils.fill("abc", 4, filler='d'))) def test_fill_less_needed(self): self.assertEqual("ab", "".join(iter_utils.fill("abc", 2))) def test_fill(self): self.assertEqual([None, None], list(iter_utils.fill([], 2))) self.assertEqual((None, None), tuple(iter_utils.fill([], 2))) def test_bad_find_first_match(self): self.assertRaises(ValueError, iter_utils.find_first_match, 2, lambda v: False) def test_find_first_match(self): it = forever_it() self.assertEqual(100, iter_utils.find_first_match(it, lambda v: v == 100)) def test_find_first_match_not_found(self): it = iter(string.ascii_lowercase) self.assertIsNone(iter_utils.find_first_match(it, lambda v: v == '')) def test_bad_count(self): self.assertRaises(ValueError, iter_utils.count, 2) def test_count(self): self.assertEqual(0, iter_utils.count([])) self.assertEqual(1, iter_utils.count(['a'])) self.assertEqual(10, iter_utils.count(range(0, 10))) self.assertEqual(1000, iter_utils.count(range(0, 1000))) self.assertEqual(0, iter_utils.count(range(0))) self.assertEqual(0, iter_utils.count(range(-1))) def test_bad_while_is_not(self): self.assertRaises(ValueError, iter_utils.while_is_not, 2, 'a') def test_while_is_not(self): it = iter(string.ascii_lowercase) self.assertEqual(['a'], list(iter_utils.while_is_not(it, 'a'))) it = iter(string.ascii_lowercase) self.assertEqual(['a', 'b'], list(iter_utils.while_is_not(it, 'b'))) self.assertEqual(list(string.ascii_lowercase[2:]), list(iter_utils.while_is_not(it, 'zzz'))) it = iter(string.ascii_lowercase) self.assertEqual(list(string.ascii_lowercase), list(iter_utils.while_is_not(it, '')))