Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/144.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
If the value is a a bool, the function extract_data_from_json will attempt to iterate through the nested values
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
If the value is a a bool, the function extract_data_from_json will attempt to iterate through the nested values
If the value is a boolean, the function extract_data_from_json will attempt to iterate through the nested values

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

63 changes: 48 additions & 15 deletions jdiff/extract_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import re
import warnings
from typing import Any, Dict, List, Mapping, Optional, Union
from collections.abc import Mapping
from typing import Any, Dict, List, Optional, Union

import jmespath

Expand All @@ -16,7 +17,11 @@
)


def extract_data_from_json(data: Union[Mapping, List], path: str = "*", exclude: Optional[List] = None) -> Any:
def extract_data_from_json(
data: Union[Mapping, List],
path: str = "*",
exclude: Optional[List] = None,
) -> Any:
"""Return wanted data from outpdevice data based on the check path. See unit test for complete example.

Get the wanted values to be evaluated if JMESPath expression is defined,
Expand All @@ -35,12 +40,16 @@ def extract_data_from_json(data: Union[Mapping, List], path: str = "*", exclude:
"""
if exclude and isinstance(data, (Dict, List)):
if not isinstance(exclude, list):
raise ValueError(f"Exclude list must be defined as a list. You have {type(exclude)}")
raise ValueError(
f"Exclude list must be defined as a list. You have {type(exclude)}",
)
# exclude unwanted elements
exclude_filter(data, exclude)

if not path:
warnings.warn("JMSPath cannot be empty string or type 'None'. Path argument reverted to default value '*'")
warnings.warn(
"JMSPath cannot be empty string or type 'None'. Path argument reverted to default value '*'",
)
path = "*"

if path == "*":
Expand All @@ -50,7 +59,10 @@ def extract_data_from_json(data: Union[Mapping, List], path: str = "*", exclude:
# Multi ref_key
if len(re.findall(r"\$.*?\$", path)) > 1:
clean_path = path.replace("$", "")
values = jmespath.search(f"{clean_path}{' | []' * (path.count('*') - 1)}", data)
values = jmespath.search(
f"{clean_path}{' | []' * (path.count('*') - 1)}",
data,
)
return keys_values_zipper(
multi_reference_keys(path, data),
associate_key_of_my_value(clean_path, values),
Expand All @@ -59,41 +71,62 @@ def extract_data_from_json(data: Union[Mapping, List], path: str = "*", exclude:
values = jmespath.search(jmespath_value_parser(path), data)

if values is None:
raise TypeError("JMSPath returned 'None'. Please, verify your JMSPath regex.")
raise TypeError(
"JMSPath returned 'None'. Please, verify your JMSPath regex.",
)

# check for multi-nested lists
if any(isinstance(i, list) for i in values):
if not isinstance(values, (str, int, float, bool)) and any(isinstance(i, list) for i in values):
# process elements to check if lists should be flattened
for element in values:
for item in element:
# raise if there is a dict, path must be more specific to extract data
if isinstance(item, dict):
raise TypeError(
f'Must be list of lists i.e. [["Idle", 75759616], ["Idle", 75759620]]. You have "{values}".'
f'Must be list of lists i.e. [["Idle", 75759616], ["Idle", 75759620]]. You have "{values}".',
)
if isinstance(item, list):
values = flatten_list(values) # flatten list and rewrite values
values = flatten_list(
values,
) # flatten list and rewrite values
break # items are the same, need to check only first to see if this is a nested list

# We need to get a list of reference keys - list of strings.
# Based on the expression or data we might have different data types
# therefore we need to normalize.
if re.search(r"\$.*\$", path):
paired_key_value = associate_key_of_my_value(jmespath_value_parser(path), values)
wanted_reference_keys = jmespath.search(jmespath_refkey_parser(path), data)
paired_key_value = associate_key_of_my_value(
jmespath_value_parser(path),
values,
)
wanted_reference_keys = jmespath.search(
jmespath_refkey_parser(path),
data,
)

if isinstance(wanted_reference_keys, dict): # when wanted_reference_keys is dict() type
if isinstance(
wanted_reference_keys,
dict,
): # when wanted_reference_keys is dict() type
list_of_reference_keys = list(wanted_reference_keys.keys())
elif any(
isinstance(element, list) for element in wanted_reference_keys
): # when wanted_reference_keys is a nested list
list_of_reference_keys = flatten_list(wanted_reference_keys)[0]
elif isinstance(wanted_reference_keys, list): # when wanted_reference_keys is a list
elif isinstance(
wanted_reference_keys,
list,
): # when wanted_reference_keys is a list
list_of_reference_keys = wanted_reference_keys
else:
raise ValueError("Reference Key normalization failure. Please verify data type returned.")
raise ValueError(
"Reference Key normalization failure. Please verify data type returned.",
)

normalized = keys_values_zipper(list_of_reference_keys, paired_key_value)
normalized = keys_values_zipper(
list_of_reference_keys,
paired_key_value,
)
# Data between pre and post may come in different order, so it needs to be sorted.
return sorted(normalized, key=lambda arg: list(arg.keys()))

Expand Down
17 changes: 16 additions & 1 deletion tests/test_get_value.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ def test_jmspath_return_none(jmspath):


@pytest.mark.parametrize(
"jmspath, expected_value", test_cases_extract_data_no_ref_key + test_cases_extract_data_with_ref_key
"jmspath, expected_value",
test_cases_extract_data_no_ref_key + test_cases_extract_data_with_ref_key,
)
def test_extract_data_from_json(jmspath, expected_value):
"""Test JMSPath return value."""
Expand Down Expand Up @@ -139,3 +140,17 @@ def test_top_key_anchor(jmspath, expected_value):
value = extract_data_from_json(data=data, path=jmspath)

assert value == expected_value, ASSERT_FAIL_MESSAGE.format(output=value, expected_output=expected_value)


def test_not_iterable_value():
"""Test JMSPath return value for anchoring the top key."""
data = {
"isBool": True,
}
path = "isBool"

value = extract_data_from_json(data=data, path=path)

expected_output = True

assert value == expected_output