Allow deeply nested dicts and lists in addon config schemas (#6171)

* Allow arbitrarily nested addon config schemas

* Disallow lists directly nested in another list in addon schema

* Handle arbitrarily nested addon schemas in UiOptions class

* Handle arbitrarily nested addon schemas in AddonOptions class

* Add tests for addon config schemas

* Add tests for addon option validation
This commit is contained in:
Lukas Waslowski 2025-09-16 11:32:28 +02:00 committed by GitHub
parent 2e22e1e884
commit ac9947d599
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 208 additions and 56 deletions

View File

@ -93,15 +93,7 @@ class AddonOptions(CoreSysAttributes):
typ = self.raw_schema[key]
try:
if isinstance(typ, list):
# nested value list
options[key] = self._nested_validate_list(typ[0], value, key)
elif isinstance(typ, dict):
# nested value dict
options[key] = self._nested_validate_dict(typ, value, key)
else:
# normal value
options[key] = self._single_validate(typ, value, key)
options[key] = self._validate_element(typ, value, key)
except (IndexError, KeyError):
raise vol.Invalid(
f"Type error for option '{key}' in {self._name} ({self._slug})"
@ -111,7 +103,20 @@ class AddonOptions(CoreSysAttributes):
return options
# pylint: disable=no-value-for-parameter
def _single_validate(self, typ: str, value: Any, key: str):
def _validate_element(self, typ: Any, value: Any, key: str) -> Any:
"""Validate a value against a type specification."""
if isinstance(typ, list):
# nested value list
return self._nested_validate_list(typ[0], value, key)
elif isinstance(typ, dict):
# nested value dict
return self._nested_validate_dict(typ, value, key)
else:
# normal value
return self._single_validate(typ, value, key)
# pylint: disable=no-value-for-parameter
def _single_validate(self, typ: str, value: Any, key: str) -> Any:
"""Validate a single element."""
# if required argument
if value is None:
@ -188,7 +193,9 @@ class AddonOptions(CoreSysAttributes):
f"Fatal error for option '{key}' with type '{typ}' in {self._name} ({self._slug})"
) from None
def _nested_validate_list(self, typ: Any, data_list: list[Any], key: str):
def _nested_validate_list(
self, typ: Any, data_list: list[Any], key: str
) -> list[Any]:
"""Validate nested items."""
options = []
@ -201,17 +208,13 @@ class AddonOptions(CoreSysAttributes):
# Process list
for element in data_list:
# Nested?
if isinstance(typ, dict):
c_options = self._nested_validate_dict(typ, element, key)
options.append(c_options)
else:
options.append(self._single_validate(typ, element, key))
options.append(self._validate_element(typ, element, key))
return options
def _nested_validate_dict(
self, typ: dict[Any, Any], data_dict: dict[Any, Any], key: str
):
) -> dict[Any, Any]:
"""Validate nested items."""
options = {}
@ -231,12 +234,7 @@ class AddonOptions(CoreSysAttributes):
continue
# Nested?
if isinstance(typ[c_key], list):
options[c_key] = self._nested_validate_list(
typ[c_key][0], c_value, c_key
)
else:
options[c_key] = self._single_validate(typ[c_key], c_value, c_key)
options[c_key] = self._validate_element(typ[c_key], c_value, c_key)
self._check_missing_options(typ, options, key)
return options
@ -274,18 +272,28 @@ class UiOptions(CoreSysAttributes):
# read options
for key, value in raw_schema.items():
if isinstance(value, list):
# nested value list
self._nested_ui_list(ui_schema, value, key)
elif isinstance(value, dict):
# nested value dict
self._nested_ui_dict(ui_schema, value, key)
else:
# normal value
self._single_ui_option(ui_schema, value, key)
self._ui_schema_element(ui_schema, value, key)
return ui_schema
def _ui_schema_element(
self,
ui_schema: list[dict[str, Any]],
value: str,
key: str,
multiple: bool = False,
):
if isinstance(value, list):
# nested value list
assert not multiple
self._nested_ui_list(ui_schema, value, key)
elif isinstance(value, dict):
# nested value dict
self._nested_ui_dict(ui_schema, value, key, multiple)
else:
# normal value
self._single_ui_option(ui_schema, value, key, multiple)
def _single_ui_option(
self,
ui_schema: list[dict[str, Any]],
@ -377,10 +385,7 @@ class UiOptions(CoreSysAttributes):
_LOGGER.error("Invalid schema %s", key)
return
if isinstance(element, dict):
self._nested_ui_dict(ui_schema, element, key, multiple=True)
else:
self._single_ui_option(ui_schema, element, key, multiple=True)
self._ui_schema_element(ui_schema, element, key, multiple=True)
def _nested_ui_dict(
self,
@ -399,11 +404,7 @@ class UiOptions(CoreSysAttributes):
nested_schema: list[dict[str, Any]] = []
for c_key, c_value in option_dict.items():
# Nested?
if isinstance(c_value, list):
self._nested_ui_list(nested_schema, c_value, c_key)
else:
self._single_ui_option(nested_schema, c_value, c_key)
self._ui_schema_element(nested_schema, c_value, c_key)
ui_node["schema"] = nested_schema
ui_schema.append(ui_node)

View File

@ -137,7 +137,19 @@ RE_DOCKER_IMAGE_BUILD = re.compile(
r"^([a-zA-Z\-\.:\d{}]+/)*?([\-\w{}]+)/([\-\w{}]+)(:[\.\-\w{}]+)?$"
)
SCHEMA_ELEMENT = vol.Match(RE_SCHEMA_ELEMENT)
SCHEMA_ELEMENT = vol.Schema(
vol.Any(
vol.Match(RE_SCHEMA_ELEMENT),
[
# A list may not directly contain another list
vol.Any(
vol.Match(RE_SCHEMA_ELEMENT),
{str: vol.Self},
)
],
{str: vol.Self},
)
)
RE_MACHINE = re.compile(
r"^!?(?:"
@ -406,20 +418,7 @@ _SCHEMA_ADDON_CONFIG = vol.Schema(
vol.Optional(ATTR_CODENOTARY): vol.Email(),
vol.Optional(ATTR_OPTIONS, default={}): dict,
vol.Optional(ATTR_SCHEMA, default={}): vol.Any(
vol.Schema(
{
str: vol.Any(
SCHEMA_ELEMENT,
[
vol.Any(
SCHEMA_ELEMENT,
{str: vol.Any(SCHEMA_ELEMENT, [SCHEMA_ELEMENT])},
)
],
vol.Schema({str: vol.Any(SCHEMA_ELEMENT, [SCHEMA_ELEMENT])}),
)
}
),
vol.Schema({str: SCHEMA_ELEMENT}),
False,
),
vol.Optional(ATTR_IMAGE): docker_image,

View File

@ -325,3 +325,97 @@ def test_valid_slug():
config["slug"] = "complemento telefónico"
with pytest.raises(vol.Invalid):
assert vd.SCHEMA_ADDON_CONFIG(config)
def test_valid_schema():
"""Test valid and invalid addon slugs."""
config = load_json_fixture("basic-addon-config.json")
# Basic types
config["schema"] = {
"bool_basic": "bool",
"mail_basic": "email",
"url_basic": "url",
"port_basic": "port",
"match_basic": "match(.*@.*)",
"list_basic": "list(option1|option2|option3)",
# device
"device_basic": "device",
"device_filter": "device(subsystem=tty)",
# str
"str_basic": "str",
"str_basic2": "str(,)",
"str_min": "str(5,)",
"str_max": "str(,10)",
"str_minmax": "str(5,10)",
# password
"password_basic": "password",
"password_basic2": "password(,)",
"password_min": "password(5,)",
"password_max": "password(,10)",
"password_minmax": "password(5,10)",
# int
"int_basic": "int",
"int_basic2": "int(,)",
"int_min": "int(5,)",
"int_max": "int(,10)",
"int_minmax": "int(5,10)",
# float
"float_basic": "float",
"float_basic2": "float(,)",
"float_min": "float(5,)",
"float_max": "float(,10)",
"float_minmax": "float(5,10)",
}
assert vd.SCHEMA_ADDON_CONFIG(config)
# Different valid ways of nesting dicts and lists
config["schema"] = {
"str_list": ["str"],
"dict_in_list": [
{
"required": "str",
"optional": "str?",
}
],
"dict": {
"required": "str",
"optional": "str?",
"str_list_in_dict": ["str"],
"dict_in_list_in_dict": [
{
"required": "str",
"optional": "str?",
"str_list_in_dict_in_list_in_dict": ["str"],
}
],
"dict_in_dict": {
"str_list_in_dict_in_dict": ["str"],
"dict_in_list_in_dict_in_dict": [
{
"required": "str",
"optional": "str?",
}
],
"dict_in_dict_in_dict": {
"required": "str",
"optional": "str",
},
},
},
}
assert vd.SCHEMA_ADDON_CONFIG(config)
# List nested within dict within list
config["schema"] = {"field": [{"subfield": ["str"]}]}
assert vd.SCHEMA_ADDON_CONFIG(config)
# No lists directly nested within each other
config["schema"] = {"field": [["str"]]}
with pytest.raises(vol.Invalid):
assert vd.SCHEMA_ADDON_CONFIG(config)
# Field types must be valid
config["schema"] = {"field": "invalid"}
with pytest.raises(vol.Invalid):
assert vd.SCHEMA_ADDON_CONFIG(config)

View File

@ -129,6 +129,64 @@ def test_complex_schema_dict(coresys):
)({"name": "Pascal", "password": "1234", "extend": "test"})
def test_complex_schema_dict_and_list(coresys):
"""Test with complex dict/list nested schema."""
assert AddonOptions(
coresys,
{
"name": "str",
"packages": [
{
"name": "str",
"options": {"optional": "bool"},
"dependencies": [{"name": "str"}],
}
],
},
MOCK_ADDON_NAME,
MOCK_ADDON_SLUG,
)(
{
"name": "Pascal",
"packages": [
{
"name": "core",
"options": {"optional": False},
"dependencies": [{"name": "supervisor"}, {"name": "audio"}],
}
],
}
)
with pytest.raises(vol.error.Invalid):
assert AddonOptions(
coresys,
{
"name": "str",
"packages": [
{
"name": "str",
"options": {"optional": "bool"},
"dependencies": [{"name": "str"}],
}
],
},
MOCK_ADDON_NAME,
MOCK_ADDON_SLUG,
)(
{
"name": "Pascal",
"packages": [
{
"name": "core",
"options": {"optional": False},
"dependencies": [{"name": "supervisor"}, "wrong"],
}
],
}
)
def test_simple_device_schema(coresys):
"""Test with simple schema."""
for device in (