Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 74 additions & 19 deletions src/check_datapackage/extensions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import re
from collections.abc import Callable
from typing import Any, Self
from dataclasses import dataclass
from typing import Any, Self, cast

from jsonpath import JSONPath, compile
from jsonpath.segments import JSONPathRecursiveDescentSegment
from jsonpath.selectors import NameSelector
from pydantic import BaseModel, PrivateAttr, field_validator, model_validator

from check_datapackage.internals import (
Expand Down Expand Up @@ -91,6 +94,49 @@ def apply(self, properties: dict[str, Any]) -> list[Issue]:
)


@dataclass(frozen=True)
class TargetJsonPath:
"""A JSON path targeted by a `RequiredCheck`.

Attributes:
parent (str): The JSON path to the parent of the targeted field.
field (str): The name of the targeted field.
"""

parent: str
field: str


def _jsonpath_to_targets(jsonpath: JSONPath) -> list[TargetJsonPath]:
"""Create a list of `TargetJsonPath`s from a `JSONPath`."""
if not jsonpath.segments:
return []

full_path = jsonpath.segments[0].token.path
last_segment = jsonpath.segments[-1]
if isinstance(last_segment, JSONPathRecursiveDescentSegment):
raise ValueError(
f"Cannot use `RequiredCheck` for the JSON path `{full_path}`"
" because it ends in the recursive descent (`..`) operator."
)

selectors = last_segment.selectors
if _filter(selectors, lambda selector: not isinstance(selector, NameSelector)):
raise ValueError(
f"Cannot use `RequiredCheck` for the JSON path `{full_path}`"
" because it doesn't end in a name selector."
)

parent = "".join(_map(jsonpath.segments[:-1], lambda segment: str(segment)))
name_selectors = cast(tuple[NameSelector], selectors)
return _map(
name_selectors,
lambda selector: TargetJsonPath(
parent=str(compile(parent)), field=selector.name
),
)


class RequiredCheck(BaseModel, frozen=True):
"""Set a specific property as required.

Expand All @@ -112,22 +158,20 @@ class RequiredCheck(BaseModel, frozen=True):

jsonpath: JsonPath
message: str
_field_name: str = PrivateAttr()
_targets: list[TargetJsonPath] = PrivateAttr()

@model_validator(mode="after")
def _check_field_name_in_jsonpath(self) -> Self:
field_name_match = re.search(r"(?<!\.)(\.\w+)$", self.jsonpath)
if not field_name_match:
raise ValueError(
f"Cannot use `RequiredCheck` for this JSON path `{self.jsonpath}`."
" A `RequiredCheck` must use a JSON path that targets a specific and"
" real property (e.g., `$.title`) or set of properties (e.g.,"
" `$.resources[*].title`). Unspecific JSON paths (e.g., `$..title`)"
" or JSON paths pointing to array items (e.g., `$.resources[0]`) are"
" not allowed."
)

object.__setattr__(self, "_field_name", field_name_match.group(1))
jsonpath = compile(self.jsonpath)
if isinstance(jsonpath, JSONPath):
paths = [jsonpath]
else:
first_path = cast(JSONPath, jsonpath.path)
paths = [first_path] + _map(jsonpath.paths, lambda path: path[1])

object.__setattr__(
self, "_targets", _flat_map(paths, lambda path: _jsonpath_to_targets(path))
)
return self

def apply(self, properties: dict[str, Any]) -> list[Issue]:
Expand All @@ -140,16 +184,27 @@ def apply(self, properties: dict[str, Any]) -> list[Issue]:
A list of `Issue`s.
"""
matching_paths = _get_direct_jsonpaths(self.jsonpath, properties)
indirect_parent_path = self.jsonpath.removesuffix(self._field_name)
direct_parent_paths = _get_direct_jsonpaths(indirect_parent_path, properties)
return _flat_map(
self._targets,
lambda target: self._target_to_issues(target, matching_paths, properties),
)

def _target_to_issues(
self,
target: TargetJsonPath,
matching_paths: list[str],
properties: dict[str, Any],
) -> list[Issue]:
"""Create a list of `Issue`s from a `TargetJsonPath`."""
direct_parent_paths = _get_direct_jsonpaths(target.parent, properties)
missing_paths = _filter(
direct_parent_paths,
lambda path: f"{path}{self._field_name}" not in matching_paths,
lambda path: f"{path}.{target.field}" not in matching_paths,
)
return _map(
missing_paths,
lambda path: Issue(
jsonpath=path + self._field_name,
jsonpath=f"{path}.{target.field}",
type="required",
message=self.message,
),
Expand Down
53 changes: 51 additions & 2 deletions tests/test_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
example_resource_properties,
)
from check_datapackage.extensions import CustomCheck, Extensions, RequiredCheck
from check_datapackage.internals import _map
from check_datapackage.issue import Issue

lowercase_check = CustomCheck(
Expand Down Expand Up @@ -157,13 +158,61 @@ def test_required_check_array_wildcard():
]


def test_required_check_union():
properties = example_package_properties()
del properties["licenses"]
required_check = RequiredCheck(
jsonpath="$['licenses', 'sources'] | $.resources[*]['licenses', 'sources']",
message="Package and resources must have licenses and sources.",
)
config = Config(extensions=Extensions(required_checks=[required_check]))

issues = check(properties, config=config)

assert all(_map(issues, lambda issue: issue.type == "required"))
assert _map(issues, lambda issue: issue.jsonpath) == [
"$.licenses",
"$.resources[0].licenses",
"$.resources[0].sources",
"$.sources",
]


def test_required_check_non_final_recursive_descent():
properties = example_package_properties()
properties["resources"][0]["licenses"] = [{"name": "odc-pddl"}]
required_check = RequiredCheck(
jsonpath="$..licenses[*].title",
message="Licenses must have a title.",
)
config = Config(extensions=Extensions(required_checks=[required_check]))

issues = check(properties, config=config)

assert _map(issues, lambda issue: issue.jsonpath) == [
"$.licenses[0].title",
"$.resources[0].licenses[0].title",
]


def test_required_check_root():
properties = example_package_properties()
required_check = RequiredCheck(
jsonpath="$",
message="Package must have a root.",
)
config = Config(extensions=Extensions(required_checks=[required_check]))

issues = check(properties, config=config)

assert issues == []


@mark.parametrize(
"jsonpath",
[
"<><>bad.path",
"$",
"..*",
"created",
"$..path",
"..resources",
"$.resources[0].*",
Expand Down
Loading