Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions esphome_device_builder/controllers/automations/parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,39 @@ def singleton_component_id(section: dict, domain: str) -> str:
return str(section.get("id") or domain)


def resolve_component_domain(yaml_text: str, component_id: str) -> str | None:
"""
Return the top-level domain whose instance declares *component_id*.

Keys instances exactly as :func:`_parse_inline_component_triggers`
does — list instances on ``id`` (or the synthetic ``<domain>_<idx>``),
flat singletons on :func:`singleton_component_id` — so the writer
attributes an id to the same domain the parser did. Only declared
instance ids match; an action *reference* (``id:`` nested in a
handler body) never does. ``None`` when no instance matches or the
YAML can't be parsed.
"""
yaml = make_yaml()
try:
root = yaml.load(yaml_text)
except Exception: # noqa: BLE001 — any load failure falls back to the catalog guess
return None
Comment thread
bdraco marked this conversation as resolved.
if not isinstance(root, dict):
return None
for domain, section in root.items():
if domain not in _component_trigger_domains():
continue
if isinstance(section, list):
for idx, instance in enumerate(section):
if not isinstance(instance, dict):
continue
if str(instance.get("id") or f"{domain}_{idx}") == component_id:
return str(domain)
elif isinstance(section, dict) and singleton_component_id(section, domain) == component_id:
return str(domain)
return None


def _parse_inline_component_triggers(root: Any) -> list[ParsedAutomation]:
"""
Walk component instances for inline ``on_*:`` handlers.
Expand Down
52 changes: 11 additions & 41 deletions esphome_device_builder/controllers/automations/writing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,10 @@

from __future__ import annotations

import re

from ...helpers.api import CommandError
from ...helpers.yaml import (
_splice_into_domain_block,
remove_inline_handler,
synthetic_instance_index,
upsert_inline_handler,
)
from ...models.api import ErrorCode
Expand All @@ -43,7 +40,7 @@
render_script_item,
render_trigger_handler,
)
from .parsing import make_yaml
from .parsing import make_yaml, resolve_component_domain
from .writing_lists import (
delete_light_effect,
delete_list_entry,
Expand Down Expand Up @@ -580,44 +577,17 @@ def _component_domain_from_yaml(
the writer then fails with "instance id='relay' not found
under 'fan'".

Walk the YAML and find the top-level key whose subtree contains
``id: <component_id>``. That's the domain the user actually
configured. Falls back to the catalog guess when the id can't
be located in the YAML (which also means the upsert won't find
a splice destination — the user will see a clearer
"id not found" error from ``upsert_inline_handler``).
Resolve structurally against the parsed config — id-less and flat
singletons included — so only a declared instance id matches, never
an action *reference* to that id nested in another component's
handler. Falls back to the catalog guess when the id can't be
located (which also means the upsert won't find a splice
destination — the user gets a clearer "id not found" error from
``upsert_inline_handler``).
"""
target_id = location.component_id
id_re = re.compile(
r"^\s+(?:-\s+)?id:\s*[\"']?(\S+?)[\"']?\s*(?:#.*)?$",
)
top_re = re.compile(r"^([a-zA-Z_][\w]*)\s*:")
current_domain: str | None = None
top_level_domains: list[str] = []
for line in yaml_text.splitlines():
if line and not line[0].isspace():
m = top_re.match(line)
current_domain = m.group(1) if m else None
if current_domain is not None:
top_level_domains.append(current_domain)
continue
if current_domain is None:
continue
m = id_re.match(line)
if m and m.group(1) == target_id:
return current_domain
# No literal ``id:`` match — the parser labels id-less instances
# ``<domain>_<idx>``, so recover the domain from that prefix before
# falling back to the ambiguous catalog guess.
for domain in top_level_domains:
if synthetic_instance_index(domain, target_id) is not None:
return domain
# An id-less flat singleton (``sun:`` / ``mqtt:``) is keyed by the
# domain name itself; recover it before the catalog guess, which
# mis-attributes a shared trigger key (``mqtt.on_connect`` vs
# ``wifi.on_connect``).
if target_id in top_level_domains:
return target_id
domain = resolve_component_domain(yaml_text, location.component_id)
if domain is not None:
return domain
return _component_domain(location)


Expand Down
67 changes: 66 additions & 1 deletion tests/test_automations_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
import orjson
import pytest

from esphome_device_builder.controllers.automations.parsing import parse_device_yaml
from esphome_device_builder.controllers.automations.parsing import (
parse_device_yaml,
resolve_component_domain,
)
from esphome_device_builder.helpers.api import CommandError

