Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 33 additions & 19 deletions libs/openant-core/core/parser_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def parse_repository(
skip_tests: bool = True,
name: str = None,
diff_manifest: str | None = None,
fresh: bool = False,
library_mode: bool = False,
) -> ParseResult:
"""Parse a repository into an OpenAnt dataset.

Expand All @@ -93,9 +93,6 @@ def parse_repository(
processing_level: "all", "reachable", "codeql", or "exploitable".
skip_tests: If True, exclude test files from parsing (default: True).
name: Dataset name override (default: derived from repo path basename).
fresh: If True, delete existing dataset.json before parsing so all
units are regenerated from scratch. Only dataset.json is deleted;
other artifacts in output_dir (e.g. analyzer outputs) are preserved.

Returns:
ParseResult with paths to generated files and stats.
Expand All @@ -108,26 +105,14 @@ def parse_repository(
output_dir = os.path.abspath(output_dir)
os.makedirs(output_dir, exist_ok=True)

if fresh:
dataset_path = os.path.join(output_dir, "dataset.json")
# Use try/except instead of exists()+remove() to avoid a TOCTOU race
# if a concurrent --fresh run removes the file between the two calls.
# Only dataset.json is deleted; other artifacts (analyzer outputs, etc.)
# in output_dir are preserved.
try:
os.remove(dataset_path)
print("[Parser] --fresh: deleted existing dataset.json", file=sys.stderr)
except FileNotFoundError:
pass

# Detect language if auto
if language == "auto":
language = detect_language(repo_path)
print(f" Auto-detected language: {language}", file=sys.stderr)

# Dispatch to the right parser
if language == "python":
result = _parse_python(repo_path, output_dir, processing_level, skip_tests, name)
result = _parse_python(repo_path, output_dir, processing_level, skip_tests, name, library_mode)
elif language == "javascript":
result = _parse_javascript(repo_path, output_dir, processing_level, skip_tests, name)
elif language == "go":
Expand Down Expand Up @@ -207,11 +192,34 @@ def _maybe_apply_diff_filter(
# Reachability filter (shared by Python path; JS/Go handle it internally)
# ---------------------------------------------------------------------------

def _library_seed_ids(functions: dict) -> "set[str]":
"""Public-API seed set for library-mode reachability.

A pure library exposes no main/route/CLI entry point, so the structural
detector finds nothing and the whole library is filtered out (0 reachable).
In library-mode the *public surface* IS the entry surface: seed every
exported/public function and let the forward BFS pull in its callees.

Public = exported AND not name-private. ``is_exported`` is honoured when the
parser provides it (C/Go/JS — excludes ``static``/unexported); for parsers
without the field (python/ruby/php) it defaults True and the name heuristic
(leading underscore = private) decides. The bias is intentionally toward
over-seeding (more reachable = more analysed), never under-seeding.
"""
seeds: set[str] = set()
for func_id, fd in functions.items():
name = (fd.get("name") or func_id.rsplit(":", 1)[-1]).split(".")[-1]
if fd.get("is_exported", True) and not name.startswith("_"):
seeds.add(func_id)
return seeds


def apply_reachability_filter(
dataset: dict,
output_dir: str,
processing_level: str,
extra_entry_points: "set[str] | None" = None,
library_mode: bool = False,
) -> dict:
"""Filter dataset units to only those reachable from entry points.

Expand Down Expand Up @@ -277,6 +285,11 @@ def _load_module(name, filename):
entry_points = detector.detect_entry_points()
if extra_entry_points:
entry_points = entry_points | extra_entry_points
# Library-mode (opt-in): the public API is the entry surface. Union-only —
# never demotes a structurally-detected app entry point, so an app scan with
# the flag on can only gain reachable units, never lose one.
if library_mode:
entry_points = entry_points | _library_seed_ids(functions)

units = dataset.get("units", [])
original_count = len(units)
Expand Down Expand Up @@ -374,7 +387,7 @@ def _load_module(name, filename):
# Python parser
# ---------------------------------------------------------------------------

