From b0001b89eb798fbda0e40569e42c0dc579905cd1 Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Mon, 10 Nov 2025 10:56:45 -0500 Subject: [PATCH] make properties from endpoint cache thread safe --- .../properties_from_endpoint.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py b/airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py index 210e0bc4e..3ffa7889f 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py @@ -1,5 +1,5 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. - +import threading from dataclasses import InitVar, dataclass from typing import Any, Iterable, List, Mapping, Optional @@ -7,7 +7,7 @@ from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.retrievers import Retriever -from airbyte_cdk.sources.types import Config, StreamSlice +from airbyte_cdk.sources.types import Config @dataclass @@ -25,6 +25,7 @@ class PropertiesFromEndpoint: _cached_properties: Optional[List[str]] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self._lock = threading.RLock() self._property_field_path = [ InterpolatedString(string=property_field, parameters=parameters) for property_field in self.property_field_path @@ -32,12 +33,14 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: def get_properties_from_endpoint(self) -> List[str]: if self._cached_properties is None: - self._cached_properties = list( - map( - self._get_property, # type: ignore # SimpleRetriever and AsyncRetriever only returns Record. Should we change the return type of Retriever.read_records? - self.retriever.read_records(records_schema={}, stream_slice=None), - ) - ) + with self._lock: + if self._cached_properties is None: + self._cached_properties = list( + map( + self._get_property, # type: ignore # SimpleRetriever and AsyncRetriever only returns Record. Should we change the return type of Retriever.read_records? + self.retriever.read_records(records_schema={}, stream_slice=None), + ) + ) return self._cached_properties def _get_property(self, property_obj: Mapping[str, Any]) -> str: