# -*- coding: utf-8 -*-
"""
Tests for scenarios that have leaked, or might leak, objects or memory.
"""
from __future__ import print_function, absolute_import, division
import sys
import gc
import time
import weakref
import threading
import greenlet
from . import TestCase
from .leakcheck import fails_leakcheck
from .leakcheck import ignores_leakcheck
from .leakcheck import RUNNING_ON_MANYLINUX
# pylint:disable=protected-access
assert greenlet.GREENLET_USE_GC # Option to disable this was removed in 1.0
class HasFinalizerTracksInstances(object):
EXTANT_INSTANCES = set()
def __init__(self, msg):
self.msg = sys.intern(msg)
self.EXTANT_INSTANCES.add(id(self))
def __del__(self):
self.EXTANT_INSTANCES.remove(id(self))
def __repr__(self):
return "<HasFinalizerTracksInstances at 0x%x %r>" % (
id(self), self.msg
)
@classmethod
def reset(cls):
cls.EXTANT_INSTANCES.clear()
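# HasFinalizerTracksInstances is used by _check_issue251 below to verify that
# objects with a __del__ method, created on a greenlet's stack or stored in
# greenlet thread-local state, really are finalized: after cleanup,
# EXTANT_INSTANCES should be empty again.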
class TestLeaks(TestCase):
def test_arg_refs(self):
args = ('a', 'b', 'c')
refcount_before = sys.getrefcount(args)
# pylint:disable=unnecessary-lambda
g = greenlet.greenlet(
lambda *args: greenlet.getcurrent().parent.switch(*args))
for _ in range(100):
g.switch(*args)
self.assertEqual(sys.getrefcount(args), refcount_before)
def test_kwarg_refs(self):
kwargs = {}
# pylint:disable=unnecessary-lambda
g = greenlet.greenlet(
lambda **kwargs: greenlet.getcurrent().parent.switch(**kwargs))
for _ in range(100):
g.switch(**kwargs)
self.assertEqual(sys.getrefcount(kwargs), 2)
@staticmethod
def __recycle_threads():
        # By introducing a thread that sleeps, we allow other threads that
        # have triggered their __block condition, but have not yet had a
        # chance to deallocate their thread state, to finally do so. This
        # works by requiring a GIL switch (to a different thread), which
        # does a GIL release (the sleep), which might switch to the
        # finished threads and allow them to clean up.
def worker():
time.sleep(0.001)
t = threading.Thread(target=worker)
t.start()
time.sleep(0.001)
t.join(10)
def test_threaded_leak(self):
gg = []
def worker():
# only main greenlet present
gg.append(weakref.ref(greenlet.getcurrent()))
for _ in range(2):
t = threading.Thread(target=worker)
t.start()
t.join(10)
del t
greenlet.getcurrent() # update ts_current
self.__recycle_threads()
greenlet.getcurrent() # update ts_current
gc.collect()
greenlet.getcurrent() # update ts_current
for g in gg:
self.assertIsNone(g())
def test_threaded_adv_leak(self):
gg = []
def worker():
# main and additional *finished* greenlets
ll = greenlet.getcurrent().ll = []
def additional():
ll.append(greenlet.getcurrent())
for _ in range(2):
greenlet.greenlet(additional).switch()
gg.append(weakref.ref(greenlet.getcurrent()))
for _ in range(2):
t = threading.Thread(target=worker)
t.start()
t.join(10)
del t
greenlet.getcurrent() # update ts_current
self.__recycle_threads()
greenlet.getcurrent() # update ts_current
gc.collect()
greenlet.getcurrent() # update ts_current
for g in gg:
self.assertIsNone(g())
def assertClocksUsed(self):
used = greenlet._greenlet.get_clocks_used_doing_optional_cleanup()
self.assertGreaterEqual(used, 0)
# we don't lose the value
greenlet._greenlet.enable_optional_cleanup(True)
used2 = greenlet._greenlet.get_clocks_used_doing_optional_cleanup()
self.assertEqual(used, used2)
self.assertGreater(greenlet._greenlet.CLOCKS_PER_SEC, 1)
def _check_issue251(self,
manually_collect_background=True,
explicit_reference_to_switch=False):
# See https://github.com/python-greenlet/greenlet/issues/251
# Killing a greenlet (probably not the main one)
# in one thread from another thread would
# result in leaking a list (the ts_delkey list).
# We no longer use lists to hold that stuff, though.
# For the test to be valid, even empty lists have to be tracked by the
# GC
assert gc.is_tracked([])
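        # The leaking pattern was, roughly, the following (the names in this
        # sketch are illustrative only; the real sequence is arranged by
        # background_thread() and background_greenlet() below):
        #
        #   glet = greenlet_created_in_background_thread()
        #   del glet  # last reference dropped from *this* thread; the actual
        #             # deallocation is deferred to the other thread's ts_delkey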
HasFinalizerTracksInstances.reset()
greenlet.getcurrent()
greenlets_before = self.count_objects(greenlet.greenlet, exact_kind=False)
background_glet_running = threading.Event()
background_glet_killed = threading.Event()
background_greenlets = []
# XXX: Switching this to a greenlet subclass that overrides
# run results in all callers failing the leaktest; that
# greenlet instance is leaked. There's a bound method for
# run() living on the stack of the greenlet in g_initialstub,
# and since we don't manually switch back to the background
# greenlet to let it "fall off the end" and exit the
# g_initialstub function, it never gets cleaned up. Making the
# garbage collector aware of this bound method (making it an
# attribute of the greenlet structure and traversing into it)
# doesn't help, for some reason.
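        # For reference, the problematic variant mentioned above would look
        # roughly like this (illustrative only; it is deliberately not used
        # here because it makes every caller fail the leakcheck):
        #
        #   class BackgroundGreenlet(greenlet.greenlet):
        #       def run(self):
        #           greenlet.getcurrent().parent.switch()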
def background_greenlet():
# Throw control back to the main greenlet.
jd = HasFinalizerTracksInstances("DELETING STACK OBJECT")
greenlet._greenlet.set_thread_local(
'test_leaks_key',
HasFinalizerTracksInstances("DELETING THREAD STATE"))
# Explicitly keeping 'switch' in a local variable
# breaks this test in all versions
if explicit_reference_to_switch:
s = greenlet.getcurrent().parent.switch
s([jd])
else:
greenlet.getcurrent().parent.switch([jd])
bg_main_wrefs = []
def background_thread():
glet = greenlet.greenlet(background_greenlet)
bg_main_wrefs.append(weakref.ref(glet.parent))
background_greenlets.append(glet)
glet.switch() # Be sure it's active.
# Control is ours again.
del glet # Delete one reference from the thread it runs in.
background_glet_running.set()
background_glet_killed.wait(10)
            # To trigger the background collection of the dead
            # greenlet, thus clearing out the contents of the list, we
            # need to call some greenlet APIs. See issue 252.
if manually_collect_background:
greenlet.getcurrent()
t = threading.Thread(target=background_thread)
t.start()
background_glet_running.wait(10)
greenlet.getcurrent()
lists_before = self.count_objects(list, exact_kind=True)
assert len(background_greenlets) == 1
self.assertFalse(background_greenlets[0].dead)
# Delete the last reference to the background greenlet
# from a different thread. This puts it in the background thread's
# ts_delkey list.
del background_greenlets[:]
background_glet_killed.set()
# Now wait for the background thread to die.
t.join(10)
del t
# As part of the fix for 252, we need to cycle the ceval.c
# interpreter loop to be sure it has had a chance to process
# the pending call.
self.wait_for_pending_cleanups()
lists_after = self.count_objects(list, exact_kind=True)
greenlets_after = self.count_objects(greenlet.greenlet, exact_kind=False)
# On 2.7, we observe that lists_after is smaller than
# lists_before. No idea what lists got cleaned up. All the
# Python 3 versions match exactly.
self.assertLessEqual(lists_after, lists_before)
# On versions after 3.6, we've successfully cleaned up the
# greenlet references thanks to the internal "vectorcall"
# protocol; prior to that, there is a reference path through
# the ``greenlet.switch`` method still on the stack that we
        # can't reach to clean up. The C code goes to great
        # lengths to clean that up.
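        # (Put differently: grabbing ``parent.switch`` into a local variable
        # first, as explicit_reference_to_switch does above, appears to
        # recreate that reference path on every version, which is why that
        # variant always leaks.)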
if not explicit_reference_to_switch \
and greenlet._greenlet.get_clocks_used_doing_optional_cleanup() is not None:
# If cleanup was disabled, though, we may not find it.
self.assertEqual(greenlets_after, greenlets_before)
if manually_collect_background:
# TODO: Figure out how to make this work!
# The one on the stack is still leaking somehow
# in the non-manually-collect state.
self.assertEqual(HasFinalizerTracksInstances.EXTANT_INSTANCES, set())
else:
            # The explicit reference prevents us from collecting it,
            # and for some reason the GC doesn't always find it
            # either. The entire frame is leaked somehow on some
            # platforms (e.g., MacPorts builds of Python, all
            # versions!) but not on others (the Linux and Windows
            # builds on GitHub Actions and AppVeyor). So we'd like to
            # write a test proving that the main greenlet sticks
            # around, and we can on my machine (macOS 11.6, MacPorts
            # builds of everything), but we can't write that same
            # test on other platforms. However, hopefully the
            # iteration done by leakcheck will find it.
pass
if greenlet._greenlet.get_clocks_used_doing_optional_cleanup() is not None:
self.assertClocksUsed()
def test_issue251_killing_cross_thread_leaks_list(self):
self._check_issue251()
def test_issue251_with_cleanup_disabled(self):
greenlet._greenlet.enable_optional_cleanup(False)
try:
self._check_issue251()
finally:
greenlet._greenlet.enable_optional_cleanup(True)
@fails_leakcheck
def test_issue251_issue252_need_to_collect_in_background(self):
        # Between greenlet 1.1.2 and the next version, this was still
        # failing because the list is still leaked when we don't call
        # a greenlet API before exiting the thread. The proximate
        # cause is that neither of the two greenlets from the
        # background thread is actually destroyed, even though the GC
        # does visit both objects. It's not clear where that leak is;
        # for some reason the thread-local dict holding it isn't
        # being cleaned up.
        #
        # The leak, I think, is in the CPython internal function that
        # calls into green_switch(). Is the argument tuple still on
        # the C stack somewhere, where it can't be reached? That
        # doesn't quite make sense, because the tuple should be
        # collectable when this object goes away.
#
# Note that this test sometimes spuriously passes on Linux,
# for some reason, but I've never seen it pass on macOS.
self._check_issue251(manually_collect_background=False)
@fails_leakcheck
def test_issue251_issue252_need_to_collect_in_background_cleanup_disabled(self):
self.expect_greenlet_leak = True
greenlet._greenlet.enable_optional_cleanup(False)
try:
self._check_issue251(manually_collect_background=False)
finally:
greenlet._greenlet.enable_optional_cleanup(True)
@fails_leakcheck
def test_issue251_issue252_explicit_reference_not_collectable(self):
self._check_issue251(
manually_collect_background=False,
explicit_reference_to_switch=True)
UNTRACK_ATTEMPTS = 100
def _only_test_some_versions(self):
        # We're only looking for this problem specifically on 3.11,
        # and this set of tests is relatively fragile, depending on
        # OS and memory-management details. So we want to run it on
        # 3.11+ (obviously), but not on every older 3.x version, in
        # order to reduce false negatives. At the moment, those false
        # results seem to have resolved themselves, so we actually
        # run this on 3.8+.
assert sys.version_info[0] >= 3
if sys.version_info[:2] < (3, 8):
self.skipTest('Only observed on 3.11')
if RUNNING_ON_MANYLINUX:
self.skipTest("Slow and not worth repeating here")
@ignores_leakcheck
# Because we're just trying to track raw memory, not objects, and running
# the leakcheck makes an already slow test slower.
def test_untracked_memory_doesnt_increase(self):
# See https://github.com/gevent/gevent/issues/1924
# and https://github.com/python-greenlet/greenlet/issues/328
self._only_test_some_versions()
def f():
return 1
ITER = 10000
def run_it():
for _ in range(ITER):
greenlet.greenlet(f).switch()
# Establish baseline
for _ in range(3):
run_it()
        # USS (available on Linux, macOS, and Windows), aka "Unique
        # Set Size", is the memory that is unique to a process and
        # that would be freed if the process were terminated right now.
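        # (get_process_uss() comes from the TestCase base class; one common
        # way to read this number, and presumably what it uses, is
        # psutil.Process().memory_full_info().uss.)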
uss_before = self.get_process_uss()
for count in range(self.UNTRACK_ATTEMPTS):
uss_before = max(uss_before, self.get_process_uss())
run_it()
uss_after = self.get_process_uss()
if uss_after <= uss_before and count > 1:
break
self.assertLessEqual(uss_after, uss_before)
def _check_untracked_memory_thread(self, deallocate_in_thread=True):
self._only_test_some_versions()
# Like the above test, but what if there are a bunch of
# unfinished greenlets in a thread that dies?
# Does it matter if we deallocate in the thread or not?
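        # ("Unfinished" here means each greenlet has started, switched back
        # to its creator, and is left suspended. Dropping its last reference
        # while its thread is still running raises GreenletExit into it,
        # which EXIT_COUNT tallies; dropping it after the thread has exited
        # does not, as noted at the ``del thread_func`` line below.)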
EXIT_COUNT = [0]
def f():
try:
greenlet.getcurrent().parent.switch()
except greenlet.GreenletExit:
EXIT_COUNT[0] += 1
raise
return 1
ITER = 10000
def run_it():
glets = []
for _ in range(ITER):
# Greenlet starts, switches back to us.
# We keep a strong reference to the greenlet though so it doesn't
# get a GreenletExit exception.
g = greenlet.greenlet(f)
glets.append(g)
g.switch()
return glets
test = self
class ThreadFunc:
uss_before = uss_after = 0
glets = ()
ITER = 2
def __call__(self):
self.uss_before = test.get_process_uss()
for _ in range(self.ITER):
self.glets += tuple(run_it())
for g in self.glets:
test.assertIn('suspended active', str(g))
# Drop them.
if deallocate_in_thread:
self.glets = ()
self.uss_after = test.get_process_uss()
# Establish baseline
uss_before = uss_after = None
for count in range(self.UNTRACK_ATTEMPTS):
EXIT_COUNT[0] = 0
thread_func = ThreadFunc()
t = threading.Thread(target=thread_func)
t.start()
t.join(30)
self.assertFalse(t.is_alive())
if uss_before is None:
uss_before = thread_func.uss_before
uss_before = max(uss_before, thread_func.uss_before)
if deallocate_in_thread:
self.assertEqual(thread_func.glets, ())
self.assertEqual(EXIT_COUNT[0], ITER * thread_func.ITER)
del thread_func # Deallocate the greenlets; but this won't raise into them
del t
if not deallocate_in_thread:
self.assertEqual(EXIT_COUNT[0], 0)
if deallocate_in_thread:
self.wait_for_pending_cleanups()
uss_after = self.get_process_uss()
# See if we achieve a non-growth state at some point. Break when we do.
if uss_after <= uss_before and count > 1:
break
self.wait_for_pending_cleanups()
uss_after = self.get_process_uss()
self.assertLessEqual(uss_after, uss_before, "after attempts %d" % (count,))
@ignores_leakcheck
# Because we're just trying to track raw memory, not objects, and running
# the leakcheck makes an already slow test slower.
def test_untracked_memory_doesnt_increase_unfinished_thread_dealloc_in_thread(self):
self._check_untracked_memory_thread(deallocate_in_thread=True)
@ignores_leakcheck
# Because the main greenlets from the background threads do not exit in a timely fashion,
# we fail the object-based leakchecks.
def test_untracked_memory_doesnt_increase_unfinished_thread_dealloc_in_main(self):
self._check_untracked_memory_thread(deallocate_in_thread=False)
if __name__ == '__main__':
__import__('unittest').main()