_FIXTURES = Path(__file__).parent / "fixtures" / "automation_yamls"
Expand Down Expand Up @@ -543,3 +546,65 @@ def test_parsed_entries_carry_valid_line_ranges() -> None:
for item in parsed:
assert item.from_line >= 1
assert item.to_line >= item.from_line


# ---------------------------------------------------------------------------
# resolve_component_domain
# ---------------------------------------------------------------------------


def test_resolve_domain_matches_declared_list_instance_id() -> None:
"""A declared list-instance id resolves to its top-level domain."""
text = "switch:\n - platform: gpio\n id: relay\n pin: GPIO5\n"
assert resolve_component_domain(text, "relay") == "switch"


def test_resolve_domain_matches_idless_synthetic_index() -> None:
"""An id-less list instance resolves via its synthetic ``<domain>_<idx>`` key."""
text = "switch:\n - platform: gpio\n pin: GPIO5\n"
assert resolve_component_domain(text, "switch_0") == "switch"


def test_resolve_domain_matches_flat_singleton_on_domain() -> None:
"""An id-less flat singleton resolves on the domain name itself."""
text = "sun:\n latitude: 1.0\n longitude: 2.0\n"
assert resolve_component_domain(text, "sun") == "sun"


def test_resolve_domain_ignores_nested_action_reference() -> None:
"""An ``id:`` reference nested in an action body never owns the domain."""
text = (
"light:\n"
" - platform: binary\n"
" id: lamp\n"
" on_turn_on:\n"
" then:\n"
" - switch.turn_off:\n"
" id: relay\n"
"switch:\n"
" - platform: gpio\n"
" id: relay\n"
)
assert resolve_component_domain(text, "relay") == "switch"


def test_resolve_domain_returns_none_for_unknown_id() -> None:
"""A component id that no instance declares resolves to ``None``."""
text = "switch:\n - platform: gpio\n id: relay\n"
assert resolve_component_domain(text, "missing") is None


def test_resolve_domain_returns_none_for_non_dict_root() -> None:
"""A document whose root isn't a mapping resolves to ``None``."""
assert resolve_component_domain("- a\n- b\n", "relay") is None


def test_resolve_domain_returns_none_for_unparseable_yaml() -> None:
"""Malformed YAML falls back to ``None`` instead of raising."""
assert resolve_component_domain(":\n - [\n", "relay") is None


def test_resolve_domain_skips_non_dict_list_entry() -> None:
"""A non-mapping entry inside a domain list is skipped, not matched."""
text = "switch:\n - just-a-string\n - platform: gpio\n id: relay\n"
assert resolve_component_domain(text, "relay") == "switch"
38 changes: 38 additions & 0 deletions tests/test_automations_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1174,6 +1174,44 @@ def test_upsert_component_on_resolves_domain_from_yaml_when_trigger_key_is_ambig
assert "fan:" not in new_text


def test_upsert_component_on_ignores_action_reference_to_target_id() -> None:
"""An earlier handler that *references* the id must not steal the domain.

A shared trigger key (``on_turn_on``) plus a decoy ``id: relay``
reference nested in an earlier component's action body used to make
the writer attribute ``relay`` to the decoy's domain (``light``)
and fail with "instance id='relay' not found under 'light'". Only
the declared ``switch`` instance owns the id.
"""
text = (
"light:\n"
" - platform: binary\n"
" id: lamp\n"
" output: out\n"
" on_turn_on:\n"
" then:\n"
" - switch.turn_off:\n"
" id: relay\n"
"switch:\n"
" - platform: gpio\n"
" id: relay\n"
" pin: GPIO5\n"
)
new_text, _diff = render_upsert(
text,
tree=AutomationTree(
trigger_id="switch.on_turn_on",
actions=[ActionNode(action_id="delay", params={"seconds": "1"})],
),
location=ComponentOnLocation(component_id="relay", trigger="on_turn_on"),
)
# The new handler landed under the switch instance, beside its
# existing pin — the light block's decoy reference is untouched.
switch_block = new_text.split("switch:", 1)[1]
assert "on_turn_on:" in switch_block
assert "pin: GPIO5" in switch_block


def test_delete_component_on_missing_instance_raises_not_found() -> None:
"""Deleting on_press on an instance id that doesn't exist raises NOT_FOUND."""
text = "binary_sensor:\n - platform: gpio\n id: btn\n pin: GPIO0\n"
Expand Down
Loading