|
|
|
""" |
|
Tests for greenlet. |
|
|
|
""" |
|
import os |
|
import sys |
|
import unittest |
|
|
|
from gc import collect |
|
from gc import get_objects |
|
from threading import active_count as active_thread_count |
|
from time import sleep |
|
from time import time |
|
|
|
import psutil |
|
|
|
from greenlet import greenlet as RawGreenlet |
|
from greenlet import getcurrent |
|
|
|
from greenlet._greenlet import get_pending_cleanup_count |
|
from greenlet._greenlet import get_total_main_greenlets |
|
|
|
from . import leakcheck |
|
|
|
PY312 = sys.version_info[:2] >= (3, 12) |
|
PY313 = sys.version_info[:2] >= (3, 13) |
|
|
|
WIN = sys.platform.startswith("win") |
|
RUNNING_ON_GITHUB_ACTIONS = os.environ.get('GITHUB_ACTIONS') |
|
RUNNING_ON_TRAVIS = os.environ.get('TRAVIS') or RUNNING_ON_GITHUB_ACTIONS |
|
RUNNING_ON_APPVEYOR = os.environ.get('APPVEYOR') |
|
RUNNING_ON_CI = RUNNING_ON_TRAVIS or RUNNING_ON_APPVEYOR |
|
RUNNING_ON_MANYLINUX = os.environ.get('GREENLET_MANYLINUX') |
|
|
|
class TestCaseMetaClass(type): |
|
|
|
|
|
def __new__(cls, classname, bases, classDict): |
|
|
|
|
|
|
|
|
|
check_totalrefcount = True |
|
|
|
|
|
|
|
|
|
for key, value in list(classDict.items()): |
|
if key.startswith('test') and callable(value): |
|
classDict.pop(key) |
|
if check_totalrefcount: |
|
value = leakcheck.wrap_refcount(value) |
|
classDict[key] = value |
|
return type.__new__(cls, classname, bases, classDict) |
|
|
|
|
|
class TestCase(TestCaseMetaClass( |
|
"NewBase", |
|
(unittest.TestCase,), |
|
{})): |
|
|
|
cleanup_attempt_sleep_duration = 0.001 |
|
cleanup_max_sleep_seconds = 1 |
|
|
|
def wait_for_pending_cleanups(self, |
|
initial_active_threads=None, |
|
initial_main_greenlets=None): |
|
initial_active_threads = initial_active_threads or self.threads_before_test |
|
initial_main_greenlets = initial_main_greenlets or self.main_greenlets_before_test |
|
sleep_time = self.cleanup_attempt_sleep_duration |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
sleep(sleep_time) |
|
quit_after = time() + self.cleanup_max_sleep_seconds |
|
|
|
|
|
while ( |
|
get_pending_cleanup_count() |
|
or active_thread_count() > initial_active_threads |
|
or (not self.expect_greenlet_leak |
|
and get_total_main_greenlets() > initial_main_greenlets)): |
|
sleep(sleep_time) |
|
if time() > quit_after: |
|
print("Time limit exceeded.") |
|
print("Threads: Waiting for only", initial_active_threads, |
|
"-->", active_thread_count()) |
|
print("MGlets : Waiting for only", initial_main_greenlets, |
|
"-->", get_total_main_greenlets()) |
|
break |
|
collect() |
|
|
|
def count_objects(self, kind=list, exact_kind=True): |
|
|
|
|
|
for _ in range(3): |
|
collect() |
|
if exact_kind: |
|
return sum( |
|
1 |
|
for x in get_objects() |
|
if type(x) is kind |
|
) |
|
|
|
return sum( |
|
1 |
|
for x in get_objects() |
|
if isinstance(x, kind) |
|
) |
|
|
|
greenlets_before_test = 0 |
|
threads_before_test = 0 |
|
main_greenlets_before_test = 0 |
|
expect_greenlet_leak = False |
|
|
|
def count_greenlets(self): |
|
""" |
|
Find all the greenlets and subclasses tracked by the GC. |
|
""" |
|
return self.count_objects(RawGreenlet, False) |
|
|
|
def setUp(self): |
|
|
|
|
|
super().setUp() |
|
getcurrent() |
|
self.threads_before_test = active_thread_count() |
|
self.main_greenlets_before_test = get_total_main_greenlets() |
|
self.wait_for_pending_cleanups(self.threads_before_test, self.main_greenlets_before_test) |
|
self.greenlets_before_test = self.count_greenlets() |
|
|
|
def tearDown(self): |
|
if getattr(self, 'skipTearDown', False): |
|
return |
|
|
|
self.wait_for_pending_cleanups(self.threads_before_test, self.main_greenlets_before_test) |
|
super().tearDown() |
|
|
|
def get_expected_returncodes_for_aborted_process(self): |
|
import signal |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
expected_exit = ( |
|
-signal.SIGABRT, |
|
|
|
|
|
|
|
|
|
-signal.SIGSEGV, |
|
) if not WIN else ( |
|
3, |
|
0xc0000409, |
|
0xc0000005, |
|
) |
|
return expected_exit |
|
|
|
def get_process_uss(self): |
|
""" |
|
Return the current process's USS in bytes. |
|
|
|
uss is available on Linux, macOS, Windows. Also known as |
|
"Unique Set Size", this is the memory which is unique to a |
|
process and which would be freed if the process was terminated |
|
right now. |
|
|
|
If this is not supported by ``psutil``, this raises the |
|
:exc:`unittest.SkipTest` exception. |
|
""" |
|
try: |
|
return psutil.Process().memory_full_info().uss |
|
except AttributeError as e: |
|
raise unittest.SkipTest("uss not supported") from e |
|
|
|
def run_script(self, script_name, show_output=True): |
|
import subprocess |
|
script = os.path.join( |
|
os.path.dirname(__file__), |
|
script_name, |
|
) |
|
|
|
try: |
|
return subprocess.check_output([sys.executable, script], |
|
encoding='utf-8', |
|
stderr=subprocess.STDOUT) |
|
except subprocess.CalledProcessError as ex: |
|
if show_output: |
|
print('-----') |
|
print('Failed to run script', script) |
|
print('~~~~~') |
|
print(ex.output) |
|
print('------') |
|
raise |
|
|
|
|
|
def assertScriptRaises(self, script_name, exitcodes=None): |
|
import subprocess |
|
with self.assertRaises(subprocess.CalledProcessError) as exc: |
|
output = self.run_script(script_name, show_output=False) |
|
__traceback_info__ = output |
|
|
|
|
|
|
|
if exitcodes is None: |
|
exitcodes = self.get_expected_returncodes_for_aborted_process() |
|
self.assertIn(exc.exception.returncode, exitcodes) |
|
return exc.exception |
|
|