diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 30156f63..118e2fca 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -12,6 +12,8 @@ Changes is available (#427) * FIX: Bytecodes of profiled functions now always labeled to prevent confusion with non-profiled "twins" (#425) +* FEAT: Experimental support for profiling child processes with + ``kernprof --prof-child-procs`` (#431) 5.0.2 diff --git a/kernprof.py b/kernprof.py index 8a7c4d6a..96888468 100755 --- a/kernprof.py +++ b/kernprof.py @@ -79,6 +79,7 @@ def main(): [-s SETUP] [-p {path/to/script | object.dotted.path}[,...]] [--preimports [Y[es] | N[o] | T[rue] | F[alse] | on | off | 1 | 0]] [--prof-imports [Y[es] | N[o] | T[rue] | F[alse] | on | off | 1 | 0]] + [--prof-child-procs [Y[es] | N[o] | T[rue] | F[alse] | on | off | 1 | 0]] [-o OUTFILE] [-v] [-q] [--rich [Y[es] | N[o] | T[rue] | F[alse] | on | off | 1 | 0]] [-u UNIT] @@ -137,6 +138,10 @@ def main(): If the script/module profiled is in `--prof-mod`, autoprofile all its imports. Only works with line profiling (`-l`/`--line- by-line`). (Default: False) + --prof-child-procs [Y[es] | N[o] | T[rue] | F[alse] | on | off | 1 | 0] + Extend profiling into child Python processes. Only works with + line profiling (`-l`/`--line-by-line`). 
(EXPERIMENTAL; + default: False) output options: -o, --outfile OUTFILE @@ -187,7 +192,6 @@ def main(): """ # noqa: E501 import atexit -import builtins import functools import os import sys @@ -198,9 +202,7 @@ def main(): import shutil import tempfile import time -import warnings -from argparse import ArgumentParser -from io import StringIO +from argparse import ArgumentParser, SUPPRESS from operator import methodcaller from runpy import run_module from pathlib import Path @@ -229,11 +231,16 @@ def main(): short_string_path, ) from line_profiler.profiler_mixin import ByCountProfilerMixin +from line_profiler._child_process_profiling.cache import LineProfilingCache from line_profiler._logger import Logger from line_profiler import _diagnostics as diagnostics DIAGNOSITICS_VERBOSITY = 2 +CLEANUP_PRIORITIES = { # Bigger number -> more delayed + 'rm_cache_dir': 1024, + 'gather_logs': 1, +} def execfile(filename, globals=None, locals=None): @@ -330,6 +337,7 @@ def resolve_module_path(mod_name): # type: (str) -> str | None fname = mod_spec.origin # type: str | None if fname and os.path.exists(fname): return fname + return None get_module_path = modname_to_modpath if static else resolve_module_path @@ -681,6 +689,14 @@ def _add_core_parser_arguments(parser): 'Only works with line profiling (`-l`/`--line-by-line`). ' f'(Default: {default.conf_dict["prof_imports"]})', ) + add_argument( + prof_opts, + '--prof-child-procs', + action='store_true', + help='Extend profiling into child Python processes. ' + 'Only works with line profiling (`-l`/`--line-by-line`). ' + f'(EXPERIMENTAL; default: {default.conf_dict["prof_child_procs"]})', + ) out_opts = parser.add_argument_group('output options') if default.conf_dict['outfile']: def_outfile = repr(default.conf_dict['outfile']) @@ -771,6 +787,8 @@ def _add_core_parser_arguments(parser): 'Minimum value (and the value implied if the bare option ' f'is given) is 1 s. 
(Default: {def_out_int})', ) + # Hidden option for dumping the debug logs to a desinated location + add_argument(out_opts, '--debug-log', help=SUPPRESS) def _build_parsers(args=None): @@ -803,8 +821,8 @@ def _build_parsers(args=None): # We've already consumed the `-m `, so we need a dummy # parser for generating the help text; # but the real parser should not consume the `options.script` - # positional arg, and it it got the `--help` option, it should - # hand off the the dummy parser + # positional arg, and if it got the `--help` option, it should + # hand off to the dummy parser real_parser = ArgumentParser(add_help=False, **parser_kwargs) real_parser.add_argument('-h', '--help', action='store_true') help_parser = ArgumentParser(**parser_kwargs) @@ -1043,104 +1061,33 @@ def _write_tempfile(source, content, options): ) -def _gather_preimport_targets(options, exclude): - """ - Used in _write_preimports - """ - from line_profiler.autoprofile.util_static import modpath_to_modname - from line_profiler.autoprofile.eager_preimports import is_dotted_path - - filtered_targets = [] - recurse_targets = [] - invalid_targets = [] - for target in options.prof_mod: - if is_dotted_path(target): - modname = target - else: - # Paths already normalized by - # `_normalize_profiling_targets()` - if not os.path.exists(target): - invalid_targets.append(target) - continue - if any(os.path.samefile(target, excluded) for excluded in exclude): - # Ignore the script to be run in eager importing - # (`line_profiler.autoprofile.autoprofile.run()` will - # handle it) - continue - modname = modpath_to_modname(target, hide_init=False) - if modname is None: # Not import-able - invalid_targets.append(target) - continue - if modname.endswith('.__init__'): - modname = modname.rpartition('.')[0] - filtered_targets.append(modname) - else: - recurse_targets.append(modname) - if invalid_targets: - invalid_targets = sorted(set(invalid_targets)) - msg = ( - '{} profile-on-import target{} cannot be 
converted to ' - 'dotted-path form: {!r}'.format( - len(invalid_targets), - '' if len(invalid_targets) == 1 else 's', - invalid_targets, - ) - ) - warnings.warn(msg) - diagnostics.log.warning(msg) - - return filtered_targets, recurse_targets - - -def _write_preimports(prof, options, exclude): +def _write_preimports(prof, options, exclude, keep=False): """ Called by :py:func:`main()` to handle eager pre-imports; not to be invoked on its own. """ - from line_profiler.autoprofile.eager_preimports import ( - write_eager_import_module, - ) - from line_profiler.autoprofile.autoprofile import ( - _extend_line_profiler_for_profiling_imports as upgrade_profiler, - ) + from line_profiler.curated_profiling import ClassifiedPreimportTargets - filtered_targets, recurse_targets = _gather_preimport_targets( - options, exclude - ) - if not (filtered_targets or recurse_targets): - return # We could've done everything in-memory with `io.StringIO` and `exec()`, # but that results in indecipherable tracebacks should anything goes wrong; # so we write to a tempfile and `execfile()` it - upgrade_profiler(prof) temp_mod_path = _touch_tempfile( dir=options.tmpdir, prefix='kernprof-eager-preimports-', suffix='.py' ) - write_module_kwargs = { - 'dotted_paths': filtered_targets, - 'recurse': recurse_targets, - 'static': options.static, - } - temp_file = open(temp_mod_path, mode='w') - if options.debug: - with StringIO() as sio: - write_eager_import_module(stream=sio, **write_module_kwargs) - code = sio.getvalue() - with temp_file as fobj: - print(code, file=fobj) - diagnostics.log.debug( - 'Wrote temporary module for pre-imports to ' - f'{short_string_path(temp_mod_path)!r}' + with open(temp_mod_path, mode='w') as fobj: + preimports = ClassifiedPreimportTargets.from_targets( + options.prof_mod, exclude, ) - else: - with temp_file as fobj: - write_eager_import_module(stream=fobj, **write_module_kwargs) - if not options.dryrun: + preimports.write_preimport_module( + fobj, debug=options.debug, 
static=options.static, + ) + if preimports and not options.dryrun: ns = {} # Use a fresh namespace execfile(temp_mod_path, ns, ns) # Delete the tempfile ASAP if its execution succeeded - if not diagnostics.KEEP_TEMPDIRS: + if not (keep or diagnostics.KEEP_TEMPDIRS): _remove(temp_mod_path) + return temp_mod_path def _remove(path, *, recursive=False, missing_ok=False): @@ -1154,9 +1101,20 @@ def _remove(path, *, recursive=False, missing_ok=False): path.unlink(missing_ok=missing_ok) -def _dump_filtered_stats(tmpdir, prof, filename): +def _dump_filtered_stats(tmpdir, prof, filename, extra_line_stats=None): import os - import pickle + + if isinstance(prof, ContextualProfile): + # - Not using `line_profiler` + # -> doesn't matter if the source lines can't be retrieved + # -> no need to filter anything + prof.dump_stats(filename) + return + + # Remember to incorporate extra stats where available + line_stats = prof.get_stats() + if extra_line_stats is not None: + line_stats += extra_line_stats # Build list of known temp file paths tempfile_paths = [ @@ -1164,31 +1122,28 @@ def _dump_filtered_stats(tmpdir, prof, filename): for dirpath, _, fnames in os.walk(tmpdir) for fname in fnames ] - - if not tempfile_paths or isinstance(prof, ContextualProfile): + if not tempfile_paths: # - No tempfiles written -> no function lives in tempfiles # -> no need to filter anything - # - Not using `line_profiler` - # -> doesn't matter if the source lines can't be retrieved - # -> no need to filter anything - prof.dump_stats(filename) + line_stats.to_file(filename) return + _dump_filtered_line_stats(line_stats, tempfile_paths, filename) + + +def _dump_filtered_line_stats(stats, exclude, filename): # Filter the filenames to remove data from tempfiles, which will # have been deleted by the time the results are viewed in a # separate process - stats = prof.get_stats() timings = stats.timings for key in set(timings): fname = key[0] try: - if any(os.path.samefile(fname, tmp) for tmp in 
tempfile_paths): + if any(os.path.samefile(fname, tmp) for tmp in exclude): del timings[key] except OSError: del timings[key] - - with open(filename, 'wb') as f: - pickle.dump(stats, f, protocol=pickle.HIGHEST_PROTOCOL) + stats.to_file(filename) def _format_call_message(func, *args, **kwargs): @@ -1231,13 +1186,82 @@ def _call_with_diagnostics(options, func, *args, **kwargs): return func(*args, **kwargs) -def _pre_profile(options, module, exit_on_error): +class _manage_profiler: """ Prepare the environment to execute profiling with requested options. Note: modifies ``options`` with extra attributes. """ + cache: LineProfilingCache + + def __init__(self, options, module, exit_on_error): + self.options = options + self.module = module + self.exit_on_error = exit_on_error + + def __enter__(self): + from line_profiler.curated_profiling import CuratedProfilerContext + + self.prof = _prepare_profiler( + self.options, self.module, self.exit_on_error, + ) + self._ctx = CuratedProfilerContext( + self.prof, insert_builtin=self.options.builtin, + ) + self._ctx.install() + # Keep the generated pre-imports file to be reused in child + # processes + script_file, preimports_file = _prepare_exec_script( + self.options, self.module, self.prof, + exit_on_error=self.exit_on_error, + keep_preimports_file=self.set_up_child_profiling, + ) + if self.set_up_child_profiling: + self.cache = _prepare_child_profiling_cache( + self.options, self.prof, preimports_file, script_file, + ) + # Add a deferred callback for gathering debug logfiles + # (should run right before `.cache.cache_dir` is wiped) + if self.options.debug_log: + self.cache._add_cleanup( + self._gather_debug_log, + CLEANUP_PRIORITIES['gather_logs'], + self.options.debug_log, + ) + return self.prof, script_file + + def __exit__(self, *_, **__): + try: + extra_stats = None + if self.set_up_child_profiling: + try: + if self.cache.debug: + # Recover debug output from child processes + self.cache._dump_debug_logs() + extra_stats = 
self.cache.gather_stats() + finally: + self.cache.cleanup() + _post_profile(self.options, self.prof, extra_stats) + finally: + self._ctx.uninstall() + + def _gather_debug_log(self, logfile): + with open(logfile, mode='w') as fobj: + for entry in self.cache._gather_debug_log_entries(): + print(entry.to_text(), file=fobj) + + @property + def set_up_child_profiling(self): + return bool( + self.options.line_by_line and self.options.prof_child_procs + ) + + +def _prepare_profiler(options, module, exit_on_error): + """ + Set up the appropriate profiler instance. + """ if not options.outfile: extension = 'lprof' if options.line_by_line else 'prof' options.outfile = f'{os.path.basename(options.script)}.{extension}' @@ -1267,24 +1291,26 @@ def _pre_profile(options, module, exit_on_error): execfile(setup_file, ns, ns) if options.line_by_line: - prof = line_profiler.LineProfiler() options.builtin = True + return line_profiler.LineProfiler() elif Profile.__module__ == 'profile': raise RuntimeError( 'non-line-by-line profiling depends on cProfile, ' 'which is not available on this platform' ) else: - prof = ContextualProfile() - - # Overwrite the explicit decorator - global_profiler = line_profiler.profile - install_profiler = global_profiler._kernprof_overwrite - install_profiler(prof) + return ContextualProfile() - if options.builtin: - builtins.__dict__['profile'] = prof +def _prepare_exec_script( + options, module, prof, + *, + exit_on_error=False, + keep_preimports_file=False, +): + """ + Set up the script to be executed among other things. 
+ """ if module: script_file = find_module_script( options.script, static=options.static, exit_on_error=exit_on_error @@ -1304,6 +1330,8 @@ def _pre_profile(options, module, exit_on_error): options.prof_mod = _normalize_profiling_targets(options.prof_mod) if not options.prof_mod: options.preimports = False + + preimports_file = None if options.line_by_line and options.preimports: # We assume most items in `.prof_mod` to be import-able without # significant side effects, but the same cannot be said if it @@ -1311,10 +1339,10 @@ def _pre_profile(options, module, exit_on_error): # even have a `if __name__ == '__main__': ...` guard. So don't # eager-import it. exclude = set() if module else {script_file} - _write_preimports(prof, options, exclude) + preimports_file = _write_preimports( + prof, options, exclude, keep=keep_preimports_file, + ) - options.global_profiler = global_profiler - options.install_profiler = install_profiler if options.output_interval and not options.dryrun: options.rt = RepeatedTimer( max(options.output_interval, 1), prof.dump_stats, options.outfile @@ -1322,7 +1350,49 @@ def _pre_profile(options, module, exit_on_error): else: options.rt = None options.original_stdout = sys.stdout - return script_file, prof + return script_file, preimports_file + + +def _prepare_child_profiling_cache(options, prof, preimports_file, script_file): + """ + Handle the (line-)profiling of spawned/forked child Python + processes. 
+ """ + # Create the cache dir and cache file here; the cache instance will + # be responsible for managing their lifetimes, while derivative + # instances in child processes will merely inherit and use them + cache = LineProfilingCache( + cache_dir=tempfile.mkdtemp(), + config=options.config, + profiling_targets=options.prof_mod, + rewrite_module=script_file, + profile_imports=options.prof_imports, + preimports_module=preimports_file, + insert_builtin=options.builtin, + debug=bool(options.debug or options.debug_log), + ) + clean_up = functools.partial(cache.add_cleanup, _remove, missing_ok=True) + if not diagnostics.KEEP_TEMPDIRS: + # Defer the scrubbing of the cache dir + cache._add_cleanup( + _remove, CLEANUP_PRIORITIES['rm_cache_dir'], cache.cache_dir, + recursive=True, + ) + clean_up(cache.filename) + + # This file is handed to us at the end of + # `_manage_profiler.__enter__()`; + # normally it is deleted before `.__enter__()` returns, but when + # child-process profiling is used, it is to persist for the lifetime + # of the cache (so that child processes can do the same preimports) + if not (preimports_file is None or diagnostics.KEEP_TEMPDIRS): + clean_up(preimports_file) + + # Handle various setup tasks (see docs thereof) + cache._setup_in_main_process() + cache.profiler = prof + + return cache def _main_profile(options, module=False, exit_on_error=True): @@ -1330,9 +1400,10 @@ def _main_profile(options, module=False, exit_on_error=True): Called by :py:func:`main()` for the actual execution and profiling of code after initial parsing of options; not to be invoked on its own. 
""" - script_file, prof = _pre_profile(options, module, exit_on_error) call = functools.partial(_call_with_diagnostics, options) - try: + with _manage_profiler( + options, module, exit_on_error, + ) as (prof, script_file): rmod = functools.partial( run_module, run_name='__main__', alter_sys=True ) @@ -1383,18 +1454,18 @@ def _main_profile(options, module=False, exit_on_error=True): module_ns, module_ns, ) - finally: - _post_profile(options, prof) -def _post_profile(options, prof): +def _post_profile(options, prof, extra_line_stats=None): """ Cleanup setup after executing a main profile """ if options.rt is not None: options.rt.stop() if not options.dryrun: - _dump_filtered_stats(options.tmpdir, prof, options.outfile) + _dump_filtered_stats( + options.tmpdir, prof, options.outfile, extra_line_stats, + ) short_outfile = short_string_path(options.outfile) diagnostics.log.info( ( @@ -1405,9 +1476,15 @@ def _post_profile(options, prof): + f'to {short_outfile!r}' ) if options.verbose > 0 and not options.dryrun: - kwargs = {} - if not isinstance(prof, ContextualProfile): - kwargs.update( + if isinstance(prof, ContextualProfile): + _call_with_diagnostics(options, prof.print_stats) + else: + stats = prof.get_stats() + if extra_line_stats is not None: + stats += extra_line_stats + _call_with_diagnostics( + options, + stats.print, output_unit=options.unit, stripzeros=options.skip_zero, summarize=options.summarize, @@ -1415,7 +1492,6 @@ def _post_profile(options, prof): stream=options.original_stdout, config=options.config, ) - _call_with_diagnostics(options, prof.print_stats, **kwargs) else: py_exe = _python_command() if isinstance(prof, ContextualProfile): @@ -1427,12 +1503,6 @@ def _post_profile(options, prof): f'{quote(py_exe)} -m {show_mod} ' f'{quote(short_outfile)}' ) - # Fully disable the profiler - for _ in range(prof.enable_count): - prof.disable_by_count() - # Restore the state of the global `@line_profiler.profile` - if options.global_profiler: - 
options.install_profiler(None) if __name__ == '__main__': diff --git a/line_profiler/_child_process_profiling/__init__.py b/line_profiler/_child_process_profiling/__init__.py new file mode 100644 index 00000000..58e0806f --- /dev/null +++ b/line_profiler/_child_process_profiling/__init__.py @@ -0,0 +1,10 @@ +""" +Tooling for profiling child Python processes and gathering their +profiling results. + +Notes: + - THIS IS AN EXPERIMENTAL FEATURE. + + - All contents of this subpackage is to be considered implementation + details. +""" diff --git a/line_profiler/_child_process_profiling/_cache_logging.py b/line_profiler/_child_process_profiling/_cache_logging.py new file mode 100644 index 00000000..91b0635f --- /dev/null +++ b/line_profiler/_child_process_profiling/_cache_logging.py @@ -0,0 +1,338 @@ +""" +Logging utilities. +""" +from __future__ import annotations + +import os +import re +from collections.abc import Generator +from datetime import datetime +from itertools import pairwise +from pathlib import Path +from string import Formatter as StringParser +from textwrap import dedent +from typing import TYPE_CHECKING, NamedTuple, TextIO, overload +from typing_extensions import Self + +from .. import _diagnostics as diagnostics +from .misc_utils import block_indent + + +__all__ = ('CacheLoggingEntry',) + + +FILENAME_PATTERN = 'debug_log_{main_pid}_{current_pid}.log' +TIMESTAMP_PATTERN = '[cache-debug-log {timestamp} DEBUG]' +HEADER_PATTERN = 'PID {current_pid} ({main_pid}): Cache {obj_id:#x}' + +TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S' +TIMESTAMP_MICROSECOND_SEP = ',' +TIMESTAMP_MICROSECOND_PLACES = 3 +TIMESTAMP_SPACING = ' ' + +HEADER_SEP = ': ' +HEADER_MAIN_INDICATOR = 'main process' + + +def get_logger_header(current_pid: int, main_pid: int, obj_id: int) -> str: + """ + Returns: + msg_header (str): + Message header, to be prefixed to messages sent to + :py:data:`line_profiler._diagnostics.log`. 
+ """ + return HEADER_PATTERN.format( + current_pid=current_pid, + main_pid=( + HEADER_MAIN_INDICATOR if main_pid == current_pid else main_pid + ), + obj_id=obj_id, + ) + + +def format_timestamp(ts: datetime) -> str: + """ + Replicate the :py:mod:`logging`'s default formatting for timestamps. + + Example: + >>> ts = datetime(2000, 1, 23, 4, 5, 6, 789000) + >>> as_str = format_timestamp(ts) + >>> print(as_str) + 2000-01-23 04:05:06,789 + >>> assert parse_timestamp(as_str) == ts + """ + return '{}{}{:0{}d}'.format( + ts.strftime(TIMESTAMP_FORMAT), + TIMESTAMP_MICROSECOND_SEP, + int(ts.microsecond / 1000), + TIMESTAMP_MICROSECOND_PLACES, + ) + + +def parse_timestamp(ts: str) -> datetime: + """ + Turn a formatted string timestamp back to a + :py:class:`datetime.datetime` object. + """ + assert TIMESTAMP_MICROSECOND_SEP in ts + base, _, fractional = ts.rpartition(TIMESTAMP_MICROSECOND_SEP) + # The microsecond field %f must be 6 digits long + if len(fractional) < 6: + fractional = f'{fractional:<06}' + else: + fractional = fractional[:6] + parse_format = f'{TIMESTAMP_FORMAT}{TIMESTAMP_MICROSECOND_SEP}%f' + ts = f'{base}{TIMESTAMP_MICROSECOND_SEP}{fractional}' + return datetime.strptime(ts, parse_format) + + +def add_timestamp(msg: str, timestamp: datetime | None = None) -> str: + """ + Returns: + msg_with_timestamp (str): + (Block-indented) message with timestamp, to be written to + the :py:attr:`LineProfilingCache._debug_log`. + """ + if timestamp is None: + timestamp = datetime.now() + ts_formatted = TIMESTAMP_PATTERN.format( + timestamp=format_timestamp(timestamp), + ) + return block_indent(msg, ts_formatted + TIMESTAMP_SPACING) + + +def parse_id(uint: str) -> int: + """ + Example: + >>> n = 123456 + >>> for formatter in str, bin, oct, hex: + ... 
assert parse_id(formatter(n)) == n + """ + for prefix, base in ('0b', 2), ('0o', 8), ('0x', 16): + if uint.startswith(prefix): + return int(uint[len(prefix):], base=base) + return int(uint) + + +@overload +def fmt_to_regex(fmt: str, /, *auto_numbered_fields: str) -> str: + ... + + +@overload +def fmt_to_regex(fmt: str, /, **named_fields: str) -> str: + ... + + +def fmt_to_regex( + fmt: str, /, *auto_numbered_fields: str, **named_fields: str +) -> str: + """ + Example: + >>> import re + + Simple case: + + >>> pattern = fmt_to_regex( + ... '{func}({args})', func=r'[_\\w][_\\w\\d]+', args='.*', + ... ) + >>> print(pattern) + (?P[_\\w][_\\w\\d]+)\\((?P.*)\\) + >>> regex = re.compile('^' + pattern, re.MULTILINE) + >>> assert not regex.search('0(1)') + >>> match = regex.search(' \\nint(-1.5)') + >>> assert match.group('func', 'args') == ('int', '-1.5') + + Repeated fields: + + >>> palindrome_5l = re.compile(fmt_to_regex( + ... '{first}{second}{third}{second}{first}', + ... first='.', second='.', third='.', + ... )) + >>> print(palindrome_5l.pattern) + (?P.)(?P.)(?P.)(?P=second)(?P=first) + >>> assert not palindrome_5l.match('abbbe') + >>> match = palindrome_5l.match('aBcBa') + >>> assert match.group('first', 'second', 'third') == ( + ... 'a', 'B', 'c', + ... ) + + Auto-numbered fields: + + >>> print(fmt_to_regex( + ... '[{} {}-{}-{} {}:{}:{},{} {}]', + ... # Logger name + ... '.+', + ... # Date + ... r'\\d\\d', r'\\d\\d', r'\\d\\d', + ... # Time + milliseconds + ... r'\\d\\d', r'\\d\\d', r'\\d\\d', r'\\d\\d\\d', + ... # Category + ... 'DEBUG|INFO|WARNING|ERROR|CRITICAL', + ... 
)) + \\[(.+)\\ (\\d\\d)\\-(\\d\\d)\\-(\\d\\d)\\ \ +(\\d\\d):(\\d\\d):(\\d\\d),(\\d\\d\\d)\\ \ +(DEBUG|INFO|WARNING|ERROR|CRITICAL)\\] + """ + chunks: list[str] = [] + seen_fields: set[str] = set() + for i, (prefix, field, *_) in enumerate(StringParser().parse(fmt)): + chunks.append(re.escape(prefix)) + if field is None: + break # Suffix -> we're done + if field: # Named fields + assert field.isidentifier() + if field in seen_fields: + chunks.append(f'(?P={field})') + else: + chunks.append(f'(?P<{field}>{named_fields[field]})') + seen_fields.add(field) + else: # Auto-numbered fields + chunks.append(f'({auto_numbered_fields[i]})') + return ''.join(chunks) + + +class CacheLoggingEntry(NamedTuple): + """ + Logging entry written to a log file by + :py:meth:`LineProfilingCache._debug_output`. + + Example: + >>> from datetime import datetime + >>> + >>> + >>> entry = CacheLoggingEntry( + ... datetime(1900, 1, 1, 0, 0, 0, 0), + ... 12345, + ... 12345, + ... 12345678, + ... 'This is a log message;\\nit has multiple lines', + ... ) + >>> print(entry.to_text()) + [cache-debug-log 1900-01-01 00:00:00,000 DEBUG] PID 12345 \ +(main process): Cache 0xbc614e: This is a log message; + it has \ +multiple lines + >>> another_entry = CacheLoggingEntry( + ... datetime(2000, 12, 31, 12, 34, 56, 789000), + ... 12345, + ... 54321, + ... 87654321, + ... 'FOO BAR BAZ', + ... ) + >>> print(another_entry.to_text()) + [cache-debug-log 2000-12-31 12:34:56,789 DEBUG] PID 54321 \ +(12345): Cache 0x5397fb1: FOO BAR BAZ + >>> log_text = '\\n'.join([ + ... e.to_text() for e in [entry, another_entry] + ... ]) + >>> assert CacheLoggingEntry.from_text(log_text) == [ + ... entry, another_entry, + ... 
] + """ + timestamp: datetime + main_pid: int + current_pid: int + cache_id: int + msg: str + + def to_text(self) -> str: + return add_timestamp(self._get_header() + self.msg, self.timestamp) + + def _get_header(self) -> str: + return get_logger_header( + self.current_pid, self.main_pid, self.cache_id, + ) + HEADER_SEP + + def write(self, tee: os.PathLike[str] | str | None = None) -> None: + log_msg = self._get_header() + self.msg + diagnostics.log.debug(log_msg) + if tee is None: + return + with Path(tee).open(mode='a') as fobj: + print(add_timestamp(log_msg, self.timestamp), file=fobj) + + @classmethod + def new(cls, main_pid: int, cache_id: int, msg: str) -> Self: + return cls(datetime.now(), main_pid, os.getpid(), cache_id, msg) + + @classmethod + def from_file(cls, file: os.PathLike[str] | str | TextIO) -> list[Self]: + try: + path = Path(file) # type: ignore + except TypeError: # File object + # If we're here, `file` is a file object + if TYPE_CHECKING: + assert isinstance(file, TextIO) + content = file.read() + else: + content = path.read_text() + return cls.from_text(content) + + @classmethod + def from_text(cls, text: str) -> list[Self]: + def gen_timestamps(text: str) -> Generator[re.Match, None, None]: + last_ts_match: re.Match | None = None + while True: + ts_match = timestamp_regex.search( + text, last_ts_match.end() if last_ts_match else 0, + ) + if ts_match: + yield ts_match + last_ts_match = ts_match + else: + return + + def gen_message_blocks(text: str) -> Generator[ + tuple[datetime, re.Match, str], None, None + ]: + timestamps = list(gen_timestamps(text)) + if not timestamps: + return + + # Handle all the entries up till the 2nd-to-last one + for this_match, next_match in pairwise(timestamps): + ts = parse_timestamp(this_match.group('timestamp')) + # The -1 is for stripping the trailing newline + text_block = text[this_match.start():next_match.start() - 1] + yield (ts, this_match, text_block) + # Handle the last entry + last_match = 
timestamps[-1] + yield ( + parse_timestamp(last_match.group('timestamp')), + last_match, + text[last_match.start():], + ) + + def get_entries(text: str) -> Generator[Self, None, None]: + for timestamp, ts_match, text_block in gen_message_blocks(text): + # Strip the block indent + ts_text = ts_match.group(0) + assert text_block.startswith(ts_text) + ts_width = len(ts_text) + text_block = dedent(' ' * ts_width + text_block[ts_width:]) + # Strip the header and parse the relevant info from it + header_match = header_regex.match(text_block) + assert header_match + current_pid = int(header_match.group('current_pid')) + main_pid_ = header_match.group('main_pid') + if main_pid_ == HEADER_MAIN_INDICATOR: + main_pid = current_pid + else: + main_pid = int(main_pid_) + cache_id = parse_id(header_match.group('obj_id')) + # The rest of the block is the message proper + msg = text_block[header_match.end():] + yield cls(timestamp, main_pid, current_pid, cache_id, msg) + + timestamp_pattern = fmt_to_regex( + f'{TIMESTAMP_PATTERN}{TIMESTAMP_SPACING}', timestamp='.+?', + ) + timestamp_regex = re.compile('^' + timestamp_pattern, re.MULTILINE) + header_regex = re.compile(fmt_to_regex( + HEADER_PATTERN + HEADER_SEP, + current_pid=r'\d+', + main_pid=r'\d+|' + re.escape(HEADER_MAIN_INDICATOR), + obj_id='.+?', + )) + return list(get_entries(text)) diff --git a/line_profiler/_child_process_profiling/cache.py b/line_profiler/_child_process_profiling/cache.py new file mode 100644 index 00000000..51a464b3 --- /dev/null +++ b/line_profiler/_child_process_profiling/cache.py @@ -0,0 +1,857 @@ +""" +A cache object to be used by for propagating profiling down to child +processes. 
+""" +from __future__ import annotations + +import atexit +import dataclasses +import os +import sys +try: + import _pickle as pickle +except ImportError: + import pickle # type: ignore[assignment,no-redef] +from collections.abc import Collection, Callable, Iterable +from functools import partial, cached_property, wraps +from importlib import import_module +from operator import setitem +from pathlib import Path +from pickle import HIGHEST_PROTOCOL +from reprlib import Repr +from textwrap import indent +from types import MethodType, ModuleType +from typing import Any, ClassVar, TypeVar, TypedDict, cast, overload +from typing_extensions import Concatenate, ParamSpec, Self, Unpack + +from .. import _diagnostics as diagnostics +from ..curated_profiling import CuratedProfilerContext +from ..line_profiler import LineProfiler, LineStats +from ._cache_logging import CacheLoggingEntry +from .misc_utils import block_indent, make_tempfile +# Note: this should have been defined here in this file, but we moved it +# over to `~._child_process_hook` because that module contains the .pth +# hook, which must run with minimal overhead when a Python process isn't +# associated with a profiled process +from .pth_hook import INHERITED_PID_ENV_VARNAME + + +__all__ = ('LineProfilingCache',) + + +T = TypeVar('T') +PS = ParamSpec('PS') +# Note: `typing.AnyStr` deprecated since 3.13 +AnyStr = TypeVar('AnyStr', str, bytes) + +_THIS_SUBPACKAGE, *_ = (lambda: None).__module__.rpartition('.') +INHERITED_CACHE_ENV_VARNAME_PREFIX = ( + 'LINE_PROFILER_PROFILE_CHILD_PROCESSES_CACHE_DIR' +) +CACHE_FILENAME = 'line_profiler_cache.pkl' +_DEBUG_LOG_FILENAME_PATTERN = 'debug_log_{main_pid}_{current_pid}.log' + + +def _import_sibling(submodule: str) -> ModuleType: + return import_module(f'{_THIS_SUBPACKAGE}.{submodule}') + + +class _ReprAttributes(TypedDict, total=False): + """ + Note: + We use this typed dict instead of directly supplying them in the + :py:meth:`_CallbackRepr.__init__()` signature, 
because we don't + want to bother with the default values there. + """ + maxlevel: int + maxtuple: int + maxlist: int + maxarray: int + maxdict: int + maxset: int + maxfrozenset: int + maxdeque: int + maxstring: int + maxlog: int + maxother: int + fillvalue: str + indent: str | int | None + + +class _CallbackRepr(Repr): + """ + :py:class:`reprlib.Repr` subclass to help with representing cleanup + callbacks, special-casing certain relevant object types (see + examples below). + + Example: + >>> from functools import partial + >>> from sys import version_info + + >>> class MyEnviron(dict): + ... def some_method(self) -> None: + ... ... + ... + >>> + >>> class MyRepr(_CallbackRepr): + ... # Since we can't instantiate a new `os._Environ`, test + ... # the relevant method with a mock + ... repr_MyEnviron = _CallbackRepr.repr__Environ + ... + >>> + >>> r = MyRepr(maxenv=3, maxargs=4, maxstring=15) + + Environ-dict formatting: + + >>> my_env = MyEnviron( + ... foo='1', + ... bar='2', + ... this_varname_is_long_but_isnt_truncated=( + ... "THIS VALUE IS TRUNCATED BECAUSE IT'S TOO LONG" + ... ), + ... baz='4', + ... ) + >>> print(r.repr(my_env)) + environ({'foo': '1', 'bar': '2', \ +'this_varname_is_long_but_isnt_truncated': 'THIS ... LONG', ...}) + + Partial-object formatting: + + >>> r.maxenv = 0 + >>> print(r.repr(my_env.some_method)) + + + Bound-method formatting: + + >>> r.maxargs = 0 + >>> callback_1 = partial(int, base=8) + >>> print(r.repr(callback_1)) + functools.partial(, ...) + + Indentation (Python 3.12+): + + >>> if version_info < (3, 12): + ... from pytest import skip + ... + ... skip( + ... '`Repr.indent` not available on {}.{},{}' + ... .format(*sys.version_info) + ... 
) + + >>> r = MyRepr(maxenv=2, maxargs=4) + >>> r.indent = 2 + >>> callback_1 = partial(int, base=8) + >>> print(r.repr(callback_1)) + functools.partial( + , + base=8, + ) + + >>> callback_2 = partial(min, 5, 4, 3, 2, 1) + >>> r.indent = '----' + >>> print(r.repr(callback_2)) + functools.partial( + ----, + ----5, + ----4, + ----3, + ----2, + ----..., + ) + + >>> r.indent = ' ' + >>> r.maxenv = 2 + >>> print(r.repr(my_env.some_method)) + + """ + def __init__( + self, + *, + maxargs: int = 5, + maxenv: int = 3, + **kwargs: Unpack[_ReprAttributes] + ) -> None: + super().__init__() # kwargs are 3.12+ + for k, v in kwargs.items(): + setattr(self, k, v) + self.maxargs = maxargs + self.maxenv = maxenv + + def repr__Environ(self, env: os._Environ[AnyStr], level: int) -> str: + get: Callable[[AnyStr], str] = partial(self.repr1, level=level-1) + # Truncate envvar values, but not their names + envvars = ['{!r}: {}'.format(k, get(v)) for k, v in env.items()] + return self._format_items(envvars, ('environ({', '})'), self.maxenv) + + def repr_method(self, method: MethodType, level: int) -> str: + instance = self.repr1(method.__self__, level-1) + func = getattr(method.__func__, '__qualname__', '?') + prefix, suffix = f'' + # Take care of possible multi-line reprs + return block_indent(instance, prefix) + suffix + + def repr_partial(self, ptl: partial, level: int) -> str: + get: Callable[[Any], str] = partial(self.repr1, level=level-1) + args = [get(arg) for arg in ptl.args] + args.extend('{}={}'.format(k, get(v)) for k, v in ptl.keywords.items()) + args.insert(0, get(ptl.func)) + name = '{0.__module__}.{0.__qualname__}'.format(type(ptl)) + # The +1 is to account for `ptl.func` + return self._format_items(args, (name + '(', ')'), self.maxargs + 1) + + def _format_items( + self, + items: Collection[str], + delims: tuple[str, str], + maxlen: int | None = None, + ) -> str: + start, end = delims + if maxlen is not None and len(items) > maxlen: + items = list(items)[:maxlen] + ['...'] 
+ indent_prefix: str | None = self._get_indent() + if indent_prefix is None or not items: + return '{}{}{}'.format(start, ', '.join(items), end) + return '\n'.join([ + start, *(indent(item + ',', indent_prefix) for item in items), end, + ]) + + if sys.version_info >= (3, 12): + # Note: `.indent` only available since 3.12 + def _get_indent(self) -> str | None: + indent = self.indent + if indent is None or isinstance(indent, str): + return indent + return ' ' * indent + else: + @staticmethod + def _get_indent() -> None: + return None + + +_CALLBACK_REPR = _CallbackRepr(maxother=cast(int, float('inf'))).repr + + +@dataclasses.dataclass +class LineProfilingCache: + """ + Helper object for coordinating a line-profiling session, caching the + info required to make profiling persist into child processes. + """ + cache_dir: os.PathLike[str] | str + config: os.PathLike[str] | str | None = None + profiling_targets: Collection[str] = dataclasses.field( + default_factory=list, + ) + rewrite_module: os.PathLike[str] | str | None = None + profile_imports: bool = False + preimports_module: os.PathLike[str] | str | None = None + main_pid: int = dataclasses.field(default_factory=os.getpid) + # Note: if we're using the line profiler, `kernprof` always set + # `builtin` to true + insert_builtin: bool = True + debug: bool = diagnostics.DEBUG + + profiler: LineProfiler | None = dataclasses.field( + default=None, init=False, repr=False, + ) + _cleanup_stacks: dict[float, list[Callable[[], Any]]] = dataclasses.field( + default_factory=dict, init=False, repr=False, + ) + _loaded_instance: ClassVar[Self | None] = None + + def cleanup(self) -> None: + """ + Pop all the cleanup callbacks from the internal stack added via + :py:meth:`~.add_cleanup` and call them in order. 
+ """ + stacks = self._cleanup_stacks + ncallbacks_total = sum(len(stack) for stack in stacks.values()) + if not ncallbacks_total: + self._debug_output('Cleanup aborted (no registered callbacks)') + return + # Bookend the cleanup loop with log messages to help detect if + # child processes are prematurely terminated + self._debug_output( + f'Starting cleanup ({ncallbacks_total} callback(s))...', + ) + ncallbacks_run = 0 + for priority in sorted(stacks): + callbacks = stacks.pop(priority) + while callbacks: + callback = callbacks.pop() + callback_repr = _CALLBACK_REPR(callback) + ncallbacks_run += 1 + try: + callback() + except Exception as e: + state = 'failed' + msg = f'{callback_repr}: {type(e).__name__}: {e}' + else: + state, msg = 'succeeded', f'{callback_repr}' + self._debug_output( + f'- Cleanup {state} ' + f'({ncallbacks_run}/{ncallbacks_total}): {msg}', + ) + self._debug_output( + f'... cleanup completed ({ncallbacks_total} callback(s))', + ) + + def add_cleanup( + self, callback: Callable[PS, Any], *args: PS.args, **kwargs: PS.kwargs, + ) -> None: + """ + Add a cleanup callback to the internal stack; they can be later + called by :py:meth:`~.cleanup`. + """ + self._add_cleanup(callback, 0, *args, **kwargs) + + def _add_cleanup( + self, callback: Callable[PS, Any], priority: float, + *args: PS.args, **kwargs: PS.kwargs, + ) -> None: + if args or kwargs: + callback = partial(callback, *args, **kwargs) + self._cleanup_stacks.setdefault(priority, []).append(callback) + header = 'Cleanup callback added' + if priority: + header = f'{header} (priority: {priority})' + self._debug_output(f'{header}: {_CALLBACK_REPR(callback)}') + + def copy( + self, *, + inherit_cleanups: bool = False, + inherit_profiler: bool = False, + **replacements + ) -> Self: + """ + Make a copy with optionally replaced fields. + + Args: + inherit_cleanups (bool): + If true, the copy also makes a (shallow) copy of the + cleanup-callback stack. 
+ inherit_profiler (bool): + If true, the copy also gets a reference to + :py:attr:`~.profiler` + **replacements (Any): + Optional fields to replace + + Return: + inst (LineProfilingCache): + New instance + """ + init_args: dict[str, Any] = {} + for field, value in self._get_init_args().items(): + init_args[field] = replacements.get(field, value) + copy = type(self)(**init_args) + if inherit_cleanups: + copy._cleanup_stacks = { + priority: list(callbacks) + for priority, callbacks in self._cleanup_stacks.items() + } + if inherit_profiler: + copy.profiler = self.profiler + return copy + + @classmethod + def load(cls) -> Self: + """ + Reconstruct the instance from the environment variables + :env:`LINE_PROFILER_PROFILE_CHILD_PROCESSES_CACHE_PID` and + :env:`LINE_PROFILER_PROFILE_CHILD_PROCESSES_CACHE_DIR_`. + These should have been set from an ancestral Python process. + + Note: + If a previously :py:meth:`.~.load`-ed instance exists, it is + returned instead of a new instance. + """ + instance = cls._loaded_instance + if instance is None: + pid = os.environ[INHERITED_PID_ENV_VARNAME] + cache_varname = f'{INHERITED_CACHE_ENV_VARNAME_PREFIX}_{pid}' + cache_dir = os.environ[cache_varname] + msg = ( + f'PID {os.getpid()} (from {pid}): ' + f'Loading instance from ${{{cache_varname}}} = {cache_dir}' + ) + diagnostics.log.debug(msg) + instance = cls._from_path(cls._get_filename(cache_dir)) + instance._replace_loaded_instance(force=True) + return instance + + def dump(self) -> None: + """ + Serialize the cache instance and dump into the default location + as indicated by :py:attr:`~.cache_dir`, so that they can be + :py:meth:`~.load`-ed by child processes. + + Note: + Cleanup callbacks are not serialized. 
+ """ + content = self._get_init_args() + msg = f'Dumping instance data to {self.filename}: {content!r}' + self._debug_output(msg) + with open(self.filename, mode='wb') as fobj: + pickle.dump(content, fobj, protocol=HIGHEST_PROTOCOL) + + def gather_stats(self, glob_pattern: str = '*.lprof') -> LineStats: + """ + Gather the profiling output files matching ``glob_pattern`` from + :py:attr:`~.cache_dir`, consolidating them into a single + :py:class:`LineStats` object. + """ + fnames = list(Path(self.cache_dir).glob(glob_pattern)) + self._debug_output( + 'Loading results from {} child profiling file(s): {!r}' + .format(len(fnames), fnames) + ) + if not fnames: + return LineStats.get_empty_instance() + return LineStats.from_files(*fnames, on_defective='ignore') + + def _dump_debug_logs(self) -> None: + """ + Gather the debug logfiles in child processes and write their + contents to the logger + (:py:data:`line_profiler._diagnostics.log`). + + Notes: + - The content of each child-process log file is not + re-parsed and is written to the logger as a single + multi-line message. + + - To be called in the main process. + """ + for log in sorted(self._get_debug_logfiles()): + if log == self._debug_log: # Don't double dip + continue + *_, child_pid = log.stem.rpartition('_') + msg = 'Cache log messages from child process {}:\n{}'.format( + child_pid, indent(log.read_text(), ' '), + ) + diagnostics.log.debug(msg) + + def _gather_debug_log_entries( + self, chronological: bool = False, + ) -> list[CacheLoggingEntry]: + """ + Gather and return all entries from debug logfiles sorted by + timestamps. 
+ """ + log_files: Iterable[Path] = self._get_debug_logfiles() + if chronological: # Sorting on the entries -> chronological + to_list: Callable[ + [Iterable[CacheLoggingEntry]], list[CacheLoggingEntry] + ] = sorted + else: + # Otherwise, just sort by filename (entries in each file are + # still chronological) + log_files = sorted(log_files) + to_list = list + return to_list( + entry for log in log_files + for entry in CacheLoggingEntry.from_file(log) + ) + + def _get_debug_logfiles(self) -> Iterable[Path]: + pattern = _DEBUG_LOG_FILENAME_PATTERN.format( + main_pid=self.main_pid, current_pid='*', + ) + return Path(self.cache_dir).glob(pattern) + + def inject_env_vars( + self, env: dict[str, str] | None = None, + ) -> None: + """ + Inject the :py:attr:`~.environ` variables into ``env`` and add + cleanup callbacks to reverse them. + + Args: + env (dict[str, str] | None): + Dictionary in the format of :py:data:`os.environ`; + default is to use that + """ + if env is None: + env = cast(dict[str, str], os.environ) + for name, value in self.environ.items(): + try: + old = env[name] + except KeyError: + self.add_cleanup(env.pop, name, None) + change = f'{value!r} (new)' + else: + self.add_cleanup(setitem, env, name, old) + change = f'{old!r} -> {value!r}' + self._debug_output(f'Injecting env var ${{{name}}}: {change}') + env[name] = value + + def _debug_output(self, msg: str) -> None: + try: + self._make_debug_entry(msg).write(self._debug_log) + except OSError: # Cache dir may have been rm-ed during cleanup + pass + + def _setup_in_main_process(self, wrap_os_fork: bool = True) -> None: + """ + Set up shop in the main process so that (line-)profiling can + extend into child processes. 
+ + Args: + wrap_os_fork (bool): + Whether to wrap :py:func:`os.fork` which handles + profiling + + Side effects: + + - Instance data written to :py:attr:`~.cache_dir` + + - Environment variables injected + (see :py:meth:`~.inject_env_vars()`) + + - A ``.pth`` file written so that child processes + automaticaly runs setup code (see + :py:func:`line_profiler._child_process_hook.pth_hook.\ +write_pth_hook`) + + - :py:func:`os.fork` wrapped so that profiling set up in + forked processes is properly handled (if + ``wrap_os_fork=True``) + + - :py:mod:`multiprocessing` and :py:mod:`threading` patched + so that child processes and threads managed thereby are + properly handled + + - Instance to be returned if :py:func:`~.load()` is called + from now on + """ + self.dump() + self.inject_env_vars() + _import_sibling('pth_hook').write_pth_hook(self) + self._setup_common(wrap_os_fork, reboot_forkserver=True) + self._replace_loaded_instance() + + def _setup_in_child_process( + self, + wrap_os_fork: bool = False, + context: str = '', + prof: LineProfiler | None = None, + ) -> bool: + """ + Set up shop in a forked/spawned child process so that + (line-)profiling can extend therein. + + Args: + wrap_os_fork (bool): + Whether to wrap :py:func:`os.fork` which handles + profiling; already-forked child processes should set + this to false + context (str): + Optional context from which the function is called, to + be used in log messages + prof (LineProfiler | None): + Optional profiler instance to associate with the cache; + if not provided, an instance is created + + Returns: + has_set_up (bool): + False the instance has already been set up prior to + calling this function, true otherwise + """ + if not context: + context = '...' 
+ self._debug_output(f'Setting up ({context})...') + if self.profiler is not None: # Already set up + self._debug_output(f'Setup aborted ({context})') + return False + + # Create a profiler instance and manage it with + # `CuratedProfilerContext` + if prof is None: + prof = LineProfiler() + self.profiler = prof + ctx = CuratedProfilerContext(prof, insert_builtin=self.insert_builtin) + ctx.install() + self.add_cleanup(ctx.uninstall) + self._debug_output(f'Set up `.profiler` at {id(prof):#x}') + + # Do the preimports at `cache.preimports_module` where + # appropriate + if self.preimports_module: + self._debug_output('Loading preimports...') + with open(self.preimports_module, mode='rb') as fobj: + code = compile(fobj.read(), self.preimports_module, 'exec') + exec(code, {}) # Use a fresh, empty namespace + + # Occupy a tempfile slot in `.cache_dir` and set the profiler + # up to write thereto when the process terminates (with high + # priority) + prof_outfile = self.make_tempfile( + prefix='child-prof-output-{}-{}-{:#x}-' + .format(self.main_pid, os.getpid(), id(prof)), + suffix='.lprof', + ) + self._add_cleanup(prof.dump_stats, -1, prof_outfile) + + # Various setups + self._setup_common(wrap_os_fork, reboot_forkserver=False) + + # Set `.cleanup()` as an atexit hook to handle everything when + # the child process is about to terminate + atexit.register(self.cleanup) + + self._debug_output(f'Setup successful ({context})') + return True + + def _setup_common( + self, wrap_os_fork: bool, reboot_forkserver: bool, + ) -> None: + if wrap_os_fork: + self._wrap_os_fork() + _import_sibling('multiprocessing_patches').apply( + self, reboot_forkserver, + ) + _import_sibling('threading_patches').apply(self) + + def _wrap_os_fork(self) -> None: + """ + Create a wrapper around :py:func:`os.fork` which handles + profiling. 
+ + Side effects: + - :py:func:`os.fork` (if available) replaced with the + wrapper + - :py:meth:`~.cleanup` callback registered undoing that + """ + try: + fork = os.fork + except AttributeError: # Can't fork on this platform + return + + @wraps(fork) + def wrapper() -> int: + ppid = os.getpid() + result = fork() + if result: + return result + # If we're here, we are in the fork + pid = os.getpid() + forked = self.copy() # Ditch inherited cleanups + forked._debug_output(f'Forked: {ppid} -> {pid}') + if forked._replace_loaded_instance(): + forked._debug_output( + 'Superseded cached `.load()`-ed instance in forked process' + ) + # Note: we can reuse the profiler instance in the fork, but + # it needs to go through setup so that the separate + # profiling results are dumped into another output file + forked._setup_in_child_process(False, 'fork', self.profiler) + return result + + self.patch(os, 'fork', wrapper, name='os') + + def patch( + self, obj: Any, attr: str, value: Any, *, + name: str | None = None, cleanup: bool = True, + ) -> None: + """ + Patch ``attr`` on ``obj`` with ``value``. If ``cleanup`` is + true, register a cleanup callback to either reset or delete the + attribute. + """ + add_cleanup = self.add_cleanup if cleanup else (lambda *_, **__: None) + try: + old = getattr(obj, attr) + except AttributeError: + add_cleanup(delattr, obj, attr) + else: + add_cleanup(setattr, obj, attr, old) + setattr(obj, attr, value) + if name is None: + name = repr(obj) + msg = 'Patched `{}.{}` -> `{}`'.format(name, attr, value) + self._debug_output(msg) + + def make_tempfile(self, **kwargs) -> Path: + """ + Create a fresh tempfile under :py:attr:`~.cache_dir`. The other + arguments are passed as-is to :py:func:`tempfile.mkstemp`. + + Returns: + path (Path): + Path to the created file. 
+ """ + path = make_tempfile(dir=self.cache_dir, **kwargs) + self._debug_output(f'Created tempfile: {path.name!r}') + return path + + def _replace_loaded_instance(self, force: bool = False) -> bool: + cls = type(self) + if force or self._consistent_with_loaded_instance: + # Note: `ty` REALLY hates assigning an instance to + # `ClassVar[Self]` (#3274); no choice but to ignore it for + # the time being... + cls._loaded_instance = self # type: ignore + return True + return False + + @classmethod + def _from_path(cls, fname: os.PathLike[str] | str) -> Self: + with open(fname, mode='rb') as fobj: + return cls(**pickle.load(fobj)) + + def _get_init_args(self) -> dict[str, Any]: + init_fields = [ + field_obj.name for field_obj in dataclasses.fields(self) + if field_obj.init + ] + return {name: getattr(self, name) for name in init_fields} + + @staticmethod + def _get_filename(cache_dir: os.PathLike[str] | str) -> str: + return os.path.join(cache_dir, CACHE_FILENAME) + + @overload + @classmethod + def _method_wrapper( + cls, + wrapper: Callable[Concatenate[Self, Callable[PS, T], PS], T], + debug: bool | None = None, + ) -> Callable[[Callable[PS, T]], Callable[PS, T]]: + ... + + @overload + @classmethod + def _method_wrapper( + cls, wrapper: None = None, debug: bool | None = None, + ) -> Callable[ + [Callable[Concatenate[Self, Callable[PS, T], PS], T]], + Callable[[Callable[PS, T]], Callable[PS, T]] + ]: + ... + + @classmethod + def _method_wrapper( + cls, + wrapper: ( + Callable[Concatenate[Self, Callable[PS, T], PS], T] | None + ) = None, + debug: bool | None = None, + ) -> ( + Callable[ + [Callable[Concatenate[Self, Callable[PS, T], PS], T]], + Callable[[Callable[PS, T]], Callable[PS, T]] + ] + | Callable[[Callable[PS, T]], Callable[PS, T]] + ): + """ + Convenience wrapper decorator for functions which use the + :py:meth:`load`-ed session instance and wrap another callable. 
+ + Args: + wrapper (Callable[..., T]) + Callable with the call signature + ``(cache, vanilla_impl, *args, **kwargs) -> retval``; + ``*args``, ``**kwargs``, and ``retval`` should be + consistent with that of ``vanilla_impl()``'s. + debug (bool | None) + Whether to format and write debug messages before and + after the call to the ``wrapper`` callable; + if ``debug`` is not set, it will be taken from the + session instance. + + Returns: + inner_wrapper (Callable[[Callable[PS, T]], Callable[PS, T]]) + Wrapper(-maker) which takes the ``vanilla_impl`` and + return a wrapper around it. + """ + if wrapper is None: + # `ty` doesn't quite support `partial` yet, see issue #1536 + return cast( + Callable[[Callable[PS, T]], Callable[PS, T]], + partial(cls._method_wrapper, debug=debug), + ) + + def inner_wrapper(vanilla_impl: Callable[PS, T]) -> Callable[PS, T]: + @wraps(vanilla_impl) + def wrapped_impl(*args: PS.args, **kwargs: PS.kwargs) -> T: + cache = cls.load() + write = cache._debug_output + debug_: bool | None = debug + if debug_ is None: + debug_ = cache.debug + + if debug_: + arg_reprs: list[str] = [repr(arg) for arg in args] + arg_reprs.extend(f'{k}={v!r}' for k, v in kwargs.items()) + formatted_call = f'{name}({", ".join(arg_reprs)})' + write(f'Wrapped call made: {formatted_call}...') + try: + result = wrapper(cache, vanilla_impl, *args, **kwargs) + except Exception as e: + if debug_: + write( + 'Wrapped call failed: ' + f'{formatted_call} -> {type(e).__name__}: {e}', + ) + raise + else: + if debug_: + write( + 'Wrapped call succeeded: ' + f'{formatted_call} -> {result!r}', + ) + return result + + if ( + hasattr(vanilla_impl, '__module__') + and hasattr(vanilla_impl, '__qualname__') + ): + name = '{0.__module__}.{0.__qualname__}'.format(vanilla_impl) + else: + name = f'' + + return wrapped_impl + + for field in 'name', 'qualname', 'doc': + dunder = f'__{field}__' + value = getattr(wrapper, dunder, None) + if value is not None: + setattr(inner_wrapper, dunder, value) 
+ return inner_wrapper + + @property + def environ(self) -> dict[str, str]: + """ + Environment variables to be injected into and inherited by child + processes. + """ + cache_varname = f'{INHERITED_CACHE_ENV_VARNAME_PREFIX}_{self.main_pid}' + return { + INHERITED_PID_ENV_VARNAME: str(self.main_pid), + cache_varname: str(self.cache_dir), + } + + @property + def filename(self) -> str: + return self._get_filename(self.cache_dir) + + @property + def _debug_log(self) -> Path | None: + if not self.debug: + return None + fname = _DEBUG_LOG_FILENAME_PATTERN.format( + main_pid=self.main_pid, current_pid=os.getpid(), + ) + return Path(self.cache_dir) / fname + + @cached_property + def _make_debug_entry(self) -> Callable[[str], CacheLoggingEntry]: + return partial(CacheLoggingEntry.new, self.main_pid, id(self)) + + @cached_property + def _consistent_with_loaded_instance(self) -> bool: + return type(self).load()._get_init_args() == self._get_init_args() diff --git a/line_profiler/_child_process_profiling/misc_utils.py b/line_profiler/_child_process_profiling/misc_utils.py new file mode 100644 index 00000000..050380bf --- /dev/null +++ b/line_profiler/_child_process_profiling/misc_utils.py @@ -0,0 +1,40 @@ +""" +Misc. utility functions used by the subpackage. +""" +import os +from pathlib import Path +from tempfile import mkstemp +from textwrap import indent + + +__all__ = ('block_indent', 'make_tempfile') + + +def block_indent(string: str, prefix: str, fill_char: str = ' ') -> str: + r""" + Example: + >>> string = 'foo\nbar\nbaz' + >>> print(string) + foo + bar + baz + >>> print(block_indent(string, '++++', '-')) + ++++foo + ----bar + ----baz + """ + width = len(prefix) + return prefix + indent(string, fill_char * width)[width:] + + +def make_tempfile(**kwargs) -> Path: + """ + Convenience wrapper around :py:func:`tempfile.mkstemp`, discarding + and closing the integer handle (which if left unattended causes + problems on some platforms). 
+ """ + handle, fname = mkstemp(**kwargs) + try: + return Path(fname) + finally: + os.close(handle) diff --git a/line_profiler/_child_process_profiling/multiprocessing_patches.py b/line_profiler/_child_process_profiling/multiprocessing_patches.py new file mode 100644 index 00000000..c5561fa4 --- /dev/null +++ b/line_profiler/_child_process_profiling/multiprocessing_patches.py @@ -0,0 +1,477 @@ +""" +Patch :py:mod:`multiprocessing` so that profiling extends into processes +it creates. + +Notes +----- +- Based on the implementations in :py:mod:`coverage.multiproc` and + :py:mod:`pytest_autoprofile._multiprocessing`. +- Results may vary if the process pool is not properly + :py:meth:`multiprocessing.pool.Pool.close`-d and + :py:meth:`multiprocessing.pool.Pool.join`-ed; + see `this caveat `__. +""" +from __future__ import annotations + +import multiprocessing +import warnings +from collections.abc import Callable, Mapping +from functools import lru_cache, partial +from importlib import import_module +from multiprocessing.process import BaseProcess +from os import PathLike +from time import sleep, monotonic +from types import MappingProxyType +from typing import ( + Any, Generic, Literal, Protocol, TypeVar, Union, NoReturn, cast, +) +from typing_extensions import Concatenate, ParamSpec, Self + +from .. import _diagnostics as diagnostics +from ..toml_config import ConfigSource +from .cache import LineProfilingCache +from .runpy_patches import create_runpy_wrapper + + +__all__ = ('apply',) + + +T = TypeVar('T') +PS = ParamSpec('PS') +_OnTimeout = Literal['ignore', 'warn', 'error'] + +_PATCHED_MARKER = '__line_profiler_patched_multiprocessing__' + + +class _Wrapper(Protocol, Generic[PS, T]): + def __call__(self, func: Callable[PS, T], /) -> Callable[PS, T]: + ... + + +class _Poller: + """ + Poll a callable until it returns true-y. + + Example: + >>> from itertools import count + >>> from typing import Iterator + >>> + >>> + >>> def count_until(limit: int) -> bool: + ... 
def counter_is_big_enough( + ... counter: Iterator[int], limit: int, + ... ) -> bool: + ... return next(counter) >= limit + ... + ... return _Poller.poll_until( + ... counter_is_big_enough, count(), limit, + ... ) + ... + >>> + >>> with count_until(10).with_cooldown(.01).with_timeout(.25): + ... print('We counted up to 10') + We counted up to 10 + >>> with count_until(30).with_cooldown(.01).with_timeout(.25): + ... print('We counted up to 30') \ +# doctest: +NORMALIZE_WHITESPACE + Traceback (most recent call last): + ... + line_profiler..._Poller.Timeout: ... + timed out (... s >= 0.25 s) waiting for + callback ...counter_is_big_enough... to return true + """ + def __init__( + self, + func: Callable[[], Any], + cooldown: float = 0, + timeout: float = 0, + on_timeout: _OnTimeout = 'error', + ) -> None: + if cooldown < 0: + cooldown = 0 + if timeout < 0: + timeout = 0 + self._func: Callable[[], Any] = func + self._cooldown = cooldown + self._timeout = timeout + self._on_timeout = on_timeout + + def sleep(self): + cd = self._cooldown + if cd > 0: + sleep(cd) + + def with_cooldown(self, cooldown: float) -> Self: + return type(self)( + self._func, cooldown, self._timeout, self._on_timeout, + ) + + def with_timeout( + self, + timeout: float | None = None, + on_timeout: _OnTimeout | None = None, + ) -> Self: + if timeout is None: + timeout = self._timeout + if on_timeout is None: + on_timeout = self._on_timeout + return type(self)(self._func, self._cooldown, timeout, on_timeout) + + @classmethod + def poll_until( + cls, func: Callable[PS, Any], /, *args: PS.args, **kwargs: PS.kwargs + ) -> Self: + if args or kwargs: + func = partial(func, *args, **kwargs) + return cls(func) + + @classmethod + def poll_while( + cls, func: Callable[PS, Any], /, *args: PS.args, **kwargs: PS.kwargs + ) -> Self: + def negated( + func: Callable[PS, Any], *a: PS.args, **k: PS.kwargs + ) -> bool: + return not func(*a, **k) + + return cls(partial(negated, func, *args, **kwargs)) + + def 
__enter__(self) -> Self: + def error(msg: str) -> NoReturn: + raise type(self).Timeout(msg) + + def warn(msg: str) -> None: + warnings.warn(msg) + diagnostics.log.warning(msg) + + def ignore(_): + pass + + timeout = self._timeout + callback = self._func + + handle_timeout: Callable[[str], Any] = { + 'error': error, 'warn': warn, 'ignore': ignore, + }[self._on_timeout] + fmt = '.3g' + timeout_msg_header = f'{type(self).__name__} at {id(self):#x}' + + start = monotonic() + while not callback(): + elapsed = monotonic() - start + if timeout and elapsed >= timeout: + handle_timeout( + f'{timeout_msg_header}: ' + f'timed out ({elapsed:{fmt}} s >= {timeout:{fmt}} s) ' + f'waiting for callback {callback!r} to return true' + ) + break + self.sleep() + return self + + def __exit__(self, *_, **__) -> None: + pass + + class Timeout(RuntimeError): + """ + Raised when a :py:class:`_Poller` is timed out when polling. + """ + pass + + +def _get_config( + config: PathLike[str] | str | bool | None = None, +) -> Mapping[str, Any]: + if config not in (True, False, None): + config = str(config) + return _get_config_cached(cast(Union[str, bool, None], config)) + + +@lru_cache() +def _get_config_cached( + config: PathLike[str] | str | bool | None = None, +) -> Mapping[str, Any]: + cd = dict( + ConfigSource.from_config(config) + .get_subconfig('multiprocessing', copy=True) + .conf_dict + ) + assert isinstance(cd.get('polling'), Mapping) + return MappingProxyType({**cd, 'polling': MappingProxyType(cd['polling'])}) + + +@LineProfilingCache._method_wrapper +def wrap_terminate( + cache: LineProfilingCache, + vanilla_impl: Callable[[BaseProcess], None], + self: BaseProcess, +) -> None: + """ + Wrap around :py:meth:`BaseProcess.terminate` to make sure that we + don't actually kill the child (OS-level) process before it has the + chance to properly clean up. 
+ + Note: + We're technically polling in a loop, but it isn't actually + *that* bad: typically ``.terminate()`` is only called when we're + on the bad path (e.g. the parallel workload errored out), and + after the performance-critical part of the code (said workload). + """ + # XXX: why can `coverage` get away with not doing all these + # lock-file hijinks and just patching `BaseProcess._bootstrap()`? + def get_poller_args( + config: PathLike[str] | str | bool | None = None, + ) -> tuple[float, float, str | None]: + values = _get_config(config)['polling'] + try: + cooldown = max(float(values['cooldown']), 0) + except (TypeError, ValueError): + cooldown = 0 + try: + timeout = max(float(values['timeout']), 0) + except (TypeError, ValueError): + timeout = 0 + try: + on_timeout: str | None = values['on_timeout'].lower() + except Exception: # Fallback (use `_Poller`'s default) + on_timeout = None + return cooldown, timeout, on_timeout + + def process_has_returned(proc: BaseProcess, timeout: float) -> bool: + popen = getattr(proc, '_popen', None) + if popen is None: + msg, result = 'No associated process', True + else: + result = popen.wait(timeout) is not None + if result: + msg = f'Process {popen.pid} has returned' + else: + msg = f'Waiting for process {popen.pid} to return...' 
+ cache._debug_output(f' {type(proc).__name__} @ {id(proc):#x}: {msg}') + return result + + def wait_for_return( + config: PathLike[str] | str | None = None, + ) -> _Poller: + cooldown, timeout, on_timeout = get_poller_args(config) + # `False` -> no resolution, force loading the vanilla file + *_, default_on_timeout = get_poller_args(False) + if on_timeout not in ('ignore', 'warn', 'error'): + on_timeout = default_on_timeout + return ( + _Poller.poll_until(process_has_returned, self, cooldown) + .with_timeout(timeout, cast(_OnTimeout, on_timeout)) + ) + + try: + with wait_for_return(cache.config): + pass + finally: # Always call `Process.terminate()` to avoid orphans + vanilla_impl(self) + + +@LineProfilingCache._method_wrapper +def wrap_bootstrap( + cache: LineProfilingCache, + vanilla_impl: Callable[Concatenate[BaseProcess, PS], T], + self: BaseProcess, + /, + *args: PS.args, **kwargs: PS.kwargs +) -> T: + """ + Wrap around :py:meth:`BaseProcess._bootstrap` to run + ``LineProfilingCache.load().cleanup()`` so that profiling results + can be gathered. + """ + try: + return vanilla_impl(self, *args, **kwargs) + finally: + msg = 'Calling cleanup hook via `BaseProcess._bootstrap`' + cache._debug_output(msg) + cache.cleanup() + + +def _cache_hook( + vanilla_impl: Callable[PS, T], + get_logging_message: Callable[PS, str], + /, + *args: PS.args, + **kwargs: PS.kwargs +) -> T: + msg = get_logging_message(*args, **kwargs) + LineProfilingCache.load()._debug_output(msg) + return vanilla_impl(*args, **kwargs) + + +def tee_log( + marker: str, + vanilla_impl: Callable[Concatenate[str, PS], None], + /, + msg: str, + *args: PS.args, + **kwargs: PS.kwargs +) -> None: + """ + Wrap around logging functions like + :py:func:`multiprocessing.util.debug` so that we can tee log + messages from the package to our own logs. 
+ """ + def get_msg(msg: str, *_, **__) -> str: + return f'`multiprocessing` logging ({marker}): {msg}' + + _cache_hook( + vanilla_impl, get_msg, # type: ignore[arg-type] + msg, *args, **kwargs, + ) + + +def apply( + lp_cache: LineProfilingCache, reboot_forkserver: bool = True, +) -> None: + """ + Set up profiling in :py:mod:`multiprocessing` child processes by + applying patches to the module. + + Args: + lp_cache (LineProfilingCache): + Cache instance governing the profiling run + reboot_forkserver (bool): + Whether to reboot the global + :py:class`multiprocessing.forkserver.ForkServer` instance + so as to ensure that profiling happens on processes forked + therefrom (see Note) + + Side effects: + - :py:mod:`multiprocessing` marked as having been set up + + - The following methods and functions patched: + + - :py:meth:`multiprocessing.process.BaseProcess.terminate` + + - :py:meth:`multiprocessing.process.BaseProcess._bootstrap` + + - If ``reboot_forkserver=True``, fork-server process rebooted: + + - Immediately + + - When ``lp_cache.cleanup()`` is run + + - Cleanup callbacks registered via ``lp_cache.add_cleanup()`` + + Note: + Rebooting the fork server is necessary because its process + staticly inherits the environment when it is first spun up + (see :py:func:`multiprocessing.forkserver.ensure_running`). + Thus, without the reboots: + + - If in the same Python process we ever start up two separate + profliing sessions managed by different caches, the child + processes forked from the server will fail to inherit the + updated environment variables injected by the newer cache + instance, leading to the setup code in this subpackage not + being loaded. + + - Since 3.13.8 and 3.14.1, the bug where the ``main_path`` + argument to :py:func:`multiprocessing.forkserver.main` is + unused has been fixed (see ``cpython`` issue `GH-126631`_). 
+ This causes ``sys.modules['__main__']`` to be set up in the + fork-server process, meaning that children forked therefrom + will NOT redo the setup. Thus, the fork-server process itself + will also need to be properly set up for profiling. + + .. _GH-126631: https://github.com/python/cpython/issues/126631 + """ + if not getattr(multiprocessing, _PATCHED_MARKER, False): + _apply_mp_patches(lp_cache, reboot_forkserver) + + +def _apply_patches_generic( + lp_cache: LineProfilingCache, + submodule: str, + targets: Mapping[str, Mapping[str, Callable[[Any], Any]]], + cleanup: bool = True, +) -> None: + submod_name = 'multiprocessing.' + submodule + try: + mod = import_module(submod_name) + except ImportError: + return + for target, patches in targets.items(): + if target: + try: + obj: Any = getattr(mod, target) + except AttributeError: + continue + name = f'{submod_name}.{target}' + else: + obj, name = mod, submod_name + replace = partial(lp_cache.patch, obj, cleanup=cleanup, name=name) + for method, method_wrapper in patches.items(): + try: + vanilla = getattr(obj, method) + except AttributeError: + continue + replace(method, method_wrapper(vanilla)) + + +def _apply_mp_patches( + lp_cache: LineProfilingCache, + reboot_forkserver: bool = True, + debug: bool | None = None, +) -> None: + # In a child process, we don't care about polluting the + # `multiprocessing` namespace, so don't bother with cleanup + apply_patches = partial(_apply_patches_generic, lp_cache) + # Patch `multiprocessing.process.BaseProcess` methods + # Note: the type checkers seem to need some help figuring the + # `patches` out... 
so do explicit `cast()`s + apply_patches( + 'process', + {'BaseProcess': {'terminate': wrap_terminate, + '_bootstrap': wrap_bootstrap}}, + ) + # Patch `multiprocessing.spawn` + try: + from multiprocessing import spawn + except ImportError: + pass + else: + if hasattr(spawn, 'runpy'): + lp_cache.patch( + spawn, 'runpy', create_runpy_wrapper(lp_cache), + name='multiprocessing.spawn', + ) + # Intercept `multiprocessing` debug messages + if debug is None: + debug = _get_config(lp_cache.config)['intercept_logs'] + if debug: + lfuncs = ['sub_debug', 'debug', 'info', 'sub_warning', 'warn'] + lpatches = {func: partial(partial, tee_log, func) for func in lfuncs} + apply_patches('util', {'': lpatches}) + # Stop the current `ForkServer` server process: + # - Now, so that the (rebooted) fork-server process has profiling + # set up; and + # - Also as a part of cache cleanup + # (this uses `ForkServer._stop()` which is private API, but it's the + # same hack used in Python's own test suite -- see the comment to + # said method) + if reboot_forkserver: + try: + from multiprocessing import forkserver + except ImportError: # Incompatible platform + pass + else: + server_instance: forkserver.ForkServer = forkserver._forkserver + stop = getattr(server_instance, '_stop', None) + assert callable(stop) # Appease the type checker + stop() + lp_cache.add_cleanup(stop) + # Mark `multiprocessing` as having been patched + lp_cache.patch( + multiprocessing, _PATCHED_MARKER, True, name='multiprocessing', + ) + + +def _no_op(*_, **__) -> None: + pass diff --git a/line_profiler/_child_process_profiling/pth_hook.py b/line_profiler/_child_process_profiling/pth_hook.py new file mode 100644 index 00000000..5a19cbce --- /dev/null +++ b/line_profiler/_child_process_profiling/pth_hook.py @@ -0,0 +1,115 @@ +""" +Hooks to set up shop in a child Python process and extend profiling +to therein. 
+
+Note:
+    - The current implementation writes temporary .pth files to the
+      site-packages directory, which are executed for all Python
+      processes referring to the same :path:`lib/`. However, only
+      processes originating from a parent which set the requisite
+      environment variables will execute the profiling code.
+    - Said .pth file always imports this module; hence, this file is kept
+      intentionally lean to reduce overhead:
+      - Imports in this file are deferred to being as late as possible.
+      - Type annotations are replaced with type comments.
+      - Non-essential functionalities are split into small separate
+        submodules (e.g. :py:mod:`~.cache`).
+    - Inspired by similar code in :py:mod:`coverage.control` and
+      :py:mod:`pytest_autoprofile.startup_hook`.
+"""
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from pathlib import Path  # noqa: F401
+    from .cache import LineProfilingCache  # noqa: F401
+
+
+__all__ = ('write_pth_hook', 'load_pth_hook')
+
+INHERITED_PID_ENV_VARNAME = (
+    'LINE_PROFILER_PROFILE_CHILD_PROCESSES_CACHE_PID'
+)
+
+
+def write_pth_hook(cache):  # type: (LineProfilingCache) -> Path
+    """
+    Write a .pth file which allows for setting up profiling in child
+    Python processes.
+
+    Args:
+        cache (:py:class:`~.LineProfilingCache`):
+            Cache object
+
+    Returns:
+        fpath (Path):
+            Path to the written .pth file
+
+    Note:
+        - To be called in the main process.
+        - The ``cache`` is responsible for deleting the written .pth
+          file via the registered cleanup callback.
+ """ + import os + from sysconfig import get_path + from .misc_utils import make_tempfile + + if not os.path.exists(cache.filename): + cache.dump() + assert os.path.exists(cache.filename) + + fpath = make_tempfile( + prefix='_line_profiler_profiling_hook_', + suffix='.pth', + dir=get_path('purelib'), + ) + try: + pth_content = 'import {0}; {0}.load_pth_hook({1})'.format( + (lambda: None).__module__, cache.main_pid, + ) + fpath.write_text(pth_content) + cache.add_cleanup(fpath.unlink, missing_ok=True) + except Exception: + fpath.unlink(missing_ok=True) + raise + + return fpath + + +def load_pth_hook(ppid): # type: (int) -> None + """ + Function imported and called by the written .pth file; to reduce + overhead, we immediately return if ``ppid`` doesn't match + :env:`LINE_PROFILER_PROFILE_CHILD_PROCESSES_CACHE_PID`. + """ + from os import environ + + try: + env_ppid = int(environ[INHERITED_PID_ENV_VARNAME]) + except (KeyError, ValueError): + return + if env_ppid != ppid: + return + + # If we're here, we're most probably in a descendent process of a + # profiled Python process, so we can be more liberal with the + # imports without worrying about overhead + import warnings + from .._diagnostics import DEBUG, log + from .cache import LineProfilingCache # noqa: F811 + + # Note: .pth files may be double-loaded in a virtual environment + # (see https://stackoverflow.com/questions/58807569), so work around + # that; + # also see similar check in `coverage.control.process_startup()` + if getattr(load_pth_hook, 'called', False): + return + try: + cache = LineProfilingCache.load() + cache._setup_in_child_process(True, 'pth') + except Exception as e: + if DEBUG: + msg = f'{type(e)}: {e}' + warnings.warn(msg) + log.warning(msg) + finally: + load_pth_hook.called = True # type: ignore diff --git a/line_profiler/_child_process_profiling/runpy_patches.py b/line_profiler/_child_process_profiling/runpy_patches.py new file mode 100644 index 00000000..659f4de8 --- /dev/null +++ 
b/line_profiler/_child_process_profiling/runpy_patches.py @@ -0,0 +1,147 @@ +""" +Patches for :py:mod:`runpy` to be patched into the namespace of +:py:mod:`multiprocessing.spawn`, so that the rewriting of ``__main__`` +can be continued into child processes. +""" +from __future__ import annotations + +import os +from collections.abc import Callable +from functools import partial +from importlib.util import find_spec +from types import ModuleType +from typing import cast, TypeVar +from typing_extensions import Concatenate, ParamSpec + +from ..autoprofile.ast_tree_profiler import AstTreeProfiler +from ..autoprofile.run_module import AstTreeModuleProfiler +from ..autoprofile.util_static import modname_to_modpath +from .cache import LineProfilingCache + + +__all__ = ('create_runpy_wrapper',) + + +PS = ParamSpec('PS') +T = TypeVar('T') + + +THIS_MODULE = (lambda: None).__module__ + + +def _copy_module(name: str) -> ModuleType: + """ + Returns: + module (ModuleType): + Module object, which is a fresh copy of the module named + ``name`` + """ + spec = find_spec(name) + if spec is None: + raise ModuleNotFoundError(name) + assert spec.loader + assert callable(getattr(spec.loader, 'exec_module', None)) + module = ModuleType(spec.name) + for attr, value in { + '__spec__': spec, + '__name__': spec.name, + '__file__': spec.origin, + '__path__': spec.submodule_search_locations, + }.items(): + if value is not None: + setattr(module, attr, value) + spec.loader.exec_module(module) + return module + + +def _exec( + cache: LineProfilingCache, + CodeWriter: type[AstTreeProfiler], + _code, # This represents the first pos arg to `exec()` (ignored) + /, + *args, **kwargs, +) -> None: + """ + To be monkey-patched into :py:mod:`runpy`'s namespace as `exec()` + so that rewritten and autoprofiled code at ``cache.rewrite_module`` + is always executed. 
+ """ + assert cache.rewrite_module + cache._debug_output('Calling via {}: `exec({})`'.format( + THIS_MODULE, + ', '.join( + [repr(a) for a in (_code, *args)] + + [f'{k}={v!r}' for k, v in kwargs.items()] + ), + )) + fname = str(cache.rewrite_module) + code_writer = CodeWriter( + fname, + list(cache.profiling_targets), + cache.profile_imports, + ) + code = compile(code_writer.profile(), fname, 'exec') + exec(code, *args, **kwargs) + + +def _run( + cache: LineProfilingCache, + runpy: ModuleType, + func: Callable[Concatenate[str, PS], T], + name: str, + resolve_target_to_path: Callable[[str], str], + CodeWriter: type[AstTreeProfiler], + target: str, + /, + *args: PS.args, **kwargs: PS.kwargs +) -> T: + cache._debug_output('Calling via {}: `runpy.{}({})`'.format( + THIS_MODULE, + name, + ', '.join( + [repr(a) for a in (target, *args)] + + [f'{k}={v!r}' for k, v in kwargs.items()] + ), + )) + if cache.rewrite_module: + try: + filename = resolve_target_to_path(target) + profile = os.path.samefile(filename, cache.rewrite_module) + except Exception as e: + cache._debug_output( + f'{THIS_MODULE}: Failed to check whether code loaded by ' + f'`runpy.{name}(...)` is to be rewritten ' + f'({type(e).__name__}: {e})' + ) + profile = False + else: + profile = False + # If we are about to run the code to be autoprofiled, monkey-patch + # `exec()` into the `runpy` namespace which just rewrites + # `cache.rewrite_module` and executes it + if profile: + # Dodge attr-defined errors and their ilk + vars(runpy)['exec'] = partial(_exec, cache, CodeWriter) + try: + return func(target, *args, **kwargs) + finally: + if hasattr(runpy, 'exec'): + del runpy.exec + + +def create_runpy_wrapper(cache: LineProfilingCache) -> ModuleType: + """ + Create a copy of :py:mod:`runpy` which does code rewriting similar + to :py:func:`line_profiler.autoprofile.autoprofile.run` for the + appropriate file as indicated by ``cache``. 
+ """ + runpy = _copy_module('runpy') + for func, resolver, CodeWriter in [ + ('run_path', str, AstTreeProfiler), + ('run_module', modname_to_modpath, AstTreeModuleProfiler), + ]: + impl = getattr(runpy, func) + res = cast(Callable[[str], str], resolver) # Help `mypy` out + wrapper = partial(_run, cache, runpy, impl, func, res, CodeWriter) + setattr(runpy, func, wrapper) + return runpy diff --git a/line_profiler/_child_process_profiling/threading_patches.py b/line_profiler/_child_process_profiling/threading_patches.py new file mode 100644 index 00000000..c8a5b62c --- /dev/null +++ b/line_profiler/_child_process_profiling/threading_patches.py @@ -0,0 +1,121 @@ +""" +Patch :py:mod:`threading` so that profiling extends into processes +it creates. +""" +from __future__ import annotations + +import threading +from collections.abc import Callable +from functools import wraps +from typing import TYPE_CHECKING, Any, TypeVar +from typing_extensions import ParamSpec + +from .._line_profiler import ( # type: ignore + USE_LEGACY_TRACE as SHOULD_PATCH_THREADING, +) +from ..line_profiler import LineProfiler +from .cache import LineProfilingCache + + +__all__ = ('apply', 'SHOULD_PATCH_THREADING') + + +T = TypeVar('T') +PS = ParamSpec('PS') + +_PATCHED_MARKER = '__line_profiler_patched_threading__' + + +def make_syncing_wrapper( + func: Callable[PS, T], prof: LineProfiler, enable_count: int, +) -> Callable[PS, T]: + """ + Wrap the callable ``func`` so that when we spin up a new thread, we + sync the + :py:attr:`line_profiler.line_profiler.LineProfiler.enable_count` of + the active profiler (stored at the cache instance loaded from + :py:meth:`LineProfilingCache.load`) with ``enable_count``. + + Note: + This only seems to work as intended when using the legacy trace + system... 
+ """ + @wraps(func) + def wrapper(*args: PS.args, **kwargs: PS.kwargs) -> T: + if TYPE_CHECKING: + assert hasattr(prof, 'enable_count') + assert isinstance(prof.enable_count, int) + # Note: `prof.enable_count` is most likely to be zero on the new + # thread + thread_enable_count: int = prof.enable_count + for _ in range(enable_count - thread_enable_count): + prof.enable_by_count() + try: + return func(*args, **kwargs) + finally: + # Reset enable counts to avoid problems if the thread id is + # ever reused + for _ in range(prof.enable_count - thread_enable_count): + prof.disable_by_count() + + return wrapper + + +# Threads are supposed to be lightweight, so don't waste time formatting +# debug messages during startup + + +@LineProfilingCache._method_wrapper(debug=False) +def wrap_init( + cache: LineProfilingCache, + vanilla_impl: Callable[..., None], + self: threading.Thread, + group: None = None, + target: Callable[..., Any] | None = None, + *a, **k +) -> None: + """ + Wrap the initializer of :py:class:`threading.Thread` so that the + profiler's :py:attr:`LineProfiler.enable_count` is synced up on + newly spun-up threads. + """ + prof = cache.profiler + enable_count: int | None = getattr(prof, 'enable_count', None) + if target is not None and enable_count: + if TYPE_CHECKING: + assert prof is not None + target = make_syncing_wrapper(target, prof, enable_count) + vanilla_impl(self, group, target, *a, **k) + + +def apply(lp_cache: LineProfilingCache) -> None: + """ + Set up profiling in threads started by :py:mod:`threading` by + applying patches to the module. + + Args: + lp_cache (LineProfilingCache) + Cache instance governing the profiling run + + Side effects: + - :py:mod:`threading` marked as having been set up + + - The following methods and functions patched: + + - :py:meth:`threading.Thread.__init__` + + - Cleanup callbacks registered via ``lp_cache.add_cleanup()`` + + Note: + This is a no-op when using :py:mod:`sys.monitoring`-based + profiling. 
+ """ + if not SHOULD_PATCH_THREADING: + return + if getattr(threading, _PATCHED_MARKER, False): + return + init_wrapper = wrap_init(threading.Thread.__init__) + lp_cache.patch( + threading.Thread, '__init__', init_wrapper, name='threading.Thread', + ) + lp_cache.patch(threading, _PATCHED_MARKER, True, name='threading') diff --git a/line_profiler/curated_profiling.py b/line_profiler/curated_profiling.py new file mode 100644 index 00000000..6e21ffe8 --- /dev/null +++ b/line_profiler/curated_profiling.py @@ -0,0 +1,232 @@ +""" +Tools for setting up profiling in a curated environment (e.g. with +the use of :py:mod:`kernprof`). +""" +from __future__ import annotations + +import builtins +import dataclasses +import functools +import os +import warnings +from collections.abc import Callable, Collection +from io import StringIO +from textwrap import indent +from typing import Any, TextIO, cast +from typing_extensions import Self + +from . import _diagnostics as diagnostics, profile as _global_profiler +from .autoprofile.autoprofile import ( + _extend_line_profiler_for_profiling_imports as upgrade_profiler, +) +from .autoprofile.util_static import modpath_to_modname +from .autoprofile.eager_preimports import ( + is_dotted_path, write_eager_import_module, +) +from .cli_utils import short_string_path +from .line_profiler import LineProfiler +from .profiler_mixin import ByCountProfilerMixin + + +__all__ = ('ClassifiedPreimportTargets', 'CuratedProfilerContext') + + +@dataclasses.dataclass +class ClassifiedPreimportTargets: + """ + Pre-import targets classified into three bins: ``regular`` targets, + targets to ``recurse`` into, and ``invalid`` targets + """ + regular: list[str] = dataclasses.field(default_factory=list) + recurse: list[str] = dataclasses.field(default_factory=list) + invalid: list[str] = dataclasses.field(default_factory=list) + + def __bool__(self) -> bool: + return bool(self.regular or self.recurse) + + def write_preimport_module( + self, fobj: TextIO, *, 
debug: bool | None = None, **kwargs + ) -> None: + """ + Convenience interface with + :py:func:`~.write_eager_import_module`, writing a module which + when imported sets up profiling of the targets. + + Args: + fobj (TextIO): + File object to write said module to. + debug (Optional[bool]): + Whether to generate debugging outputs. + kwargs: + Passed to :py:func:`~.write_eager_import_module`. + """ + if self.invalid: + invalid_targets = sorted(set(self.invalid)) + msg = ( + '{} profile-on-import target{} cannot be converted to ' + 'dotted-path form: {!r}'.format( + len(invalid_targets), + '' if len(invalid_targets) == 1 else 's', + invalid_targets, + ) + ) + warnings.warn(msg) + diagnostics.log.warning(msg) + + if not self: + return None + # Note: `ty` (but not `mypy`) keeps complaining about the our + # splatting this dict; explicitly use `Any` to tell it to shut + # up. + write_module_kwargs: dict[str, Any] = { + 'dotted_paths': self.regular, + 'recurse': self.recurse, + **kwargs, + } + if diagnostics.DEBUG if debug is None else debug: + with StringIO() as sio: + write_eager_import_module(stream=sio, **write_module_kwargs) + code = sio.getvalue() + print(code, file=fobj) + if hasattr(fobj, 'name'): + fobj_repr = repr(short_string_path(str(fobj.name))) + else: + fobj_repr = repr(fobj) # Fall back + diagnostics.log.debug( + f'Wrote temporary module for pre-imports to {fobj_repr}:\n' + + indent(code, ' ') + ) + else: + write_eager_import_module(stream=fobj, **write_module_kwargs) + + @classmethod + def from_targets( + cls, + targets: Collection[str], + exclude: Collection[os.PathLike[str] | str] = (), + ) -> Self: + """ + Create an instance based on a collection of targets + (like what is supplied to :cmd:`kernprof --prof-mod=...`). + + Args: + targets (Collection[str]) + Collection of dotted paths and filenames to profile. + exclude (Collection[str]) + Collections of filenames which are explicitly excluded + from being profiled. + + Return: + New instance. 
+ """ + filtered_targets = [] + recurse_targets = [] + invalid_targets = [] + for target in targets: + if is_dotted_path(target): + modname = target + else: + # Paths already normalized by + # `_normalize_profiling_targets()` + if not os.path.exists(target): + invalid_targets.append(target) + continue + if any( + os.path.samefile(target, excluded) for excluded in exclude + ): + # Ignore the script to be run in eager importing + # (`line_profiler.autoprofile.autoprofile.run()` + # will handle it) + continue + modname = modpath_to_modname(target, hide_init=False) + if modname is None: # Not import-able + invalid_targets.append(target) + continue + if modname.endswith('.__init__'): + modname = modname.rpartition('.')[0] + filtered_targets.append(modname) + else: + recurse_targets.append(modname) + return cls(filtered_targets, recurse_targets, invalid_targets) + + +class CuratedProfilerContext: + """ + Context manager for handling various bookkeeping tasks when setting + up and tearing down profiling: + + - Slipping ``prof`` into the builtin namespace (if + ``insert_builtin`` is true) and :py::deco:`~.profile` + - At exit, clearing the ``enable_count`` of ``prof``, properly + disabling it + + Note: + The attributes on this object are to be considered + implementation details, but not its methods and their + signatures. + """ + def __init__( + self, + prof: ByCountProfilerMixin, + insert_builtin: bool = False, + builtin_loc: str = 'profile', + ) -> None: + self.prof = prof + self.insert_builtin = insert_builtin + self.builtin_loc = builtin_loc + self._installed = False + self._kpo = _global_profiler._kernprof_overwrite + + def _global_install(self, prof: ByCountProfilerMixin | None) -> None: + # Wrapper to convince type-checkers it is okay to pass these + # stuff to `._kernprof_overwrite()`. We don't want to patch + # that method's signature because passing non `LineProfiler` + # objects to it should be the exception, not the norm. 
+ self._kpo(cast(LineProfiler, prof)) + + def install(self) -> None: + def del_builtin_profile() -> None: + delattr(builtins, self.builtin_loc) + + def set_builtin_profile(old: Any) -> None: + setattr(builtins, self.builtin_loc, old) + + if self._installed: + return + # Equip the profiler instance with the + # `.add_imported_function_or_module()` pseudo-method + upgrade_profiler(self.prof) + # Overwrite the explicit profiler (`@line_profiler.profile`) + self._global_install(self.prof) # type: ignore + # Set up hooks to deal with inserting `.prof` as a builtin name + if self.insert_builtin: + try: + old = getattr(builtins, self.builtin_loc) + except AttributeError: + self._restore: Callable[[], None] = del_builtin_profile + else: + self._restore = functools.partial(set_builtin_profile, old) + set_builtin_profile(self.prof) + self._installed = True + + def uninstall(self) -> None: + if not self._installed: + return + # Restore the `builtins` namespace + if ( + self.insert_builtin + and getattr(builtins, self.builtin_loc, None) is self.prof + ): + self._restore() + # Fully disable the profiler + for _i in range(getattr(self.prof, 'enable_count', 0)): + self.prof.disable_by_count() + # Restore the state of the global `@line_profiler.profile` + self._global_install(None) + self._installed = False + + def __enter__(self) -> None: + self.install() + + def __exit__(self, *_, **__) -> None: + self.uninstall() diff --git a/line_profiler/line_profiler.py b/line_profiler/line_profiler.py index bede6f6c..50ef0d88 100755 --- a/line_profiler/line_profiler.py +++ b/line_profiler/line_profiler.py @@ -18,6 +18,7 @@ import tempfile import types import tokenize +import warnings from argparse import ArgumentParser from datetime import datetime from os import PathLike @@ -367,17 +368,66 @@ def to_file(self, filename: PathLike[str] | str) -> None: with open(filename, 'wb') as f: pickle.dump(self, f, pickle.HIGHEST_PROTOCOL) + @classmethod + def get_empty_instance(cls) -> Self: + """ + 
Returns: + instance (LineStats): + New instance without any profiling data. + """ + prof = LineProfiler() + if TYPE_CHECKING: + assert hasattr(prof, 'timer_unit') + return cls({}, cast(float, prof.timer_unit)) + @classmethod def from_files( - cls, file: PathLike[str] | str, /, *files: PathLike[str] | str + cls, + file: PathLike[str] | str, + /, + *files: PathLike[str] | str, + on_defective: Literal['ignore', 'warn', 'error'] = 'error', ) -> Self: """ Utility function to load an instance from the given filenames. + + Args: + file (PathLike[str] | str): + File to load profiling data from + *files (PathLike[str] | str): + Ditto above + on_defective (Literal['ignore', 'warn', 'error']): + What to do if some files fail to load: ``'ignore'`` + those files, skip them but with a ``'warn'``-ing, or + raise the ``'error'`` as soon as one is encountered + + Returns: + instance (LineStats): + New instance """ stats_objs = [] - for file in [file, *files]: + failures: dict[str, str] = {} + all_files = [file, *files] + for file in all_files: with open(file, 'rb') as f: - stats_objs.append(pickle.load(f)) + try: + stats_objs.append(pickle.load(f)) + except Exception as e: + if on_defective == 'error': + raise + failures[str(file)] = f'{type(e).__name__}: {e}' + if failures: + msg = ( + '{} file(s) out of {} failed to load and are skipped: {!r}' + .format(len(failures), len(all_files), failures) + ) + if on_defective == 'warn': + warnings.warn(msg) + diagnostics.log.warning(msg) + else: # 'ignore' + diagnostics.log.debug(msg) + if not stats_objs: + return cls.get_empty_instance() return cls.from_stats_objects(*stats_objs) @classmethod diff --git a/line_profiler/rc/line_profiler.toml b/line_profiler/rc/line_profiler.toml index 6680c06a..a420aad8 100644 --- a/line_profiler/rc/line_profiler.toml +++ b/line_profiler/rc/line_profiler.toml @@ -94,6 +94,9 @@ preimports = true # - `prof-imports` (bool): # `--prof-imports` (true) or `--no-prof-imports` (false) prof-imports = false +# - 
`prof-child-procs` (bool): +# `--prof-child-procs` (true) or `--no-prof-child-procs` (false) +prof-child-procs = false # - Misc flags # - `verbose` (count): @@ -206,3 +209,36 @@ hits = 9 time = 12 perhit = 8 percent = 8 + +# XXX: --- Start of implementation details --- +# `line_profiler._child_process_profiling.multiprocessing_patches` +# settings + +[tool.line_profiler.multiprocessing] + +# - `intercept_logs` (bool): +# Whether to patch the `multiprocessing.util` logging funcions so that +# the internal log messages are captured and writtent to the debug +# logs +intercept_logs = false + +[tool.line_profiler.multiprocessing.polling] + +# - `polling.cooldown` (float): +# Cooldown time (seconds) before successive polls on lock files (set +# to <= 0 to disable cooldowns) +cooldown = 0.03125 # 1/32-nd of a second +# - `polling.timeout` (float): +# Time (seconds) before the main process disregards the existence of +# lock file and unblocks calls to `.terminate()` a (most probably +# errored-out) child process anyway (set to <= 0 to disable timeouts) +timeout = 0.25 +# - `polling.on_timeout` (Literal['error', 'warn', 'ignore']) +# What to do when the above timeout is exhausted, before actually +# `.terminate()`-ing the child process: +# - 'error': raise an error +# - `warn`: issue a warning +# - `ignore`: nothing +on_timeout = 'warn' + +# XXX: --- End of implementation details --- diff --git a/line_profiler/toml_config.py b/line_profiler/toml_config.py index 781dc60d..236e86da 100644 --- a/line_profiler/toml_config.py +++ b/line_profiler/toml_config.py @@ -106,7 +106,10 @@ def get_subconfig( get_subtable(self.conf_dict, headers, allow_absence=allow_absence), ) new_subtable = [*self.subtable, *headers] - return type(self)(new_dict, self.path, new_subtable) + new_instance = type(self)(new_dict, self.path, new_subtable) + if copy: + new_instance = new_instance.copy() + return new_instance @classmethod def from_default(cls, *, copy: bool = True) -> ConfigSource: @@ -355,7 
+358,8 @@ def iter_configs(dir_path): def get_subtable( - table: Mapping[K, Mapping], keys: Sequence[K], *, allow_absence: bool = True + table: Mapping[K, Mapping], keys: Sequence[K], *, + allow_absence: bool = True, ) -> Mapping: """ Arguments: diff --git a/tests/test_child_procs.py b/tests/test_child_procs.py index 10be3f78..cadf145e 100644 --- a/tests/test_child_procs.py +++ b/tests/test_child_procs.py @@ -1,33 +1,104 @@ from __future__ import annotations +import ast +import dataclasses +import enum +import inspect +import multiprocessing.pool import os +import re +import shlex import subprocess import sys -from collections.abc import Callable, Generator, Mapping +import sysconfig +from abc import ABC, abstractmethod +from collections.abc import ( + Callable, Collection, Generator, Iterable, Mapping, Sequence, +) +from contextlib import ExitStack +from functools import lru_cache, partial, wraps +from io import StringIO +from importlib import import_module from pathlib import Path +from runpy import run_path from tempfile import TemporaryDirectory from textwrap import dedent, indent +from time import monotonic +from typing import Any, Generic, Literal, TypeVar, cast, final, overload +from typing_extensions import Self, ParamSpec +from uuid import uuid4 import pytest import ubelt as ub +from line_profiler._child_process_profiling.cache import LineProfilingCache +from line_profiler._child_process_profiling.runpy_patches import ( + create_runpy_wrapper, +) +from line_profiler._child_process_profiling.threading_patches import ( + SHOULD_PATCH_THREADING, +) +from line_profiler.curated_profiling import ( + CuratedProfilerContext, ClassifiedPreimportTargets, +) +from line_profiler.line_profiler import LineProfiler, LineStats + + +T = TypeVar('T') +TCtx_ = TypeVar('TCtx_') +PS = ParamSpec('PS') +C = TypeVar('C', bound=Callable[..., Any]) NUM_NUMBERS = 100 NUM_PROCS = 4 -TEST_MODULE_BODY = dedent(f""" +START_METHODS = set(multiprocessing.get_all_start_methods()) +_DEBUG = 
True + + +def strip(s: str) -> str: + return dedent(s).strip('\n') + + +EXTERNAL_MODULE_BODY = strip(""" from __future__ import annotations -from argparse import ArgumentParser -from multiprocessing import Pool -def my_sum(x: list[int]) -> int: - result: int = 0 +def my_external_sum(x: list[int], fail: bool = False) -> int: + result: int = 0 # GREP_MARKER[EXT-INVOCATION] for item in x: - result += item + result += item # GREP_MARKER[EXT-LOOP] + if fail: + raise RuntimeError('forced failure') + return result +""") + +TEST_MODULE_TEMPLATE = strip(""" +from __future__ import annotations + +from argparse import ArgumentParser +from collections.abc import Callable +from multiprocessing import get_context, Pool +from typing import Literal + +from {EXT_MODULE} import my_external_sum + + +def my_local_sum(x: list[int], fail: bool = False) -> int: + result: int = 0 # GREP_MARKER[LOCAL-INVOCATION] + # The reversing is to prevent bytecode aliasing with + # `my_external_sum()` (see issue #424, PR #425) + for item in reversed(x): + result += item # GREP_MARKER[LOCAL-LOOP] + if fail: + raise RuntimeError('forced failure') return result -def sum_in_child_procs(length: int, n: int) -> int: +def sum_in_child_procs( + length: int, n: int, my_sum: Callable[[list[int]], int], + start_method: Literal['fork', 'forkserver', 'spawn'] | None = None, + fail: bool = False, +) -> int: my_list: list[int] = list(range(1, length + 1)) sublists: list[list[int]] = [] subsums: list[int] @@ -37,169 +108,956 @@ def sum_in_child_procs(length: int, n: int) -> int: while my_list: sublist, my_list = my_list[:sublength], my_list[sublength:] sublists.append(sublist) - with Pool(n) as pool: - subsums = pool.map(my_sum, sublists) + if start_method: + pool = get_context(start_method).Pool(n) + else: + pool = Pool(n) + with pool: + subsums = pool.starmap(my_sum, [(sl, fail) for sl in sublists]) pool.close() pool.join() - return my_sum(subsums) + return my_sum(subsums, fail) def main(args: list[str] | None = 
None) -> None: parser = ArgumentParser() parser.add_argument('-l', '--length', type=int, default={NUM_NUMBERS}) parser.add_argument('-n', type=int, default={NUM_PROCS}) + parser.add_argument( + '-s', '--start-method', + choices=['fork', 'forkserver', 'spawn'], default=None, + ) + parser.add_argument('-f', '--force-failure', action='store_true') + parser.add_argument( + '--local', + action='store_const', + dest='my_sum', + default=my_external_sum, + const=my_local_sum, + ) options = parser.parse_args(args) - print(sum_in_child_procs(options.length, options.n)) + print(sum_in_child_procs( + options.length, options.n, options.my_sum, + start_method=options.start_method, + fail=options.force_failure, + )) if __name__ == '__main__': main() -""").strip('\n') +""") + + +# ============================== Fixtures ============================== + + +@dataclasses.dataclass +class _ModuleFixture: + """ + Convenience wrapper around a Python source file which represents an + importable module. + """ + path: Path + monkeypatch: pytest.MonkeyPatch + dependencies: Collection[_ModuleFixture] = () + + def install( + self, *, + local: bool = False, children: bool = False, deps_only: bool = False, + ) -> None: + """ + Set the module at :py:attr:`~.path` up to be importable. + + Args: + local (bool): + Make it importable for the CURRENT process (via + :py:data:`sys.path`). + children (bool): + Make it importable for CHILD processes (via + ``os.environ['PYTHONPATH']``). + deps_only (bool): + If true, only does the equivalent setup for + dependencies. + """ + for dep in self.dependencies: + dep.install(local=local, children=children) + if deps_only: + return + path = str(self.path.parent) + if local: + self.monkeypatch.syspath_prepend(path) + if children: + self.monkeypatch.setenv('PYTHONPATH', path, prepend=os.pathsep) + + @staticmethod + def propose_name(prefix: str) -> Generator[str, None, None]: + """ + Propose a valid module name that isn't already occupied. 
+ """ + while True: + name = '_'.join([prefix] + str(uuid4()).split('-')) + if name not in sys.modules: + assert name.isidentifier() + yield name + + @property + def name(self) -> str: + return self.path.stem + + +# Only write the files once per test session @pytest.fixture(scope='session') -def test_module() -> Generator[Path, None, None]: +def _ext_module() -> Generator[Path, None, None]: + name = next(_ModuleFixture.propose_name('my_ext_module')) with TemporaryDirectory() as mydir_str: my_dir = Path(mydir_str) my_dir.mkdir(exist_ok=True) - my_module = my_dir / 'my_test_module.py' - with my_module.open('w') as fobj: - fobj.write(TEST_MODULE_BODY + '\n') + my_module = my_dir / f'{name}.py' + my_module.write_text(EXTERNAL_MODULE_BODY) yield my_module -@pytest.mark.parametrize('as_module', [True, False]) -@pytest.mark.parametrize( - ('nnums', 'nprocs'), [(None, None), (None, 3), (200, None)], -) -def test_multiproc_script_sanity_check( - test_module: Path, - tmp_path_factory: pytest.TempPathFactory, - nnums: int, - nprocs: int, - as_module: bool, -) -> None: - """ - Sanity check that the test module functions as expected when run - with vanilla Python. 
- """ - _run_test_module( - _run_as_module if as_module else _run_as_script, - test_module, tmp_path_factory, [sys.executable], None, False, - nnums=nnums, nprocs=nprocs, +@pytest.fixture(scope='session') +def _test_module(_ext_module: Path) -> Generator[Path, None, None]: + name = next(_ModuleFixture.propose_name('my_test_module')) + body = TEST_MODULE_TEMPLATE.format( + EXT_MODULE=_ext_module.stem, + NUM_NUMBERS=NUM_NUMBERS, + NUM_PROCS=NUM_PROCS, ) + with TemporaryDirectory() as mydir_str: + my_dir = Path(mydir_str) + my_dir.mkdir(exist_ok=True) + my_module = my_dir / f'{name}.py' + my_module.write_text(body) + yield my_module -# Note: -# Currently code execution in child processes is not properly profiled; -# these tests are just for checking that `kernprof` doesn't impair the -# proper execution of `multiprocessing` code +@pytest.fixture +def ext_module( + _ext_module: Path, monkeypatch: pytest.MonkeyPatch, +) -> Generator[_ModuleFixture, None, None]: + """ + Yields: + :py:class:`_ModuleFixture` helper object containing the code at + :py:data:`EXTERNAL_MODULE_BODY` + """ + yield _ModuleFixture(_ext_module, monkeypatch) -fuzz_invocations = pytest.mark.parametrize( - ('runner', 'outfile', 'profile', - 'label'), # Dummy argument to make `pytest` output more legible - [ - (['kernprof', '-q'], 'out.prof', False, 'cProfile'), - # Run with `line_profiler` with and w/o profiling targets - (['kernprof', '-q', '-l'], 'out.lprof', False, - 'line_profiler-inactive'), - (['kernprof', '-q', '-l'], 'out.lprof', True, - 'line_profiler-active'), - ], -) +@pytest.fixture +def test_module( + _test_module: Path, + ext_module: _ModuleFixture, + monkeypatch: pytest.MonkeyPatch, +) -> Generator[_ModuleFixture, None, None]: + """ + Yields: + :py:class:`_ModuleFixture` helper object containing the code at + :py:data:`TEST_MODULE_TEMPLATE` + """ + yield _ModuleFixture(_test_module, monkeypatch, [ext_module]) -@fuzz_invocations -def test_running_multiproc_script( - test_module: Path, 
+@pytest.fixture +def test_module_clone( tmp_path_factory: pytest.TempPathFactory, - runner: str | list[str], - outfile: str | None, - profile: bool, - label: str, -) -> None: + monkeypatch: pytest.MonkeyPatch, + _test_module: Path, + ext_module: _ModuleFixture, +) -> Generator[_ModuleFixture, None, None]: """ - Check that `kernprof` can run the test module as a script - (`kernprof [...] `). + Yields: + :py:class:`_ModuleFixture` helper object containing the same + code as :py:data:`test_module` """ - _run_test_module( - _run_as_script, - test_module, tmp_path_factory, runner, outfile, profile, - ) + tmpdir = tmp_path_factory.mktemp('my_path') + name = next(_ModuleFixture.propose_name('my_cloned_module')) + path = tmpdir / f'{name}.py' + path.write_text(_test_module.read_text()) + yield _ModuleFixture(path, monkeypatch, [ext_module]) -@fuzz_invocations -def test_running_multiproc_module( - test_module: Path, +@pytest.fixture +def create_cache( tmp_path_factory: pytest.TempPathFactory, - runner: str | list[str], - outfile: str | None, - profile: bool, - label: str, -) -> None: + request: pytest.FixtureRequest, +) -> Generator[Callable[..., LineProfilingCache], None, None]: """ - Check that `kernprof` can run the test module as a module - (`kernprof [...] -m `). + Wrapper around the :py:class:`LineProfilingCache` instantiator + which: + + - Automatically creates a tempdir and provides it as the + :py:attr:`LineProfilingCache.cache_dir`, + + - Extends the argument ``preimports_module`` to allow for taking + boolean values: + + - ``True``: a temporary preimports module is automatically written + based on ``profiling_targets`` and supplied to the base + constructor. + + - ``False``: equivalent to ``None``. 
+ + - Unless the argument ``_use_curated_profiler: bool = True`` is set + to :py:const`True`, automatically creates an instance of + :py:class:`LineProfiler` that is curated by a + :py:class:`CuratedProfilerContext` and provides it as the + :py:attr:`LineProfilingCache.profiler`, and + + - At teardown: + + - Removes tempdirs and tempfiles generated. + + - Restores the value of the class' internal reference to the + :py:meth:`LineProfilingCache.load`-ed instance. + + - Calls the `.cleanup()` method of each instance created. + + - Prints these diagnostics for each instance: + + - The stats on the ``.profiler`` associated with each instance + (if any) + + - The stats gathered by + :py:meth:`LineProfilingCache.gather_stats()` + + - The debug logs (if ``.debug`` is true) """ - _run_test_module( - _run_as_module, - test_module, tmp_path_factory, runner, outfile, profile, - ) + def instantiate( + *, + profiling_targets: Collection[str] = (), + preimports_module: os.PathLike[str] | str | bool | None = None, + _use_curated_profiler: bool = True, + **kwargs + ) -> LineProfilingCache: + tmpdir = tmp_path_factory.mktemp('my_cache_dir') + pim: os.PathLike[str] | str | None + if preimports_module in (True, False): + if preimports_module: + targets = ( + ClassifiedPreimportTargets.from_targets(profiling_targets) + ) + if targets: + pim = tmpdir / 'preimports.py' + with pim.open(mode='w') as fobj: + targets.write_preimport_module(fobj) + else: + pim = None + else: + pim = None + else: + # The type checker needs some convincing... 
+ assert not isinstance(preimports_module, bool) + pim = preimports_module + cache = LineProfilingCache( + tmpdir, + profiling_targets=profiling_targets, + preimports_module=pim, + **kwargs, + ) + if _use_curated_profiler: + cache.profiler = request.getfixturevalue('curated_profiler') + instances.append(cache) + return cache + + def print_result( + cache: LineProfilingCache, topic: str, result: str, *notes: str, + ) -> None: + header = '{} ({}):'.format( + topic, '; '.join([f'cache instance {id(cache):#x}', *notes]), + ) + print(header, indent(result, ' '), sep='\n') + + def print_profiler_stats(cache: LineProfilingCache) -> None: + if cache.profiler is None: + result = '' + notes = [] + else: + with StringIO() as sio: + cache.profiler.print_stats(sio) + result = sio.getvalue() + notes = [f'profiler instance {id(cache.profiler):#x}'] + print_result(cache, 'Native profiler stats', result, *notes) + + def print_gathered_stats(cache: LineProfilingCache) -> None: + with StringIO() as sio: + cache.gather_stats().print(sio) + result = sio.getvalue() + print_result(cache, 'Gathered profiler stats', result) + + def print_debug_logs(cache: LineProfilingCache) -> None: + if cache.debug: + result = '\n'.join( + entry.to_text() for entry in cache._gather_debug_log_entries() + ) + else: + result = '' + print_result(cache, 'Gathered debug logs', result) + + instances: list[LineProfilingCache] = [] + handlers: list[Callable[[LineProfilingCache], None]] + handlers = [print_profiler_stats, print_gathered_stats, print_debug_logs] + try: + with _preserve_obj_attributes( + LineProfilingCache, ['_loaded_instance'], + ): + yield instantiate + finally: + for cache in instances: + callbacks: list[Callable[[], Any]] = [cache.cleanup] + callbacks.extend(partial(func, cache) for func in handlers) + for callback in callbacks: + try: + callback() + except Exception: + pass + + +@pytest.fixture +def curated_profiler() -> Generator[LineProfiler, None, None]: + """ + Yields: + Fresh instance of 
:py:class:`LineProfiler` that is managed by a + :py:class:`CuratedProfilerContext` + """ + prof = LineProfiler() + with CuratedProfilerContext(prof, insert_builtin=True): + yield prof + + +# ========================== Helper functions ========================== + + +class _NotSupplied(enum.Enum): + NOT_SUPPLIED = enum.auto() + + +class ResultMismatch(ValueError): + def __init__( + self, + expected: Any, + actual: Any | _NotSupplied = _NotSupplied.NOT_SUPPLIED, + ) -> None: + msg = f'expected: {expected}' + if actual != _NotSupplied.NOT_SUPPLIED: + msg = f'{msg}, got {actual}' + super().__init__(msg) + self.expected = expected + self.actual = actual + + @property + def rich_message(self) -> str: + msg = '{}: {}'.format(type(self).__name__, self.args[0]) + if self.__traceback__ is not None: + tb = self.__traceback__ + msg = '{}:{}: {}'.format( + tb.tb_frame.f_code.co_filename, tb.tb_lineno, msg, + ) + return msg + + +@final +@dataclasses.dataclass +class _Params: + """ + Convenience wrapper around :py:func:`pytest.mark.parametrize`. + """ + params: tuple[str, ...] + values: list[tuple[Any, ...]] + defaults: tuple[Any, ...] + + def __post_init__(self) -> None: + n = len(self.params) + assert all(p.isidentifier() for p in self.params) # Validity + assert len(set(self.params)) == n # Uniqueness + assert len(self.defaults) == n # Consistency + self.values = list(self._unique(self.values)) + assert all(len(v) == n for v in self.values) + + def __mul__(self, other: Self) -> Self: + """ + Form a Cartesian product between the two instances with disjoint + :py:attr:`~.params`, like stacking the + :py:func:`pytest.mark.parametrize `decorators. + + Example: + >>> p1 = _Params.new(('a', 'b'), [(0, 0), (1, 2), (3, 4)], + ... 
defaults=(1, 2)) + >>> p2 = _Params.new('c', [0, 5, 6]) + >>> p1 * p2 # doctest: +NORMALIZE_WHITESPACE + _Params(params=('a', 'b', 'c'), + values=[(0, 0, 0), (0, 0, 5), (0, 0, 6), + (1, 2, 0), (1, 2, 5), (1, 2, 6), + (3, 4, 0), (3, 4, 5), (3, 4, 6)], + defaults=(1, 2, 0)) + """ + assert not set(self.params) & set(other.params) + return type(self)( + self.params + other.params, + [sv + ov for sv in self.values for ov in other.values], + self.defaults + other.defaults, + ) + + def __add__(self, other: Self) -> Self: + """ + Concatenate two instances: + + - For parameters appearing in both, their lists of values are + concatenated. + + - For parameters appearing in either instance, the missing + values are taken from the other instance's + :py:attr:`~.defaults`. + + Note: + In the case of clashes, the :py:attr:`~.defaults` and the + order of the :py:attr:`~.params` of ``self`` (the left + operand) take precedence. + + Example: + >>> p1 = _Params.new(('a', 'b', 'c'), + ... [(0, 0, 0), # defaults + ... (1, 2, 3), (4, 5, 6)]) + >>> p2 = _Params.new(('c', 'd'), [(7, 8), (9, 10)], + ... 
defaults=(-1, -1)) + >>> p1 + p2 # doctest: +NORMALIZE_WHITESPACE + _Params(params=('a', 'b', 'c', 'd'), + values=[(0, 0, 0, -1), + (1, 2, 3, -1), + (4, 5, 6, -1), + (0, 0, 7, 8), + (0, 0, 9, 10)], + defaults=(0, 0, 0, -1)) + """ + self_defaults = dict(zip(self.params, self.defaults)) + other_defaults = dict(zip(other.params, other.defaults)) + new_params = tuple(self._unique(self.params + other.params)) + + defaults = {**other_defaults, **self_defaults} + new_defaults_tuple = tuple(defaults[p] for p in new_params) + + new_values: list[tuple[Any, ...]] = [] + for old_values, old_params in [ + (self.values, self.params), (other.values, other.params), + ]: + indices: list[ + tuple[Literal[True], int] | tuple[Literal[False], str] + ] = [ + (True, old_params.index(p)) if p in old_params else (False, p) + for p in new_params + ] + new_values.extend( + tuple( + ( + value[cast(int, index)] + if available else + defaults[cast(str, index)] + ) for available, index in indices + ) + for value in old_values + ) + return type(self)(new_params, new_values, new_defaults_tuple) + + def __call__(self, func: C) -> C: + """ + Mark a callable as with :py:func:`pytest.mark.parametrize`. + """ + # Note: `pytest` automatically assumes single-param values to + # be unpackes, so comply here + if len(self.params) == 1: + marker = pytest.mark.parametrize( + self.params[0], [v[0] for v in self.values], + ) + else: + marker = pytest.mark.parametrize(self.params, self.values) + return marker(func) + + @staticmethod + def _unique(items: Iterable[T]) -> Generator[T, None, None]: + seen: set[T] = set() + for item in items: + if item in seen: + continue + seen.add(item) + yield item + + @overload + @classmethod + def new( + cls, + params: Sequence[str] | str, + values: Sequence[Sequence[Any]], + defaults: Sequence[Any] | _NotSupplied = _NotSupplied.NOT_SUPPLIED, + ) -> Self: + ... 
+ + @overload + @classmethod + def new( + cls, + params: str, + values: Sequence[Any], + defaults: Any | _NotSupplied = _NotSupplied.NOT_SUPPLIED, + ) -> Self: + ... + + @classmethod + def new( + cls, + params: Sequence[str] | str, + values: Sequence[Sequence[Any]] | Sequence[Any], + defaults: ( + Sequence[Any] | Any | _NotSupplied + ) = _NotSupplied.NOT_SUPPLIED, + ) -> Self: + """ + Instantiator more akin to :py:func:`pytest.mark.parametrize`: + + - ``params`` can be provided as a comma-separated string + + - Single parameters can be unpacked (singular param-name string + and param-value sequences) + + - If ``defaults`` are not given, it is implicitly set to the + FIRST item in ``values``. + """ + if isinstance(params, str): + param_list: tuple[str, ...] = tuple( + p.strip() for p in params.split(',') + ) + unpacked = len(param_list) == 1 + else: + param_list = tuple(params) + unpacked = False + if defaults == _NotSupplied.NOT_SUPPLIED: + defaults, *_ = values + if unpacked: + default_values: tuple[Any, ...] = defaults, + value_tuple_list: list[tuple[Any, ...]] = [(v,) for v in values] + else: + default_values = tuple(defaults) # type: ignore[arg-type] + value_tuple_list = [tuple(v) for v in values] + return cls(param_list, value_tuple_list, default_values) + + +class _CallableContextManager(ABC, Generic[TCtx_]): + debug: bool + + @abstractmethod + def __enter__(self) -> TCtx_: + ... + + @abstractmethod + def __exit__(self, *a, **k) -> Any: + ... + + def __call__(self, func: Callable[PS, T]) -> Callable[PS, T]: + """ + Wrap ``func()`` so that its calls always happen in the context + of the instance. 
+ """ + @wraps(func) + def wrapper(*args: PS.args, **kwargs: PS.kwargs) -> T: + with self: + return func(*args, **kwargs) + + return wrapper + + def _debug(self, msg: str, **kwargs) -> None: + if not self.debug: + return + header = f'{os.environ["PYTEST_CURRENT_TEST"]}: {type(self).__name__}' + print(f'{header}: {msg}', **kwargs) + + +class _preserve_obj_attributes(_CallableContextManager[dict[str, Any]]): + def __init__( + self, obj: Any, attrs: Collection[str], debug: bool = _DEBUG, + ) -> None: + self.obj = obj + self.attrs = set(attrs) + self._callbacks: list[Callable[[], None]] = [] + self.debug = debug + + def __enter__(self) -> dict[str, Any]: + def get_repr(attr: str) -> str: + try: + value = getattr(self.obj, attr) + except ValueError: + return '' + else: + return repr(value) + + def delete(attr: str) -> None: + try: + self._debug('Deleted attr `.{} = {}` on `{!r}`'.format( + attr, get_repr(attr), self.obj, + )) + delattr(self.obj, attr) + except AttributeError: + pass + + def reset(attr: str, value: Any) -> None: + self._debug('Reset attr `.{} = {} -> {!r}` on `{!r}`'.format( + attr, get_repr(attr), value, self.obj, + )) + setattr(self.obj, attr, value) + + result: dict[str, Any] = {} + for attr in self.attrs: + old = getattr(self.obj, attr, _NotSupplied.NOT_SUPPLIED) + if old is _NotSupplied.NOT_SUPPLIED: + callback = partial(delete, attr) + else: + callback = partial(reset, attr, old) + result[attr] = old + self._callbacks.append(callback) + return result + + def __exit__(self, *_, **__) -> None: + for callback in self._callbacks[::-1]: + try: + callback() + except Exception: + pass + + +class _preserve_attributes(_CallableContextManager[dict[str, dict[str, Any]]]): + """ + Example: + >>> from functools import wraps + >>> from line_profiler.curated_profiling import ( + ... CuratedProfilerContext, + ... 
) + >>> from line_profiler import line_profiler + + >>> assert not hasattr(CuratedProfilerContext, 'foo') + >>> old_main = line_profiler.main + >>> + >>> + >>> def foo(_) -> None: + ... pass + ... + >>> + >>> @wraps(old_main) + ... def main(*a, **k): + ... return old_main(*a, **k) + ... + >>> + >>> preserved = { + ... 'line_profiler.curated_profiling' + ... '.CuratedProfilerContext': {'foo'}, + ... 'line_profiler.line_profiler': {'main'}, + ... } + >>> with _preserve_attributes(preserved, debug=False) as old: + ... assert old == { + ... 'line_profiler.curated_profiling' + ... '.CuratedProfilerContext': { + ... 'foo': _NotSupplied.NOT_SUPPLIED, + ... }, + ... 'line_profiler.line_profiler': {'main': old_main}, + ... } + ... CuratedProfilerContext.foo = foo + ... line_profiler.main = main + ... print('ok') + ... + ok + >>> assert not hasattr(CuratedProfilerContext, 'foo') + >>> assert old_main is \ +old['line_profiler.line_profiler']['main'] + >>> assert old_main is line_profiler.main + >>> assert main is not line_profiler.main + """ + def __init__( + self, targets: Mapping[str, Collection[str]], debug: bool = _DEBUG, + ) -> None: + self.targets = { + target: set(attrs) for target, attrs in targets.items() + } + self._stacks: list[ExitStack] = [] + self.debug = debug + + def __enter__(self) -> dict[str, dict[str, Any]]: + stack = ExitStack() + self._stacks.append(stack) + result: dict[str, Any] = {} + for target, attrs in self.targets.items(): + result[target] = stack.enter_context(_preserve_obj_attributes( + _import_target(target), attrs, debug=self.debug, + )) + return result + + def __exit__(self, *_, **__) -> None: + self._stacks.pop().close() + + +class _preserve_pth_files(_CallableContextManager[frozenset[str]]): + def __init__(self, debug: bool = _DEBUG) -> None: + self.debug = debug + + def __enter__(self) -> frozenset[str]: + self.old = self.get_pth_files() + return self.old + + def __exit__(self, *_, **__) -> None: + for new_pth_file in self.get_pth_files() 
- self.old: + self._debug(f'Deleting stray .pth file: {new_pth_file!r}') + (self._get_path() / new_pth_file).unlink(missing_ok=True) + del self.old + + @classmethod + def get_pth_files(cls, name_only: bool = True) -> frozenset[str]: + return frozenset( + pth.name if name_only else str(pth) + for pth in cls._get_path().glob('*.pth') + ) + + @staticmethod + def _get_path() -> Path: + return Path(sysconfig.get_path('purelib')) + + +def _import_target(target: str) -> Any: + try: + return import_module(target) + except ImportError: # Not a module + assert '.' in target + module, _, attr = target.rpartition('.') + return getattr(import_module(module), attr) + + +def _search_cache_logs( + cache: LineProfilingCache, + expecting_logs: bool, + patterns: Mapping[str, bool] | Collection[str], + match_individual_messages: bool = False, + flags: int = 0, +) -> None: + entries = cache._gather_debug_log_entries() + if bool(entries) != expecting_logs: + raise ResultMismatch( + 'logs' if expecting_logs else 'no logs', + repr(entries) if entries else 'nothing' + ) + if not expecting_logs: + return + text_chunks: list[str] = [entry.to_text() for entry in entries] + if not match_individual_messages: + text_chunks = ['\n'.join(text_chunks)] + if isinstance(patterns, Mapping): + to_match: dict[str, bool] = { + str(pat): bool(should_match) + for pat, should_match in patterns.items() + } + else: + to_match = dict.fromkeys(patterns, True) + for pat, should_match in to_match.items(): + pattern = re.compile(pat, flags) + if any(pattern.search(chunk) for chunk in text_chunks) == should_match: + continue + raise ResultMismatch( + f'pattern {pattern!r} to {"" if should_match else "not "}match ' + f'{cache!r}\'s logs: {text_chunks!r}' + ) + + +@lru_cache() +def _find_return_lines(func: str) -> list[int]: + class FindReturns(ast.NodeVisitor): + def __init__(self) -> None: + self.found: set[int] = set() + + def visit_Return(self, node: ast.Return) -> None: + self.found.add(node.lineno) + 
self.generic_visit(node) + + func_obj = _import_target(func) + assert inspect.isfunction(func_obj) + lines, start = inspect.getsourcelines(func_obj) + tree = ast.parse(''.join(lines)) + finder = FindReturns() + finder.visit(tree) + return sorted(lineno + start - 1 for lineno in finder.found) + + +# `shlex.join()` doesn't work properly on Windows, so use +# `subprocess.list2cmdline()` instead; +# though an "intentionally" undocumented API (cpython issue #10308), +# it's been around since 2.4, seems stable enough, and does exactly what +# is needed +if sys.platform == 'win32': + concat_command_line: Callable[ + [Sequence[str]], str + ] = subprocess.list2cmdline +else: + concat_command_line = shlex.join def _run_as_script( - runner_args: list[str], test_args: list[str], test_module: Path, **kwargs + runner_args: list[str], test_args: list[str], test_module: _ModuleFixture, + **kwargs ) -> subprocess.CompletedProcess: - cmd = runner_args + [str(test_module)] + test_args - return subprocess.run(cmd, **kwargs) + cmd = runner_args + [str(test_module.path)] + test_args + test_module.install(children=True, deps_only=True) + return _run_subproc(cmd, **kwargs) def _run_as_module( - runner_args: list[str], - test_args: list[str], - test_module: Path, - *, + runner_args: list[str], test_args: list[str], test_module: _ModuleFixture, + **kwargs +) -> subprocess.CompletedProcess: + cmd = runner_args + ['-m', test_module.name] + test_args + test_module.install(children=True) + return _run_subproc(cmd, **kwargs) + + +def _run_as_literal_code( + runner_args: list[str], test_args: list[str], test_module: _ModuleFixture, + **kwargs +) -> subprocess.CompletedProcess: + cmd = runner_args + ['-c', test_module.path.read_text()] + test_args + test_module.install(children=True, deps_only=True) + return _run_subproc(cmd, **kwargs) + + +def _run_subproc( + cmd: Sequence[str] | str, + /, + *args, + check: bool = False, env: Mapping[str, str] | None = None, **kwargs ) -> 
subprocess.CompletedProcess: - cmd = runner_args + ['-m', test_module.stem] + test_args - env_dict = {**os.environ, **(env or {})} - python_path = env_dict.pop('PYTHONPATH', '') - if python_path: - env_dict['PYTHONPATH'] = '{}:{}'.format( - test_module.parent, python_path, + """ + Wrapper around :py:func:`subprocess.run` which writes debugging + output. + """ + if isinstance(cmd, str): + cmd_str = cmd + else: + cmd_str = concat_command_line(cmd) + + # If we're capturing outputs, it may be for the best to wait until + # we've processed the output streams to check the return code... + check_rc_in_run = check + for arg in 'stdout', 'stdin': + if kwargs.get(arg) not in {None, subprocess.DEVNULL}: + check_rc_in_run = False + if kwargs.get('capture_output'): + check_rc_in_run = False + + print('Command:', cmd_str) + if env is not None: + diff: list[str] = [] + for key in set(os.environ).union(env): + old = os.environ.get(key) + new = env.get(key) + if old is not None is new: + item = f'{old!r} -> (deleted)' + elif old is None is not new: + item = f'{new!r} (added)' + else: + if old == new: + continue + item = f'{old!r} -> {new!r}' + diff.append(f'${{{key}}}: {item}') + if diff: + print('Env:', indent('\n'.join(diff), ' '), sep='\n') + print('-- Process start --') + # Note: somehow `mypy` doesn't agree with simply unpacking the + # `*args` into `subprocess.run()`... + status: int | str = '???' 
+ proc: subprocess.CompletedProcess | None = None + time = monotonic() + try: + proc = subprocess.run( # type: ignore[call-overload] + cmd, *args, env=env, check=check_rc_in_run, **kwargs, ) + except Exception: + status = 'error' + raise else: - env_dict['PYTHONPATH'] = str(test_module.parent) - return subprocess.run(cmd, env=env_dict, **kwargs) + assert proc is not None + if check and not check_rc_in_run: # Perform missing check + proc.check_returncode() + status = proc.returncode + return proc + finally: + time = monotonic() - time + if proc is not None: + captured: str | bytes | None + for name, captured, stream in [ + ('stdout', proc.stdout, sys.stdout), + ('stderr', proc.stderr, sys.stderr), + ]: + if captured is None: + continue + if isinstance(captured, bytes): # `text=False` + captured = captured.decode() + print(f'{name}:\n{indent(captured, " ")}', file=stream) + print( + f'-- Process end (time elapsed: {time:.2f} s / ' + f'return status: {status})--' + ) def _run_test_module( run_helper: Callable[..., subprocess.CompletedProcess], - test_module: Path, + test_module: _ModuleFixture, tmp_path_factory: pytest.TempPathFactory, - runner: str | list[str], - outfile: str | None, - profile: bool, + runner: str | list[str] = 'kernprof', + outfile: str | None = None, + profile: bool = True, *, + profiled_code_is_tempfile: bool = False, + use_local_func: bool = False, + fail: bool = False, + start_method: Literal['fork', 'forkserver', 'spawn'] | None = None, nnums: int | None = None, nprocs: int | None = None, check: bool = True, -) -> tuple[subprocess.CompletedProcess, Path | None]: + debug_log: str | None = None, + nhits: Mapping[str, int] | None = None, + **kwargs +) -> tuple[subprocess.CompletedProcess, LineStats | None]: """ - Return - ------ - `(process_running_the_test_module, path_to_profiling_output | None)` + Returns: + process_running_the_test_module (subprocess.CompletedProcess): + Process object + profliing_stats (LineStats | None): + Line-profiling 
stats (where available) """ if isinstance(runner, str): runner_args: list[str] = [runner] else: runner_args = list(runner) - if profile: - runner_args.extend(['--prof-mod', str(test_module)]) + + if not profile: + nhits = None + + if profile and not profiled_code_is_tempfile: + runner_args.extend(['--prof-mod', str(test_module.path)]) + if nhits is not None: + # We need `kernprof` to write the profliing results immediately + # to preserve data from tempfiles (see note below) + runner_args.append('--view') test_args: list[str] = [] + if use_local_func: + test_args.append('--local') + if fail: + test_args.append('--force-failure') + if start_method: + if start_method in START_METHODS: + test_args.extend(['-s', start_method]) + else: + pytest.skip( + f'`multiprocessing` start method {start_method!r} ' + 'not available on the platform' + ) if nnums is None: nnums = NUM_NUMBERS else: @@ -210,24 +1068,856 @@ def _run_test_module( with ub.ChDir(tmp_path_factory.mktemp('mytemp')): if outfile is not None: runner_args.extend(['--outfile', outfile]) + if debug_log: + runner_args.extend(['--debug-log', debug_log]) + old_pth_files = _preserve_pth_files.get_pth_files() proc = run_helper( runner_args, test_args, test_module, - text=True, capture_output=True, + text=True, capture_output=True, check=(check and not fail), + **kwargs ) try: - if check: - proc.check_returncode() + # Checks: + if fail: + # - The process has failed as expected + if check: + assert proc.returncode + else: + # - The result is correctly calculated + expected = nnums * (nnums + 1) // 2 + output_lines = proc.stdout.splitlines() + if output_lines[0] != str(expected): + raise ResultMismatch( + f'result {expected}', f'output lines: {output_lines}', + ) + # - Temporary `.pth` file(s) created by `~~.pth_hook` has + # been cleaned up + assert _preserve_pth_files.get_pth_files() == old_pth_files + # - Profiling results are written to the specified file + prof_result: LineStats | None = None + if outfile is None: + 
assert not list(Path.cwd().iterdir()) + else: + assert os.path.exists(outfile) + assert os.stat(outfile).st_size + if profile: + prof_result = LineStats.from_files(outfile) + # - If we're keeping track, the function is called the + # expected number of times and has run the expected # of + # loops (Note: we do it by parsing the output of + # `kernprof -v` instead of reading the `--outfile`, + # because if the profiled code is in a tempfile the + # profiling data will be dropped in the written outfile) + for tag, num in (nhits or {}).items(): + _check_output(proc.stdout, tag, num) finally: - print(f'stdout:\n{indent(proc.stdout, " ")}') - print(f'stderr:\n{indent(proc.stderr, " ")}', file=sys.stderr) + if debug_log is not None: + with open(debug_log) as fobj: + print('-- Combined debug logs --', file=sys.stderr) + print(indent(fobj.read(), ' '), end='', file=sys.stderr) + print('-- End of debug logs --', file=sys.stderr) + return proc, prof_result - assert proc.stdout == f'{nnums * (nnums + 1) // 2}\n' - prof_result: Path | None = None - if outfile is None: - assert not list(Path.cwd().iterdir()) - else: - prof_result = Path(outfile).resolve() - assert prof_result.exists() - assert prof_result.stat().st_size - return proc, prof_result +def _check_output(output: str, tag: str, nhits: int) -> None: + # The line should be preixed with 5 numbers: + # lineno, nhits, time, time-per-hit, % time + actual_nhits = 0 + for line in output.splitlines(): + if line.endswith(f'# GREP_MARKER[{tag}]'): + try: + _, n, _, _, _, *_ = line.split() + actual_nhits += int(n) + except Exception: + pass + if actual_nhits == nhits: + return + raise ResultMismatch( + f'{nhits} hit(s) on line(s) tagged with {tag!r}', actual_nhits, + ) + + +run_module = partial(_run_test_module, _run_as_module) +run_script = partial(_run_test_module, _run_as_script) +run_literal_code = partial( + _run_test_module, _run_as_literal_code, profiled_code_is_tempfile=True, +) + +# ============================= Unit 
# XXX: Tests in this section concern implementation details, and the
# tested APIs and behaviors MUST NOT be relied upon by end-users.

_GLOBAL_PATCHES = {
    'multiprocessing.process.BaseProcess': frozenset({
        '_bootstrap', 'terminate',
    }),
    'multiprocessing.spawn': frozenset({'runpy'}),
    'os': frozenset({'fork'}),
}
if SHOULD_PATCH_THREADING:
    _GLOBAL_PATCHES['threading.Thread'] = frozenset({'__init__'})

# NOTE: we need a function which isn't used by the codebase itself
# (esp. during cache cleanup); otherwise the profiling results may
# be skewed
_SAFE_TARGET = 'calendar.weekday'
_SAFE_TARGET_ARGS = [
    (1970, 1, 1),
    (2000, 12, 31),
    (2008, 9, 16),  # Where the repo started
]


@pytest.mark.parametrize(('run_profiled_code', 'label1'),
                         [(True, 'run-profiled'), (False, 'run-unrelated')])
@pytest.mark.parametrize(('as_module', 'label2'),
                         [(True, 'run_module'), (False, 'run_path')])
@pytest.mark.parametrize(('debug', 'label3'),
                         [(True, 'with-debug'), (False, 'no-debug')])
def test_runpy_patches(
    capsys: pytest.CaptureFixture[str],
    ext_module: _ModuleFixture,
    test_module: _ModuleFixture,
    test_module_clone: _ModuleFixture,
    create_cache: Callable[..., LineProfilingCache],
    run_profiled_code: bool,
    as_module: bool,
    debug: bool,
    label1: str, label2: str, label3: str,
) -> None:
    """
    Test that the :py:mod:`runpy` clone created by
    :py:func:`line_profiler._child_process_profiling\
.create_runpy_wrapper`
    correctly sets up profiling when its ``run_*()`` functions are
    called.
    """
    class restore_argv:
        # Minimal context manager: snapshot `sys.argv` on entry and
        # restore it in-place on exit
        def __enter__(self) -> None:
            self.argv = list(sys.argv)

        def __exit__(self, *_, **__) -> None:
            sys.argv[:] = self.argv

    cache = create_cache(
        rewrite_module=test_module.path,
        profiling_targets=[str(ext_module.path)],
        profile_imports=True,
        debug=debug,
    )
    assert cache.profiler is not None
    runpy = create_runpy_wrapper(cache)

    nnums = 42
    nprocs = 2
    # If we're running some unrelated code, the profiler should not be
    # involved
    if run_profiled_code:
        module = test_module
        num_invocations, num_loops = 1, nprocs
        expected_funcs: list[str] = ['my_external_sum']
    else:
        module = test_module_clone
        num_invocations, num_loops = 0, 0
        expected_funcs = []
    if as_module:
        first_arg = module.name
        runner = partial(runpy.run_module, alter_sys=True)
        called_func = 'run_module'
    else:
        first_arg = str(module.path)
        runner = runpy.run_path
        called_func = 'run_path'

    # Check that the code is run
    module.install(local=True, deps_only=not as_module)
    with restore_argv():
        sys.argv[:] = [first_arg, f'--length={nnums}', '-n', str(nprocs)]
        runner(first_arg, run_name='__main__')
    stdout = capsys.readouterr().out
    assert stdout.rstrip('\n') == str(nnums * (nnums + 1) // 2)

    # Check that profiler has received the appropriate targets
    funcs = [func.__name__ for func in getattr(cache.profiler, 'functions')]
    assert funcs == expected_funcs

    # Check that calls in the current process are profiled iff the
    # correct file is executed
    with StringIO() as sio:
        cache.profiler.print_stats(sio)
        stats = sio.getvalue()
        _check_output(stats, 'EXT-INVOCATION', num_invocations)
        _check_output(stats, 'EXT-LOOP', num_loops)

    # Check the debug-log entries are correctly gathered
    _search_cache_logs(
        cache,
        debug,
        {
            rf'calling .*{called_func}\(': True,
            r'calling .*exec\(': run_profiled_code,
        },
        match_individual_messages=True,
        flags=re.IGNORECASE,
    )


def test_cache_dump_load(
    create_cache: Callable[..., LineProfilingCache],
) -> None:
    """
    Test that:

    - We can round-trip the cache via :py:meth:`LineProfilingCache.dump`
      and :py:meth:`LineProfilingCache.load`

    - The same instance is :py:meth:`LineProfilingCache.load`-ed in
      subsequent calls
    """
    original = create_cache(
        profiling_targets=['foo', 'bar', 'baz'], main_pid=123456,
    )
    envvars: set[str] = set(os.environ)
    try:
        original.inject_env_vars()  # Needed for `.load()`
        try:
            # Env vars should be inserted
            assert set(os.environ) == envvars.union(original.environ) > envvars
            original.dump()
            loaded = original.load()
            reloaded = original.load()
            assert original is not loaded is reloaded
            # Compare init fields
            for field in dataclasses.fields(LineProfilingCache):
                if not field.init:
                    continue
                assert (
                    getattr(original, field.name)
                    == getattr(loaded, field.name)
                )
        finally:  # Explicitly cleanup
            original.cleanup()
    finally:  # Env vars restored after cleanup
        assert set(os.environ) == envvars
+ """ + strings: list[str] = [] + cache = create_cache(debug=debug) + + cache.add_cleanup(strings.append, 'first') + # Decreased priority + cache._add_cleanup(strings.append, 1, 'second') + # Increased priority + cache._add_cleanup(strings.append, -1, 'third') + cache.add_cleanup(strings.append, 'fourth') + + cache.cleanup() + assert strings == ['third', 'fourth', 'first', 'second'] + + pattern = '\n'.join( + rf'.*succeeded \({i + 1}/{len(strings)}\): .*append.*{string!r}.*' + for i, string in enumerate(strings) + ) + _search_cache_logs(cache, debug, [pattern]) + + +@pytest.mark.parametrize(('wrap_os_fork', 'label1'), + [(True, 'with-wrap-fork'), (False, 'no-wrap-fork')]) +@pytest.mark.parametrize(('debug', 'label2'), + [(True, 'with-debug'), (False, 'no-debug')]) +def test_cache_setup_main_process( + create_cache: Callable[..., LineProfilingCache], + wrap_os_fork: bool, + debug: bool, + label1: str, label2: str, +) -> None: + """ + Test that :py:meth:`LineProfilingCache._setup_in_main_process` works + as expected. 
+ """ + cache = create_cache(debug=debug) + patches: dict[str, dict[str, bool]] = { + target: dict.fromkeys(attrs, True) + for target, attrs in _GLOBAL_PATCHES.items() + } + patches['os']['fork'] = wrap_os_fork and (sys.platform != 'win32') + targets: dict[str, Any] = { + target: _import_target(target) for target in patches + } + with ExitStack() as stack: + patched = stack.enter_context(_preserve_attributes(patches)) + original_pths = stack.enter_context(_preserve_pth_files()) + cache._setup_in_main_process(wrap_os_fork=wrap_os_fork) + # There should be exactly one extra `.pth` file + new_pth_hook, = _preserve_pth_files.get_pth_files() - original_pths + # Check whether the patches are applied + for target, maybe_patches in patches.items(): + obj = targets[target] + for attr, is_patched in maybe_patches.items(): + orig_value = patched[target][attr] + if orig_value is _NotSupplied.NOT_SUPPLIED: + assert not hasattr(obj, attr) + else: + assert (getattr(obj, attr) is orig_value) != is_patched + # Check whether the patches are reversed + cache.cleanup() + for target, orig_attrs in patched.items(): + obj = targets[target] + for attr, orig_value in orig_attrs.items(): + if orig_value is _NotSupplied.NOT_SUPPLIED: + assert not hasattr(obj, attr) + else: + assert getattr(obj, attr) is orig_value + # Check that the instance is set as the `.load()`-ed one + assert cache is cache.load() + + # Check the debug-log output + patterns: dict[str, bool] = dict.fromkeys( + [ + r'\(main process\)', + r'Injecting env var.*\$\{LINE_PROFILER_\w+\}', + re.escape(new_pth_hook), + ], + True, + ) + for target, maybe_patches in patches.items(): + patterns.update( + ('Patched.*' + re.escape(f'{target}.{attr}'), is_patched) + for attr, is_patched in maybe_patches.items() + ) + _search_cache_logs(cache, debug, patterns) + + +@pytest.mark.parametrize(('wrap_os_fork', 'label1'), + [(True, 'with-wrap-fork'), (False, 'no-wrap-fork')]) +@pytest.mark.parametrize(('preimports', 'label2'), + [(True, 
@pytest.mark.parametrize(('wrap_os_fork', 'label1'),
                         [(True, 'with-wrap-fork'), (False, 'no-wrap-fork')])
@pytest.mark.parametrize(('preimports', 'label2'),
                         [(True, 'with-preimports'), (False, 'no-preimports')])
@pytest.mark.parametrize(('new_profiler', 'label3'),
                         [(True, 'no-profiler'), (False, 'with-profiler')])
@pytest.mark.parametrize(('debug', 'label4'),
                         [(True, 'with-debug'), (False, 'no-debug')])
def test_cache_setup_child(
    create_cache: Callable[..., LineProfilingCache],
    curated_profiler: LineProfiler,
    wrap_os_fork: bool,
    preimports: bool,
    new_profiler: bool,
    debug: bool,
    label1: str, label2: str, label3: str, label4: str,
) -> None:
    """
    Test that :py:meth:`LineProfilingCache._setup_in_child_process`
    works as expected.
    """
    def list_profiled_funcs() -> list[str]:
        # Fully-qualified names of everything the profiler knows about
        return [
            f'{func.__module__}.{func.__qualname__}'
            for func in getattr(cache.profiler, 'functions', [])
        ]

    # Make sure we get a different PID from the current process
    # (FIX: `2 * 16` was almost certainly a typo for `2 ** 16`, the
    # conventional PID range; the assertion below guards either way)
    curr_pid = os.getpid()
    main_pid = (curr_pid - 42) % (2 ** 16)
    assert main_pid != curr_pid

    cache = create_cache(
        profiling_targets=[_SAFE_TARGET],
        preimports_module=preimports,
        _use_curated_profiler=not new_profiler,
        main_pid=main_pid,
        debug=debug,
    )
    assert (cache.profiler is None) == new_profiler

    seen_funcs = list_profiled_funcs()
    if preimports:
        preimport_targets = list(cache.profiling_targets)
    else:
        preimport_targets = []

    with _preserve_obj_attributes(os, ['fork']) as preserved:
        old_fork = preserved['fork']
        # Check that we're only setting up if there isn't already a
        # profiler
        assert cache._setup_in_child_process(
            wrap_os_fork=wrap_os_fork, context='test_cache_setup_child',
        ) == new_profiler
        assert cache.profiler
        if not new_profiler:
            return

        # Check that the profiler has been presented with the profiling
        # target
        assert list_profiled_funcs() == (seen_funcs + preimport_targets)

        # Check that on cache cleanup:
        # - Profiling data is collected
        # - `os.fork()` is restored
        _import_target(_SAFE_TARGET)(*_SAFE_TARGET_ARGS[0])
        stats = cache.profiler.get_stats()
        for callback, has_prof_data, fork_patched in [
            (lambda: None, False, wrap_os_fork),
            (cache.cleanup, preimports, False),
        ]:
            callback()
            gathered = cache.gather_stats()
            assert any(gathered.timings.values()) == has_prof_data, gathered
            if hasattr(os, 'fork'):
                assert (os.fork is not old_fork) == fork_patched
            else:  # E.g. Windows
                assert old_fork == _NotSupplied.NOT_SUPPLIED

        # Check that profiling results have been written to the cache
        # directory
        stats_file, = Path(cache.cache_dir).glob('*.lprof')
        assert LineStats.from_files(stats_file) == stats == gathered

    # Check the debug-log output
    patterns = {
        f'Set up .*profiler.* {id(cache.profiler):#x}': True,
        'Loading preimports': preimports,
        'Created .*' + re.escape(stats_file.name): True,
        'Cleanup succeeded.*: .*dump_stats': True,
        'Loading results .*' + re.escape(stats_file.name): True,
    }
    _search_cache_logs(cache, debug, patterns)
+ """ + def is_valid_stats_file(path: os.PathLike[str] | str) -> bool: + try: + LineStats.from_files(path, on_defective='error') + except Exception: + return False + return True + + config: Path | None = None + if debug: + config = tmp_path_factory.mktemp('myconfig') / 'mytoml.toml' + config.write_text( + '[tool.line_profiler.multiprocessing]\n' + 'intercept_logs = true' + ) + + cache = create_cache( + profiling_targets=[_SAFE_TARGET], + preimports_module=True, + config=config, + debug=True, + ) + # Note: + # - The reversibility of the patches have already been tested in + # `test_cache_setup_main_process()`, so we just actually test the + # patched-in components themselves here. + # - `._setup_in_main_process()` doesn't include actually doing the + # preimports. To may the results more consistent between + # `start_method='dummy'` and the others, manually do them below. + cache._setup_in_main_process() # This calls `apply()` + assert cache.profiler is not None + assert cache.preimports_module is not None + run_path(str(cache.preimports_module), {'profile': cache.profiler}) + + func = _import_target(_SAFE_TARGET) + return_lines = _find_return_lines(_SAFE_TARGET) + Pool: Callable[..., multiprocessing.pool.Pool] + if start_method == 'dummy': + Pool = _import_target('multiprocessing.dummy.Pool') + # Twice the counted calls because we're also collecting the + # checking calls in this process + expected_ncalls = len(_SAFE_TARGET_ARGS) * 2 + get_stats: Callable[[], LineStats] = cache.profiler.get_stats + elif start_method not in START_METHODS: + pytest.skip( + f'`multiprocessing` start method {start_method!r} ' + 'not available on the platform' + ) + else: + Pool = multiprocessing.get_context(start_method).Pool + expected_ncalls = len(_SAFE_TARGET_ARGS) + get_stats = cache.gather_stats + + with Pool(2) as pool: + par_result = pool.starmap(func, _SAFE_TARGET_ARGS) + pool.close() + pool.join() + assert par_result == [func(*args) for args in _SAFE_TARGET_ARGS] + + # Check 
that calls in children are traced + line_entries = get_stats().timings[ + inspect.getfile(func), inspect.getsourcelines(func)[1], func.__name__, + ] + num_returns = sum( + nhits for lineno, nhits, _ in line_entries if lineno in return_lines + ) + assert num_returns == expected_ncalls + + # Check the debug logs to see if we have done everything right, esp. + # the logging interception part not covered by other tests + patterns: dict[str, bool] = { + 'Cleanup succeeded.*: .*dump_stats.*' + re.escape(path.name): True + for path in Path(cache.cache_dir).glob('*.lprof') + if is_valid_stats_file(path) + } + patterns[re.escape('`multiprocessing` logging (debug)')] = debug + _search_cache_logs(cache, True, patterns) + + +# XXX: End of tests for implementation details + +# ========================= Integration tests ========================== + + +def _get_mp_start_method_fuzzer(label_name: str) -> _Params: + """ + Returns: + :py:class:`_Params` object which does a full Cartesian-product + fuzz between ``fail`` (true or false) and ``start_method`` + ('fork', 'forkserver', and 'spawn'; default :py:const:`None`) + """ + fuzz_fail = _Params.new(('fail', label_name), + [(True, 'failure'), (False, 'success')], + defaults=(False, 'success')) + fuzz_start = _Params.new('start_method', ['fork', 'forkserver', 'spawn'], + defaults=None) + return fuzz_fail * fuzz_start + + +_fuzz_sanity = ( + _Params.new(('run_func', 'label1'), + [(run_module, 'module'), (run_script, 'script')]) + * _Params.new(('use_local_func', 'label2'), + [(True, 'local'), (False, 'ext')]) + # Python can't pickle things unless they resided in a retrievable + # location (so not the script supplied by `python -c`) + + _Params.new(('run_func', 'label1', 'use_local_func', 'label2'), + [(run_literal_code, 'literal-code', False, 'ext')]) + # Also fuzz the parallelization-related stuff, esp. 
@_fuzz_sanity
def test_multiproc_script_sanity_check(
    run_func: Callable[..., subprocess.CompletedProcess],
    test_module: _ModuleFixture,
    tmp_path_factory: pytest.TempPathFactory,
    use_local_func: bool,
    fail: bool,
    start_method: Literal['fork', 'forkserver', 'spawn'] | None,
    nnums: int | None,
    nprocs: int | None,
    # Dummy arguments to make `pytest` output more legible
    label1: str, label2: str, label3: str,
) -> None:
    """
    Sanity check that the test module functions as expected when run
    with vanilla Python.
    """
    run_func(
        test_module, tmp_path_factory,
        runner=sys.executable, profile=False,
        fail=fail,
        use_local_func=use_local_func,
        start_method=start_method,
        nnums=nnums, nprocs=nprocs,
    )


