|
|
|
""" |
|
Testing scenarios that may have leaked. |
|
""" |
|
from __future__ import print_function, absolute_import, division |
|
|
|
import sys |
|
import gc |
|
|
|
import time |
|
import weakref |
|
import threading |
|
|
|
|
|
import greenlet |
|
from . import TestCase |
|
from .leakcheck import fails_leakcheck |
|
from .leakcheck import ignores_leakcheck |
|
from .leakcheck import RUNNING_ON_MANYLINUX |
|
|
|
|
|
|
|
assert greenlet.GREENLET_USE_GC |
|
|
|
class HasFinalizerTracksInstances(object): |
|
EXTANT_INSTANCES = set() |
|
def __init__(self, msg): |
|
self.msg = sys.intern(msg) |
|
self.EXTANT_INSTANCES.add(id(self)) |
|
def __del__(self): |
|
self.EXTANT_INSTANCES.remove(id(self)) |
|
def __repr__(self): |
|
return "<HasFinalizerTracksInstances at 0x%x %r>" % ( |
|
id(self), self.msg |
|
) |
|
@classmethod |
|
def reset(cls): |
|
cls.EXTANT_INSTANCES.clear() |
|
|
|
|
|
class TestLeaks(TestCase): |
|
|
|
def test_arg_refs(self): |
|
args = ('a', 'b', 'c') |
|
refcount_before = sys.getrefcount(args) |
|
|
|
g = greenlet.greenlet( |
|
lambda *args: greenlet.getcurrent().parent.switch(*args)) |
|
for _ in range(100): |
|
g.switch(*args) |
|
self.assertEqual(sys.getrefcount(args), refcount_before) |
|
|
|
def test_kwarg_refs(self): |
|
kwargs = {} |
|
|
|
g = greenlet.greenlet( |
|
lambda **kwargs: greenlet.getcurrent().parent.switch(**kwargs)) |
|
for _ in range(100): |
|
g.switch(**kwargs) |
|
self.assertEqual(sys.getrefcount(kwargs), 2) |
|
|
|
|
|
@staticmethod |
|
def __recycle_threads(): |
|
|
|
|
|
|
|
|
|
|
|
|
|
def worker(): |
|
time.sleep(0.001) |
|
t = threading.Thread(target=worker) |
|
t.start() |
|
time.sleep(0.001) |
|
t.join(10) |
|
|
|
def test_threaded_leak(self): |
|
gg = [] |
|
def worker(): |
|
|
|
gg.append(weakref.ref(greenlet.getcurrent())) |
|
for _ in range(2): |
|
t = threading.Thread(target=worker) |
|
t.start() |
|
t.join(10) |
|
del t |
|
greenlet.getcurrent() |
|
self.__recycle_threads() |
|
greenlet.getcurrent() |
|
gc.collect() |
|
greenlet.getcurrent() |
|
for g in gg: |
|
self.assertIsNone(g()) |
|
|
|
def test_threaded_adv_leak(self): |
|
gg = [] |
|
def worker(): |
|
|
|
ll = greenlet.getcurrent().ll = [] |
|
def additional(): |
|
ll.append(greenlet.getcurrent()) |
|
for _ in range(2): |
|
greenlet.greenlet(additional).switch() |
|
gg.append(weakref.ref(greenlet.getcurrent())) |
|
for _ in range(2): |
|
t = threading.Thread(target=worker) |
|
t.start() |
|
t.join(10) |
|
del t |
|
greenlet.getcurrent() |
|
self.__recycle_threads() |
|
greenlet.getcurrent() |
|
gc.collect() |
|
greenlet.getcurrent() |
|
for g in gg: |
|
self.assertIsNone(g()) |
|
|
|
def assertClocksUsed(self): |
|
used = greenlet._greenlet.get_clocks_used_doing_optional_cleanup() |
|
self.assertGreaterEqual(used, 0) |
|
|
|
greenlet._greenlet.enable_optional_cleanup(True) |
|
used2 = greenlet._greenlet.get_clocks_used_doing_optional_cleanup() |
|
self.assertEqual(used, used2) |
|
self.assertGreater(greenlet._greenlet.CLOCKS_PER_SEC, 1) |
|
|
|
def _check_issue251(self, |
|
manually_collect_background=True, |
|
explicit_reference_to_switch=False): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
assert gc.is_tracked([]) |
|
HasFinalizerTracksInstances.reset() |
|
greenlet.getcurrent() |
|
greenlets_before = self.count_objects(greenlet.greenlet, exact_kind=False) |
|
|
|
background_glet_running = threading.Event() |
|
background_glet_killed = threading.Event() |
|
background_greenlets = [] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def background_greenlet(): |
|
|
|
jd = HasFinalizerTracksInstances("DELETING STACK OBJECT") |
|
greenlet._greenlet.set_thread_local( |
|
'test_leaks_key', |
|
HasFinalizerTracksInstances("DELETING THREAD STATE")) |
|
|
|
|
|
if explicit_reference_to_switch: |
|
s = greenlet.getcurrent().parent.switch |
|
s([jd]) |
|
else: |
|
greenlet.getcurrent().parent.switch([jd]) |
|
|
|
bg_main_wrefs = [] |
|
|
|
def background_thread(): |
|
glet = greenlet.greenlet(background_greenlet) |
|
bg_main_wrefs.append(weakref.ref(glet.parent)) |
|
|
|
background_greenlets.append(glet) |
|
glet.switch() |
|
|
|
del glet |
|
background_glet_running.set() |
|
background_glet_killed.wait(10) |
|
|
|
|
|
|
|
|
|
if manually_collect_background: |
|
greenlet.getcurrent() |
|
|
|
|
|
t = threading.Thread(target=background_thread) |
|
t.start() |
|
background_glet_running.wait(10) |
|
greenlet.getcurrent() |
|
lists_before = self.count_objects(list, exact_kind=True) |
|
|
|
assert len(background_greenlets) == 1 |
|
self.assertFalse(background_greenlets[0].dead) |
|
|
|
|
|
|
|
del background_greenlets[:] |
|
background_glet_killed.set() |
|
|
|
|
|
t.join(10) |
|
del t |
|
|
|
|
|
|
|
self.wait_for_pending_cleanups() |
|
|
|
lists_after = self.count_objects(list, exact_kind=True) |
|
greenlets_after = self.count_objects(greenlet.greenlet, exact_kind=False) |
|
|
|
|
|
|
|
|
|
self.assertLessEqual(lists_after, lists_before) |
|
|
|
|
|
|
|
|
|
|
|
|
|
if not explicit_reference_to_switch \ |
|
and greenlet._greenlet.get_clocks_used_doing_optional_cleanup() is not None: |
|
|
|
self.assertEqual(greenlets_after, greenlets_before) |
|
if manually_collect_background: |
|
|
|
|
|
|
|
self.assertEqual(HasFinalizerTracksInstances.EXTANT_INSTANCES, set()) |
|
else: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pass |
|
|
|
if greenlet._greenlet.get_clocks_used_doing_optional_cleanup() is not None: |
|
self.assertClocksUsed() |
|
|
|
def test_issue251_killing_cross_thread_leaks_list(self): |
|
self._check_issue251() |
|
|
|
def test_issue251_with_cleanup_disabled(self): |
|
greenlet._greenlet.enable_optional_cleanup(False) |
|
try: |
|
self._check_issue251() |
|
finally: |
|
greenlet._greenlet.enable_optional_cleanup(True) |
|
|
|
@fails_leakcheck |
|
def test_issue251_issue252_need_to_collect_in_background(self): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self._check_issue251(manually_collect_background=False) |
|
|
|
@fails_leakcheck |
|
def test_issue251_issue252_need_to_collect_in_background_cleanup_disabled(self): |
|
self.expect_greenlet_leak = True |
|
greenlet._greenlet.enable_optional_cleanup(False) |
|
try: |
|
self._check_issue251(manually_collect_background=False) |
|
finally: |
|
greenlet._greenlet.enable_optional_cleanup(True) |
|
|
|
@fails_leakcheck |
|
def test_issue251_issue252_explicit_reference_not_collectable(self): |
|
self._check_issue251( |
|
manually_collect_background=False, |
|
explicit_reference_to_switch=True) |
|
|
|
UNTRACK_ATTEMPTS = 100 |
|
|
|
def _only_test_some_versions(self): |
|
|
|
|
|
|
|
|
|
|
|
|
|
assert sys.version_info[0] >= 3 |
|
if sys.version_info[:2] < (3, 8): |
|
self.skipTest('Only observed on 3.11') |
|
if RUNNING_ON_MANYLINUX: |
|
self.skipTest("Slow and not worth repeating here") |
|
|
|
@ignores_leakcheck |
|
|
|
|
|
def test_untracked_memory_doesnt_increase(self): |
|
|
|
|
|
self._only_test_some_versions() |
|
def f(): |
|
return 1 |
|
|
|
ITER = 10000 |
|
def run_it(): |
|
for _ in range(ITER): |
|
greenlet.greenlet(f).switch() |
|
|
|
|
|
for _ in range(3): |
|
run_it() |
|
|
|
|
|
|
|
|
|
uss_before = self.get_process_uss() |
|
|
|
for count in range(self.UNTRACK_ATTEMPTS): |
|
uss_before = max(uss_before, self.get_process_uss()) |
|
run_it() |
|
|
|
uss_after = self.get_process_uss() |
|
if uss_after <= uss_before and count > 1: |
|
break |
|
|
|
self.assertLessEqual(uss_after, uss_before) |
|
|
|
def _check_untracked_memory_thread(self, deallocate_in_thread=True): |
|
self._only_test_some_versions() |
|
|
|
|
|
|
|
EXIT_COUNT = [0] |
|
|
|
def f(): |
|
try: |
|
greenlet.getcurrent().parent.switch() |
|
except greenlet.GreenletExit: |
|
EXIT_COUNT[0] += 1 |
|
raise |
|
return 1 |
|
|
|
ITER = 10000 |
|
def run_it(): |
|
glets = [] |
|
for _ in range(ITER): |
|
|
|
|
|
|
|
g = greenlet.greenlet(f) |
|
glets.append(g) |
|
g.switch() |
|
|
|
return glets |
|
|
|
test = self |
|
|
|
class ThreadFunc: |
|
uss_before = uss_after = 0 |
|
glets = () |
|
ITER = 2 |
|
def __call__(self): |
|
self.uss_before = test.get_process_uss() |
|
|
|
for _ in range(self.ITER): |
|
self.glets += tuple(run_it()) |
|
|
|
for g in self.glets: |
|
test.assertIn('suspended active', str(g)) |
|
|
|
if deallocate_in_thread: |
|
self.glets = () |
|
self.uss_after = test.get_process_uss() |
|
|
|
|
|
uss_before = uss_after = None |
|
for count in range(self.UNTRACK_ATTEMPTS): |
|
EXIT_COUNT[0] = 0 |
|
thread_func = ThreadFunc() |
|
t = threading.Thread(target=thread_func) |
|
t.start() |
|
t.join(30) |
|
self.assertFalse(t.is_alive()) |
|
|
|
if uss_before is None: |
|
uss_before = thread_func.uss_before |
|
|
|
uss_before = max(uss_before, thread_func.uss_before) |
|
if deallocate_in_thread: |
|
self.assertEqual(thread_func.glets, ()) |
|
self.assertEqual(EXIT_COUNT[0], ITER * thread_func.ITER) |
|
|
|
del thread_func |
|
del t |
|
if not deallocate_in_thread: |
|
self.assertEqual(EXIT_COUNT[0], 0) |
|
if deallocate_in_thread: |
|
self.wait_for_pending_cleanups() |
|
|
|
uss_after = self.get_process_uss() |
|
|
|
if uss_after <= uss_before and count > 1: |
|
break |
|
|
|
self.wait_for_pending_cleanups() |
|
uss_after = self.get_process_uss() |
|
self.assertLessEqual(uss_after, uss_before, "after attempts %d" % (count,)) |
|
|
|
@ignores_leakcheck |
|
|
|
|
|
def test_untracked_memory_doesnt_increase_unfinished_thread_dealloc_in_thread(self): |
|
self._check_untracked_memory_thread(deallocate_in_thread=True) |
|
|
|
@ignores_leakcheck |
|
|
|
|
|
def test_untracked_memory_doesnt_increase_unfinished_thread_dealloc_in_main(self): |
|
self._check_untracked_memory_thread(deallocate_in_thread=False) |
|
|
|
if __name__ == '__main__': |
|
__import__('unittest').main() |
|
|