def _parse_python(repo_path: str, output_dir: str, processing_level: str, skip_tests: bool = True, name: str = None) -> ParseResult:
def _parse_python(repo_path: str, output_dir: str, processing_level: str, skip_tests: bool = True, name: str = None, library_mode: bool = False) -> ParseResult:
"""Invoke the Python parser.

The Python parser has a clean `parse_repository()` function that we can
Expand Down Expand Up @@ -402,7 +415,8 @@ def _parse_python(repo_path: str, output_dir: str, processing_level: str, skip_t

# Apply reachability filter if processing_level requires it
if processing_level != "all":
dataset = _apply_reachability_filter(dataset, output_dir, processing_level)
dataset = _apply_reachability_filter(dataset, output_dir, processing_level,
library_mode=library_mode)

# Write outputs
write_json(dataset_path, dataset)
Expand Down
122 changes: 122 additions & 0 deletions libs/openant-core/tests/test_library_mode_reachability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
"""Library-mode reachability seeding (BUG-005).

A pure library exposes no main/route/CLI entry point, so the structural detector
finds nothing and `apply_reachability_filter` drops EVERY unit — the library
(including any vulnerable sink it contains) is never analysed. Library-mode seeds
the public API surface so the forward BFS pulls in the rest.

These tests pin: (1) the mode-OFF baseline, (2) the public API becomes
reachable when ON (and its private callee comes along via the call edge), (3) a
truly-unreferenced private function stays out, and — adversarially — (4) turning
the mode ON for an APP can only ADD reachable units, never remove one (union-only
seed merge), so existing app scans are never degraded.