@pytest.mark.parametrize(
    ('run_func', 'label1'),
    [(run_module, 'module'),
     (run_script, 'script'),
     (run_literal_code, 'literal-code')]
)
@pytest.mark.parametrize(
    ('runner', 'outfile', 'profile',
     'label2'),  # Dummy argument to make `pytest` output more legible
    # This is essentially a no-op since it doesn't actually do
    # line-profiling, but we check that code path for completeness
    [(['kernprof', '-q', '--no-line'], 'out.prof', False, 'cProfile')]
    # Run line profiling with and w/o profiling targets
    + [(['kernprof', '-q', '-l'], 'out.lprof', False,
        'line_profiler-inactive'),
       (['kernprof', '-q', '-l'], 'out.lprof', True,
        'line_profiler-active')],
)
def test_running_multiproc_script(
    run_func: Callable[..., subprocess.CompletedProcess],
    test_module: _ModuleFixture,
    tmp_path_factory: pytest.TempPathFactory,
    runner: str | list[str],
    outfile: str | None,
    profile: bool,
    # Dummy arguments to make `pytest` output more legible
    label1: str, label2: str,
) -> None:
    """
    Check that `kernprof` can RUN the test module in various contexts
    (``kernprof [...] <script>``, ``kernprof [...] -m <module>``, and
    ``kernprof [...] -c "code"``).

    Notes:
    - See issue #422 for the original motivation.

    - This test does not test the actual profiling, just the
      execution of the code and presence of profiling data
      thereafter.
    """
    run_func(test_module, tmp_path_factory, runner, outfile, profile)


