summaryrefslogtreecommitdiff
path: root/Lib/test/test_asyncio/test_locks.py
diff options
context:
space:
mode:
authorGuido van Rossum <guido@dropbox.com>2013-11-04 13:18:19 -0800
committerGuido van Rossum <guido@dropbox.com>2013-11-04 13:18:19 -0800
commitfc27901cc532343336daf39e8c64fef065c696f0 (patch)
tree40883d24a01cf01f044422b945e78116be42e660 /Lib/test/test_asyncio/test_locks.py
parent54bd2bb2eb9bc520bd5994a7f327cddc7d5793d4 (diff)
downloadcpython-fc27901cc532343336daf39e8c64fef065c696f0.tar.gz
asyncio: Locks improvements by Arnaud Faure: better repr(), change Conditio\
n structure.
Diffstat (limited to 'Lib/test/test_asyncio/test_locks.py')
-rw-r--r--Lib/test/test_asyncio/test_locks.py71
1 files changed, 70 insertions, 1 deletions
diff --git a/Lib/test/test_asyncio/test_locks.py b/Lib/test/test_asyncio/test_locks.py
index 31b4d64b7a..19ef877af0 100644
--- a/Lib/test/test_asyncio/test_locks.py
+++ b/Lib/test/test_asyncio/test_locks.py
@@ -2,6 +2,7 @@
import unittest
import unittest.mock
+import re
from asyncio import events
from asyncio import futures
@@ -10,6 +11,15 @@ from asyncio import tasks
from asyncio import test_utils
+STR_RGX_REPR = (
+ r'^<(?P<class>.*?) object at (?P<address>.*?)'
+ r'\[(?P<extras>'
+ r'(set|unset|locked|unlocked)(,value:\d)?(,waiters:\d+)?'
+ r')\]>\Z'
+)
+RGX_REPR = re.compile(STR_RGX_REPR)
+
+
class LockTests(unittest.TestCase):
def setUp(self):
@@ -38,6 +48,7 @@ class LockTests(unittest.TestCase):
def test_repr(self):
lock = locks.Lock(loop=self.loop)
self.assertTrue(repr(lock).endswith('[unlocked]>'))
+ self.assertTrue(RGX_REPR.match(repr(lock)))
@tasks.coroutine
def acquire_lock():
@@ -45,6 +56,7 @@ class LockTests(unittest.TestCase):
self.loop.run_until_complete(acquire_lock())
self.assertTrue(repr(lock).endswith('[locked]>'))
+ self.assertTrue(RGX_REPR.match(repr(lock)))
def test_lock(self):
lock = locks.Lock(loop=self.loop)
@@ -239,9 +251,16 @@ class EventTests(unittest.TestCase):
def test_repr(self):
ev = locks.Event(loop=self.loop)
self.assertTrue(repr(ev).endswith('[unset]>'))
+ match = RGX_REPR.match(repr(ev))
+ self.assertEqual(match.group('extras'), 'unset')
ev.set()
self.assertTrue(repr(ev).endswith('[set]>'))
+ self.assertTrue(RGX_REPR.match(repr(ev)))
+
+ ev._waiters.append(unittest.mock.Mock())
+ self.assertTrue('waiters:1' in repr(ev))
+ self.assertTrue(RGX_REPR.match(repr(ev)))
def test_wait(self):
ev = locks.Event(loop=self.loop)
@@ -440,7 +459,7 @@ class ConditionTests(unittest.TestCase):
self.assertRaises(
futures.CancelledError,
self.loop.run_until_complete, wait)
- self.assertFalse(cond._condition_waiters)
+ self.assertFalse(cond._waiters)
self.assertTrue(cond.locked())
def test_wait_unacquired(self):
@@ -600,6 +619,45 @@ class ConditionTests(unittest.TestCase):
cond = locks.Condition(loop=self.loop)
self.assertRaises(RuntimeError, cond.notify_all)
+ def test_repr(self):
+ cond = locks.Condition(loop=self.loop)
+ self.assertTrue('unlocked' in repr(cond))
+ self.assertTrue(RGX_REPR.match(repr(cond)))
+
+ self.loop.run_until_complete(cond.acquire())
+ self.assertTrue('locked' in repr(cond))
+
+ cond._waiters.append(unittest.mock.Mock())
+ self.assertTrue('waiters:1' in repr(cond))
+ self.assertTrue(RGX_REPR.match(repr(cond)))
+
+ cond._waiters.append(unittest.mock.Mock())
+ self.assertTrue('waiters:2' in repr(cond))
+ self.assertTrue(RGX_REPR.match(repr(cond)))
+
+ def test_context_manager(self):
+ cond = locks.Condition(loop=self.loop)
+
+ @tasks.coroutine
+ def acquire_cond():
+ return (yield from cond)
+
+ with self.loop.run_until_complete(acquire_cond()):
+ self.assertTrue(cond.locked())
+
+ self.assertFalse(cond.locked())
+
+ def test_context_manager_no_yield(self):
+ cond = locks.Condition(loop=self.loop)
+
+ try:
+ with cond:
+ self.fail('RuntimeError is not raised in with expression')
+ except RuntimeError as err:
+ self.assertEqual(
+ str(err),
+ '"yield from" should be used as context manager expression')
+
class SemaphoreTests(unittest.TestCase):
@@ -629,9 +687,20 @@ class SemaphoreTests(unittest.TestCase):
def test_repr(self):
sem = locks.Semaphore(loop=self.loop)
self.assertTrue(repr(sem).endswith('[unlocked,value:1]>'))
+ self.assertTrue(RGX_REPR.match(repr(sem)))
self.loop.run_until_complete(sem.acquire())
self.assertTrue(repr(sem).endswith('[locked]>'))
+ self.assertTrue('waiters' not in repr(sem))
+ self.assertTrue(RGX_REPR.match(repr(sem)))
+
+ sem._waiters.append(unittest.mock.Mock())
+ self.assertTrue('waiters:1' in repr(sem))
+ self.assertTrue(RGX_REPR.match(repr(sem)))
+
+ sem._waiters.append(unittest.mock.Mock())
+ self.assertTrue('waiters:2' in repr(sem))
+ self.assertTrue(RGX_REPR.match(repr(sem)))
def test_semaphore(self):
sem = locks.Semaphore(loop=self.loop)