diff --git a/pyiceberg/expressions/visitors.py b/pyiceberg/expressions/visitors.py index ee8d1e930a..4a5d224bb7 100644 --- a/pyiceberg/expressions/visitors.py +++ b/pyiceberg/expressions/visitors.py @@ -14,7 +14,9 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import hashlib import math +import threading from abc import ABC, abstractmethod from functools import singledispatch from typing import ( @@ -1975,11 +1977,119 @@ def residual_for(self, partition_data: Record) -> BooleanExpression: return self.expr +_DEFAULT_RESIDUAL_EVALUATOR_CACHE_SIZE = 128 + + +class ResidualEvaluatorCache: + """Thread-safe LRU cache for ResidualEvaluator instances. + + Caches ResidualEvaluators to avoid repeated instantiation and initialization + overhead when scanning multiple data files with identical partition specs, + expressions, schemas, and case sensitivity settings. + """ + + _cache: Dict[str, ResidualEvaluator] + _maxsize: int + _lock: threading.RLock + + def __init__(self, maxsize: int = _DEFAULT_RESIDUAL_EVALUATOR_CACHE_SIZE) -> None: + """Initialize the cache. + + Args: + maxsize: Maximum number of evaluators to cache. Defaults to 128. + """ + self._cache = {} + self._maxsize = maxsize + self._lock = threading.RLock() + + @staticmethod + def _make_key( + spec_id: int, + expr: BooleanExpression, + case_sensitive: bool, + schema_id: int | None = None, + ) -> str: + """Create deterministic cache key from evaluator parameters. + + Args: + spec_id: Partition spec identifier. + expr: Filter expression tree. + case_sensitive: Case-sensitive flag. + schema_id: Optional schema identifier. + + Returns: + 32-character MD5 hex string cache key. + """ + key_parts = f"{spec_id}#{repr(expr)}#{case_sensitive}#{schema_id}" + return hashlib.md5(key_parts.encode()).hexdigest() + + def get( + self, + spec: PartitionSpec, + expr: BooleanExpression, + case_sensitive: bool, + schema: Schema, + ) -> ResidualEvaluator | None: + """Retrieve cached evaluator if it exists. + + Args: + spec: Partition specification. + expr: Filter expression. + case_sensitive: Case sensitivity flag. + schema: Table schema. + + Returns: + Cached ResidualEvaluator or None. + """ + cache_key = self._make_key(spec.spec_id, expr, case_sensitive, schema.schema_id) + with self._lock: + return self._cache.get(cache_key) + + def put( + self, + spec: PartitionSpec, + expr: BooleanExpression, + case_sensitive: bool, + schema: Schema, + evaluator: ResidualEvaluator, + ) -> None: + """Cache a ResidualEvaluator instance. + + Args: + spec: Partition specification. + expr: Filter expression. + case_sensitive: Case sensitivity flag. + schema: Table schema. + evaluator: ResidualEvaluator to cache. + """ + cache_key = self._make_key(spec.spec_id, expr, case_sensitive, schema.schema_id) + with self._lock: + if len(self._cache) >= self._maxsize: + oldest_key = next(iter(self._cache)) + del self._cache[oldest_key] + self._cache[cache_key] = evaluator + + def clear(self) -> None: + """Clear all cached evaluators.""" + with self._lock: + self._cache.clear() + + +_residual_evaluator_cache = ResidualEvaluatorCache() + + def residual_evaluator_of( spec: PartitionSpec, expr: BooleanExpression, case_sensitive: bool, schema: Schema ) -> ResidualEvaluator: - return ( - UnpartitionedResidualEvaluator(schema=schema, expr=expr) - if spec.is_unpartitioned() - else ResidualEvaluator(spec=spec, expr=expr, schema=schema, case_sensitive=case_sensitive) - ) + cached = _residual_evaluator_cache.get(spec, expr, case_sensitive, schema) + if cached is not None: + return cached + + evaluator: ResidualEvaluator + if spec.is_unpartitioned(): + evaluator = UnpartitionedResidualEvaluator(schema=schema, expr=expr) + else: + evaluator = ResidualEvaluator(spec=spec, expr=expr, schema=schema, case_sensitive=case_sensitive) + + _residual_evaluator_cache.put(spec, expr, case_sensitive, schema, evaluator) + return evaluator