_fuzz_prof_mp_1 = (
    _Params.new(('run_func', 'label1'),
                [(run_module, 'module'),
                 (run_script, 'script'),
                 (run_literal_code, 'literal-code')],
                defaults=(run_script, 'script'))
    + _Params.new(('prof_child_procs', 'label2'),
                  [(True, 'with-child-prof'), (False, 'no-child-prof')])
    + _get_mp_start_method_fuzzer('label3')
)
_fuzz_prof_mp_2 = (
    _Params.new(('preimports', 'label4'),
                [(True, 'with-preimports'), (False, 'no-preimports')],
                defaults=(False, 'no-preimports'))
    + _Params.new(('use_local_func', 'label5'),
                  [(True, 'local'), (False, 'external')],
                  defaults=(False, 'external'))
)


@_fuzz_prof_mp_1
@_fuzz_prof_mp_2
@pytest.mark.parametrize(
    # XXX: should we explicitly test the single-proc case?  We already
    # have quite a lot of subtests tho...
    ('nnums', 'nprocs'), [(2000, 3)],
)
def test_profiling_multiproc_script(
    run_func: Callable[..., subprocess.CompletedProcess],
    test_module: _ModuleFixture,
    ext_module: _ModuleFixture,
    tmp_path_factory: pytest.TempPathFactory,
    prof_child_procs: bool,
    preimports: bool,
    use_local_func: bool,
    fail: bool,
    start_method: Literal['fork', 'forkserver', 'spawn'] | None,
    nnums: int,
    nprocs: int,
    # Dummy arguments to make `pytest` output more legible
    label1: str, label2: str, label3: str, label4: str, label5: str,
) -> None:
    """
    Check that `kernprof` can PROFILE the test module in various
    contexts, optionally extending profiling into child processes.

    Note:
        This test function is heavily parametrized.  Here is why that is
        necessary:

        - ``run_func`` tests the different :cmd:`kernprof` modes (see
          :py:func:`~.test_running_multiproc_script`).

        - ``preimports`` tests that both mechanisms for setting up
          profiling targets work:

          - :py:const:`True`: child processes import the module
            generated by
            :py:mod:`line_profiler.autoprofile.eager_preimports`, like
            the main :py:mod:`kernprof` process does.

          - :py:const:`False`: child processes rewrite the executed code
            before passing it to :py:mod:`runpy`, similar to what
            :py:mod:`line_profiler.autoprofile.autoprofile` does.

          These code paths go through different
          :py:mod:`multiprocessing` components that we have patched and
          thus need separate testing.

        - ``use_local_func`` tests that we can consistently set up
          profiling in both functions locally-defined in the profiled
          code and imported by it.

        - ``fail`` tests that our patches and hooks don't choke when
          exceptions occur in child processes, and profiling data can
          still be collected.

        - ``start_method`` tests whether all available
          :py:mod:`multiprocessing` start methods are covered.

        - ``prof_child_procs`` of course toggles whether to do the
          patches to set up profiling in child processes.
    """
    # XXX: owing to the shenanigans in
    # `line_profiler._child_process_profiling.multiprocessing_patches`,
    # there is a risk that failing child processes are not properly
    # `.terminate()`-ed.  So just put in a timeout...
    timeout = 5  # Seconds

    # How many calls do we expect?
    nhits = dict.fromkeys(
        ['EXT-INVOCATION', 'EXT-LOOP', 'LOCAL-INVOCATION', 'LOCAL-LOOP'], 0,
    )
    # Make sure we're profiling the right function
    tag = 'LOCAL' if use_local_func else 'EXT'
    tag_call = tag + '-INVOCATION'
    tag_loop = tag + '-LOOP'
    # NOTE(review): the source formatting was ambiguous about whether
    # the `prof_child_procs` block nests under `if not fail`; the
    # comment "loop thru all the items" only holds on success, so the
    # nested reading is assumed here -- confirm against upstream.
    if not fail:
        # The final sum in the parent process should always be profiled
        # unless the child processes failed and we never returned from
        # `Pool.starmap()`
        nhits[tag_call] += 1
        nhits[tag_loop] += nprocs
        if prof_child_procs:
            # When profiling extends into child processes, each of them
            # invokes the sum function once and when combined they loop
            # thru all the items
            nhits[tag_call] += nprocs
            nhits[tag_loop] += nnums

    runner = ['kernprof', '-l']
    runner.extend([
        '--{}prof-child-procs'.format('' if prof_child_procs else 'no-'),
        '--{}preimports'.format('' if preimports else 'no-'),
    ])
    if not use_local_func:
        # Also make sure to include the external module in `--prof-mod`
        runner.append(f'--prof-mod={ext_module.name}')
    run_func(
        test_module, tmp_path_factory,
        runner=runner,
        outfile='out.lprof',
        profile=True,
        use_local_func=use_local_func,
        fail=fail,
        start_method=start_method,
        nhits=nhits,
        nnums=nnums,
        nprocs=nprocs,
        timeout=timeout,
        debug_log=(
            'debug.log' if prof_child_procs and _DEBUG else None
        ),
    )
