1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
|
import contextlib
from datetime import timedelta
from functools import partial
from tornado.concurrent import Future
from tornado.ioloop import IOLoop
from tornado.testing import gen_test, gen
import toro
@contextlib.contextmanager
def assert_raises(exc_class):
"""Roughly a backport of Python 2.7's TestCase.assertRaises"""
try:
yield
except exc_class:
pass
else:
assert False, "%s not raised" % exc_class
def make_callback(key, history):
def callback(future):
exc = future.exception()
if exc:
history.append(exc.__class__.__name__)
else:
history.append(key)
return callback
def pause(deadline):
future = Future()
IOLoop.current().add_timeout(deadline, partial(future.set_result, None))
return future
class ContextManagerTestsMixin(object):
"""Test a Toro object's behavior with the "with" statement
Combine this mixin with an AsyncTestCase that has a field 'toro_class'
"""
@gen_test
def test_context_manager(self):
toro_obj = self.toro_class()
with (yield toro_obj.acquire()) as yielded:
self.assertTrue(toro_obj.locked())
self.assertTrue(yielded is None)
self.assertFalse(toro_obj.locked())
@gen_test
def test_context_manager_exception(self):
toro_obj = self.toro_class()
with assert_raises(ZeroDivisionError):
with (yield toro_obj.acquire()):
1 / 0
# Context manager released toro_obj
self.assertFalse(toro_obj.locked())
@gen_test
def test_context_manager_contended(self):
toro_obj = toro.Semaphore()
history = []
n_coroutines = 10
@gen.coroutine
def f(i):
with (yield toro_obj.acquire()):
history.append('acquired %d' % i)
yield pause(timedelta(seconds=0.01))
history.append('releasing %d' % i)
yield [f(i) for i in range(n_coroutines)]
expected_history = []
for i in range(n_coroutines):
expected_history.extend(['acquired %d' % i, 'releasing %d' % i])
self.assertEqual(expected_history, history)
def test_context_manager_misuse(self):
toro_obj = self.toro_class()
# Ensure we catch a "with toro_obj", which should be
# "with (yield toro_obj)"
with assert_raises(RuntimeError):
with toro_obj:
pass
|