NOTE: stacked on PR #75. On master a no-entry-point library blacks out (0 units),
which is the bug this PR fixes. PR #75's zero-seed fallback already prevents that
blackout — bluntly — by returning ALL units unfiltered when no entry point is
detected. So the mode-OFF baseline here is "all units unfiltered" (#75), and
library-mode ON refines it to the precise public-API-reachable subset.
"""

import json
import sys
from pathlib import Path

_CORE_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_CORE_ROOT))

from core.parser_adapter import apply_reachability_filter


def _run(tmp_path, functions, call_graph, *, library_mode, entry_types=None):
    """Build an on-disk fixture, run the reachability filter, return surviving ids.

    Writes ``call_graph.json`` into ``tmp_path`` (function metadata, forward
    edges and the derived reverse edges), builds a matching in-memory dataset
    with one unit per function id, and calls ``apply_reachability_filter`` at
    the ``"reachable"`` level.

    Args:
        tmp_path: Directory used as the filter's output directory.
        functions: Iterable of function ids (``"file.py:name"``).
        call_graph: Mapping of caller id to a list of callee ids.
        library_mode: Forwarded to the filter unchanged.
        entry_types: Optional mapping of function id to ``unit_type``; ids not
            listed get ``"function"``.

    Returns:
        The set of unit ids present in the filtered dataset.
    """
    kinds = entry_types or {}

    def kind_of(fid):
        return kinds.get(fid, "function")

    # Invert the forward edges: callee -> [callers].
    callers_of = {}
    for src, targets in call_graph.items():
        for dst in targets:
            callers_of.setdefault(dst, []).append(src)

    # Each function carries its bare name plus a unit_type, which is what lets
    # a fixture mark an id as a structural entry point.
    fn_meta = {}
    for fid in functions:
        bare_name = fid.rsplit(":", 1)[-1].rsplit(".", 1)[-1]
        fn_meta[fid] = {"name": bare_name, "unit_type": kind_of(fid)}

    payload = {
        "functions": fn_meta,
        "call_graph": call_graph,
        "reverse_call_graph": callers_of,
    }
    graph_file = tmp_path / "call_graph.json"
    graph_file.write_text(json.dumps(payload))

    units = [{"id": fid, "unit_type": kind_of(fid)} for fid in functions]
    filtered = apply_reachability_filter(
        {"units": units}, str(tmp_path), "reachable", library_mode=library_mode
    )
    return {unit["id"] for unit in filtered["units"]}


# library: public_api() -> _sink() (no structural entry point)
_LIB_FNS = ["lib.py:public_api", "lib.py:_sink"]
_LIB_CG = {"lib.py:public_api": ["lib.py:_sink"]}


def test_library_mode_off_returns_all_unfiltered(tmp_path):
    """With library-mode off, a library without entry points keeps every unit.

    This pins the baseline the suite is stacked on (#75): when no entry point
    is detected the filter returns the dataset unfiltered rather than empty.
    """
    everything = set(_LIB_FNS)
    kept = _run(tmp_path, functions=_LIB_FNS, call_graph=_LIB_CG, library_mode=False)
    assert kept == everything, f"expected #75 all-unfiltered fallback, got {kept}"


def test_library_public_api_reachable_when_mode_on(tmp_path):
    """With library-mode on, the public function is kept and so is the private
    function it calls."""
    kept = _run(
        tmp_path,
        functions=_LIB_FNS,
        call_graph=_LIB_CG,
        library_mode=True,
    )
    # The public function itself must be a seed ...
    assert "lib.py:public_api" in kept, f"public API not seeded: {kept}"
    # ... and its callee must be reached through the call edge.
    assert "lib.py:_sink" in kept, f"private callee of the public API not reached: {kept}"


def test_unreferenced_private_stays_out(tmp_path):
    """A private function with no caller must not survive in library-mode.

    Guards precision: only the public surface is seeded, so the mode does not
    degrade into keeping every unit.
    """
    with_orphan = [*_LIB_FNS, "lib.py:_orphan"]
    kept = _run(tmp_path, functions=with_orphan, call_graph=_LIB_CG, library_mode=True)
    assert "lib.py:_orphan" not in kept, f"unreferenced private wrongly seeded: {kept}"


# app: main() is a route_handler entry; helper() is its callee; _dead() is unreferenced
_APP_FNS = ["app.py:main", "app.py:helper", "app.py:_dead"]
_APP_CG = {"app.py:main": ["app.py:helper"]}
_APP_ENTRY = {"app.py:main": "route_handler"}


def test_app_baseline_mode_off(tmp_path):
    """An app with a real entry point keeps exactly the entry point and its
    callee when library-mode is off; the unreferenced function is dropped."""
    kept = _run(
        tmp_path,
        functions=_APP_FNS,
        call_graph=_APP_CG,
        library_mode=False,
        entry_types=_APP_ENTRY,
    )
    assert kept == {"app.py:main", "app.py:helper"}, f"app baseline changed: {kept}"


def test_app_mode_on_is_additive_only(tmp_path):
    """Enabling library-mode on an app may add reachable units but never remove one.

    Runs the same app fixture with the flag off and then on, and requires the
    flag-off result to be a subset of the flag-on result.
    """
    fixture = dict(functions=_APP_FNS, call_graph=_APP_CG, entry_types=_APP_ENTRY)
    # Order matters only in that both runs reuse tmp_path: off first, then on.
    off = _run(tmp_path, library_mode=False, **fixture)
    on = _run(tmp_path, library_mode=True, **fixture)
    assert off.issubset(on), f"library-mode REMOVED app units: off={off} on={on}"
    assert off == {"app.py:main", "app.py:helper"}


def test_parse_repository_wiring(tmp_path):
    """End-to-end guard that ``library_mode`` is accepted by ``parse_repository``
    and yields the expected units.

    Parses a tiny on-disk library (a public function calling a private ``eval``
    sink) with the flag off and on, then inspects the ``dataset.json`` each run
    wrote. A test on the filter alone cannot catch the flag being dropped
    between the parser entry points.
    """
    from core.parser_adapter import parse_repository

    repo = tmp_path / "repo"
    repo.mkdir()
    lib_source = "def public_api(x):\n return _sink(x)\n\ndef _sink(x):\n return eval(x)\n"
    (repo / "lib.py").write_text(lib_source)

    def unit_ids(library_mode):
        # Each run gets its own output directory so the two results stay separate.
        out = tmp_path / f"out_{library_mode}"
        out.mkdir()
        parse_repository(
            repo_path=str(repo),
            output_dir=str(out),
            language="python",
            processing_level="reachable",
            library_mode=library_mode,
        )
        written = json.loads((out / "dataset.json").read_text())
        return {unit.get("id") for unit in written.get("units", [])}

    # Flag off: with no entry point detected, the #75 fallback keeps every unit.
    assert unit_ids(False) == {"lib.py:public_api", "lib.py:_sink"}, \
        "mode off: expected #75 all-unfiltered fallback"

    # Flag on: both the public function and the sink it calls must be kept.
    on = unit_ids(True)
    assert any(i.endswith(":public_api") for i in on), f"public api not analysed: {on}"
    assert any(i.endswith(":_sink") for i in on), f"eval sink not analysed: {on}"
Loading