@pytest.mark.parametrize(('use_subprocess', 'label1'),
                         [(True, 'subprocess.run'), (False, 'os.system')])
@pytest.mark.parametrize(('prof_child_procs', 'label2'),
                         [(True, 'with-child-prof'), (False, 'no-child-prof')])
@pytest.mark.parametrize(('fail', 'label3'),
                         [(True, 'failure'), (False, 'success')])
@pytest.mark.parametrize('n', [200])
def test_profiling_bare_python(
    tmp_path_factory: pytest.TempPathFactory,
    ext_module: _ModuleFixture,
    use_subprocess: bool,
    prof_child_procs: bool,
    fail: bool,
    n: int,
    # Dummy arguments to make `pytest` output more legible
    label1: str, label2: str, label3: str,
) -> None:
    """
    Check that `kernprof` can profile the target functions if the code
    invokes another bare Python process (via either :py:func:`os.system`
    or :py:func:`subprocess.run`) that calls them.
    """
    ext_module.install(children=True)
    temp_dir = tmp_path_factory.mktemp('mytemp')

    # Write a small script which calls the external target function
    script_path = temp_dir / 'my-script.py'
    script_content = strip("""
    from {EXT_MODULE} import my_external_sum


    if __name__ == '__main__':
        numbers = list(range(1, 1 + {N}))
        result = my_external_sum(numbers, {FAIL})
    """.format(
        EXT_MODULE=ext_module.name,
        N=n,
        FAIL=fail,
    ))
    script_path.write_text(script_content)

    out_file = temp_dir / 'out.lprof'
    debug_log_file = temp_dir / 'debug.log'
    write_debug = _DEBUG and prof_child_procs
    cmd = [
        'kernprof', '-lv', '--preimports',
        f'--prof-mod={ext_module.name}',
        f'--outfile={out_file}',
        '--{}prof-child-procs'.format('' if prof_child_procs else 'no-'),
    ]
    if write_debug:
        cmd.append(f'--debug-log={debug_log_file}')
    # The profiled code spawns a *bare* Python child via either
    # `subprocess.run()` or `os.system()`
    sub_cmd = [sys.executable, str(script_path)]
    if use_subprocess:
        code = strip(f"""
        import subprocess


        subprocess.run({sub_cmd!r}, check=True)
        """)
    else:
        code = strip("""
        import os


        if os.system({!r}):
            raise RuntimeError('called process failed')
        """.format(concat_command_line(sub_cmd)))
    cmd.extend(['-c', code])
    proc = _run_subproc(cmd, text=True, capture_output=True)

    # Hits only show up when profiling extends into the child process
    if prof_child_procs:
        nhits = {'EXT-INVOCATION': 1, 'EXT-LOOP': n}
    else:
        nhits = dict.fromkeys(['EXT-INVOCATION', 'EXT-LOOP'], 0)

    try:
        # Check that the code errors out when expected
        assert bool(fail) == bool(proc.returncode)
        # Check that the profiling output is as expected
        for tag, num in nhits.items():
            _check_output(proc.stdout, tag, num)
    finally:
        if write_debug:
            print('-- Combined debug logs --', file=sys.stderr)
            print(
                indent(debug_log_file.read_text(), '    '),
                end='', file=sys.stderr,
            )
            print('-- End of debug logs --', file=sys.stderr)