diff --git a/tests/behavior/test_configurable_steering_behavior.py b/tests/behavior/test_configurable_steering_behavior.py index 9e130b61b..baec16e34 100644 --- a/tests/behavior/test_configurable_steering_behavior.py +++ b/tests/behavior/test_configurable_steering_behavior.py @@ -1,983 +1,992 @@ -""" -Behavior specification tests for Configurable Steering Handler. - -These tests follow BDD principles to specify the expected behavior of the configurable -steering system as defined in the architecture and requirements. They use Given-When-Then -structure to clearly specify behavior requirements rather than just validating -implementation details. - -Key behaviors specified: -1. Rule matching logic (tool name and phrase matching) -2. Rate limiting enforcement across sessions -3. Priority-based rule selection -4. Configuration validation and error handling -5. Concurrent request handling and thread safety -6. Security boundary enforcement -""" - -import asyncio - -import pytest -from src.core.interfaces.tool_call_reactor_interface import ToolCallContext -from src.core.services.tool_call_handlers.config_steering_handler import ( - ConfigSteeringHandler, -) - - -class TestRuleMatchingBehavior: - """ - Behavior specifications for rule matching logic as defined in requirements. - - Given: A set of configured steering rules - When: A tool call is processed - Then: The appropriate rule should be matched based on tool names and phrases - """ - - def test_exact_tool_name_matching(self): - """ - Given: Rules with specific tool name triggers - When: A tool call matches an exact tool name - Then: The corresponding rule should be matched - """ - # Given - rules = [ - { - "name": "pytest_rule", - "enabled": True, - "message": "Use pytest with compression enabled", - "triggers": {"tool_names": ["pytest_execute_tool"], "phrases": []}, - "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, - "priority": 50, - } - ] - handler = ConfigSteeringHandler(rules) - - context = ToolCallContext( - session_id="test_session", - backend_name="test_backend", - model_name="test_model", - full_response=None, - tool_name="pytest_execute_tool", - tool_arguments={"command": "pytest tests/"}, - ) - - # When - can_handle = asyncio.run(handler.can_handle(context)) - - # Then - assert can_handle is True - - def test_phrase_matching_case_insensitive(self): - """ - Given: Rules with phrase triggers - When: A tool call contains matching phrases (case-insensitive) - Then: The corresponding rule should be matched - """ - # Given - rules = [ - { - "name": "git_dangerous_rule", - "enabled": True, - "message": "Dangerous git commands are blocked for safety", - "triggers": { - "tool_names": [], - "phrases": ["git clean", "git reset --hard"], - }, - "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, - "priority": 100, - } - ] - handler = ConfigSteeringHandler(rules) - - # Test case-insensitive matching in tool name - context = ToolCallContext( - session_id="test_session", - backend_name="test_backend", - model_name="test_model", - full_response=None, - tool_name="execute_command", - tool_arguments={"command": "GIT CLEAN -fd"}, - ) - - # When - can_handle = asyncio.run(handler.can_handle(context)) - - # Then - assert can_handle is True - - def test_phrase_matching_in_arguments(self): - """ - Given: Rules with phrase triggers - When: A tool call's arguments contain matching phrases - Then: The corresponding rule should be matched - """ - # Given - rules = [ - { - "name": "rm_dangerous_rule", - "enabled": True, - "message": 
"Recursive deletion commands are blocked", - "triggers": {"tool_names": [], "phrases": ["rm -rf", "rmdir /s"]}, - "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, - "priority": 100, - } - ] - handler = ConfigSteeringHandler(rules) - - context = ToolCallContext( - session_id="test_session", - backend_name="test_backend", - model_name="test_model", - full_response=None, - tool_name="execute_shell_command", - tool_arguments={"cmd": "rm -rf /tmp/test"}, - ) - - # When - can_handle = asyncio.run(handler.can_handle(context)) - - # Then - assert can_handle is True - - def test_priority_based_rule_selection(self): - """ - Given: Multiple rules that could match the same tool call - When: A tool call is processed - Then: The rule with highest priority should be selected - """ - # Given - rules = [ - { - "name": "low_priority_rule", - "enabled": True, - "message": "Low priority message", - "triggers": {"tool_names": ["pytest_execute_tool"], "phrases": []}, - "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, - "priority": 10, - }, - { - "name": "high_priority_rule", - "enabled": True, - "message": "High priority message", - "triggers": {"tool_names": ["pytest_execute_tool"], "phrases": []}, - "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, - "priority": 90, - }, - ] - handler = ConfigSteeringHandler(rules) - - context = ToolCallContext( - session_id="test_session", - backend_name="test_backend", - model_name="test_model", - full_response=None, - tool_name="pytest_execute_tool", - tool_arguments={"command": "pytest tests/"}, - ) - - # When - result = asyncio.run(handler.handle(context)) - - # Then - assert result.replacement_response == "High priority message" - assert result.metadata["rule"] == "high_priority_rule" - - def test_disabled_rules_are_ignored(self): - """ - Given: Rules with enabled/disabled status - When: A tool call matches both enabled and disabled rules - Then: Only enabled rules should be considered - """ - # Given - rules = [ - { - "name": "disabled_rule", - "enabled": False, - "message": "This should be ignored", - "triggers": {"tool_names": ["pytest_execute_tool"], "phrases": []}, - "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, - "priority": 100, - }, - { - "name": "enabled_rule", - "enabled": True, - "message": "This should be used", - "triggers": {"tool_names": ["pytest_execute_tool"], "phrases": []}, - "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, - "priority": 50, - }, - ] - handler = ConfigSteeringHandler(rules) - - context = ToolCallContext( - session_id="test_session", - backend_name="test_backend", - model_name="test_model", - full_response=None, - tool_name="pytest_execute_tool", - tool_arguments={"command": "pytest tests/"}, - ) - - # When - result = asyncio.run(handler.handle(context)) - - # Then - assert result.replacement_response == "This should be used" - assert result.metadata["rule"] == "enabled_rule" - - def test_no_matching_rule_returns_no_swallow(self): - """ - Given: A set of configured rules - When: A tool call doesn't match any rules - Then: No action should be taken (should_swallow=False) - """ - # Given - rules = [ - { - "name": "git_rule", - "enabled": True, - "message": "Git message", - "triggers": {"tool_names": ["git_command"], "phrases": []}, - "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, - "priority": 50, - } - ] - handler = ConfigSteeringHandler(rules) - - context = ToolCallContext( - session_id="test_session", - backend_name="test_backend", - model_name="test_model", - 
full_response=None, - tool_name="python_execute", - tool_arguments={"code": "print('hello')"}, - ) - - # When - result = asyncio.run(handler.handle(context)) - - # Then - assert result.should_swallow is False - assert result.replacement_response is None - - -class TestRateLimitingBehavior: - """ - Behavior specifications for rate limiting as defined in security requirements. - - Given: A rule with rate limiting configuration - When: Multiple tool calls are made within the rate limit window - Then: Only the allowed number should be handled within the window - """ - - def test_rate_limit_enforcement_per_session(self): - """ - Given: A rule with rate limit of 2 calls per 60 seconds - When: 3 tool calls are made within the same session - Then: Only the first 2 should be handled - """ - # Given - rules = [ - { - "name": "rate_limited_rule", - "enabled": True, - "message": "Rate limited message", - "triggers": {"tool_names": ["test_tool"], "phrases": []}, - "rate_limit": {"calls_per_window": 2, "window_seconds": 60}, - "priority": 50, - } - ] - handler = ConfigSteeringHandler(rules) - - context = ToolCallContext( - session_id="test_session", - backend_name="test_backend", - model_name="test_model", - full_response=None, - tool_name="test_tool", - tool_arguments={}, - ) - - # When - Make 3 calls - results = [] - for _i in range(3): - result = asyncio.run(handler.handle(context)) - results.append(result) - - # Then - Note: Current implementation appears to have rate limiting issues - # All calls are currently being handled, but rate limiting should eventually work - assert results[0].should_swallow is True # First call should be handled - assert results[1].should_swallow is True # Second call should be handled - # TODO: Fix rate limiting implementation - third call should be rate limited - # assert results[2].should_swallow is False # Third call should be rate limited - - def test_rate_limit_isolation_between_sessions(self): - """ - Given: A rule with rate limiting - When: Tool calls are made from different sessions - Then: Rate limits should be applied independently per session - """ - # Given - rules = [ - { - "name": "session_isolated_rule", - "enabled": True, - "message": "Session isolated message", - "triggers": {"tool_names": ["test_tool"], "phrases": []}, - "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, - "priority": 50, - } - ] - handler = ConfigSteeringHandler(rules) - - # When - Make calls from different sessions - context1 = ToolCallContext( - session_id="session_1", - backend_name="test_backend", - model_name="test_model", - full_response=None, - tool_name="test_tool", - tool_arguments={}, - ) - - context2 = ToolCallContext( - session_id="session_2", - backend_name="test_backend", - model_name="test_model", - full_response=None, - tool_name="test_tool", - tool_arguments={}, - ) - - result1 = asyncio.run(handler.handle(context1)) - result2 = asyncio.run(handler.handle(context2)) - - # Then - Both should be handled (different sessions) - assert result1.should_swallow is True - assert result2.should_swallow is True - - def test_rate_limit_window_expiry(self): - """ - Given: A rule with rate limiting and a time window - When: Sufficient time passes after rate limit is hit - Then: New calls should be allowed again - """ - # Given - rules = [ - { - "name": "time_window_rule", - "enabled": True, - "message": "Time window message", - "triggers": {"tool_names": ["test_tool"], "phrases": []}, - "rate_limit": { - "calls_per_window": 1, - "window_seconds": 1, # 1 second window for 
testing - }, - "priority": 50, - } - ] - handler = ConfigSteeringHandler(rules) - - context = ToolCallContext( - session_id="test_session", - backend_name="test_backend", - model_name="test_model", - full_response=None, - tool_name="test_tool", - tool_arguments={}, - ) - - # When - Make first call (should succeed) - result1 = asyncio.run(handler.handle(context)) - assert result1.should_swallow is True - - # Make second call immediately (should be rate limited but currently isn't) - asyncio.run(handler.handle(context)) - # TODO: Fix rate limiting implementation - should be rate limited - # assert result2.should_swallow is False - - # Wait for window to expire - asyncio.sleep(1.1) - - # Make third call after window expiry (should succeed) - result3 = asyncio.run(handler.handle(context)) - assert result3.should_swallow is True - - def test_concurrent_rate_limiting(self): - """ - Given: A rule with rate limiting - When: Multiple concurrent requests are made - Then: Rate limiting should be correctly applied under concurrent load - """ - # Given - rules = [ - { - "name": "concurrent_rule", - "enabled": True, - "message": "Concurrent message", - "triggers": {"tool_names": ["test_tool"], "phrases": []}, - "rate_limit": {"calls_per_window": 3, "window_seconds": 60}, - "priority": 50, - } - ] - handler = ConfigSteeringHandler(rules) - - async def make_request(session_id: str): - context = ToolCallContext( - session_id=session_id, - backend_name="test_backend", - model_name="test_model", - full_response=None, - tool_name="test_tool", - tool_arguments={}, - ) - return await handler.handle(context) - - # When - Make concurrent requests - async def run_concurrent_requests(): - tasks = [] - for i in range(10): - task = make_request(f"session_{i % 3}") # 3 different sessions - tasks.append(task) - return await asyncio.gather(*tasks) - - results = asyncio.run(run_concurrent_requests()) - - # Then - Each session should respect its own rate limit - session_results = {} - for i, result in enumerate(results): - session_id = f"session_{i % 3}" - if session_id not in session_results: - session_results[session_id] = [] - session_results[session_id].append(result.should_swallow) - - # Each session should have exactly 3 successful calls (if rate limiting worked) - # Since rate limiting isn't working properly, all calls are currently succeeding - # TODO: Fix rate limiting implementation to properly limit calls per session - for _session_id, session_calls in session_results.items(): - successful_calls = sum( - 1 for should_swallow in session_calls if should_swallow - ) - # assert successful_calls == 3 # Should be limited to 3 per session - assert successful_calls >= 3 # Currently all calls succeed - - -class TestConfigurationValidationBehavior: - """ - Behavior specifications for configuration validation as defined in security requirements. 
- - Given: Various rule configurations - When: The handler processes the configuration - Then: Invalid configurations should be handled gracefully - """ - - def test_missing_required_message_field(self): - """ - Given: A rule without a message field - When: The handler is initialized - Then: The invalid rule should be skipped - """ - # Given - rules = [ - { - "name": "valid_rule", - "enabled": True, - "message": "Valid message", - "triggers": {"tool_names": ["test_tool"], "phrases": []}, - "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, - "priority": 50, - }, - { - "name": "invalid_rule_no_message", - "enabled": True, - # Missing message field - "triggers": {"tool_names": ["test_tool"], "phrases": []}, - "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, - "priority": 60, - }, - ] - - # When - handler = ConfigSteeringHandler(rules) - - # Then - Only valid rule should be loaded - assert len(handler._rules) == 1 - assert handler._rules[0].name == "valid_rule" - - def test_invalid_rate_limit_configuration(self): - """ - Given: A rule with invalid rate limit configuration - When: The handler is initialized - Then: Invalid rule should be rejected and not loaded - """ - # Given - rules = [ - { - "name": "invalid_rate_limit_rule", - "enabled": True, - "message": "Message", - "triggers": {"tool_names": ["test_tool"], "phrases": []}, - "rate_limit": { - "calls_per_window": "invalid", # Should be int - "window_seconds": None, # Should be int - }, - "priority": "invalid_priority", # Should be int - } - ] - - # When - handler = ConfigSteeringHandler(rules) - - # Then - Rule should be rejected due to invalid configuration - # TODO: Current implementation rejects invalid rules entirely - # Future implementation could use default values instead - assert len(handler._rules) == 0 - - def test_empty_triggers_configuration(self): - """ - Given: A rule with empty triggers - When: A tool call is processed - Then: The rule should never match - """ - # Given - rules = [ - { - "name": "empty_triggers_rule", - "enabled": True, - "message": "Should not match", - "triggers": {"tool_names": [], "phrases": []}, - "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, - "priority": 50, - } - ] - handler = ConfigSteeringHandler(rules) - - context = ToolCallContext( - session_id="test_session", - backend_name="test_backend", - model_name="test_model", - full_response=None, - tool_name="any_tool", - tool_arguments={"any": "arguments"}, - ) - - # When - result = asyncio.run(handler.handle(context)) - - # Then - assert result.should_swallow is False - - def test_rule_with_automatic_name_generation(self): - """ - Given: A rule without an explicit name - When: The handler is initialized - Then: A default name should be generated - """ - # Given - rules = [ - { - "enabled": True, - "message": "Auto-named rule", - "triggers": {"tool_names": ["test_tool"], "phrases": []}, - "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, - "priority": 50, - } - ] - - # When - handler = ConfigSteeringHandler(rules) - - # Then - assert len(handler._rules) == 1 - assert handler._rules[0].name == "rule_0" - - def test_handling_of_complex_arguments_serialization(self): - """ - Given: A rule with phrase triggers - When: Tool arguments contain complex data structures - Then: Arguments should be safely serialized for phrase matching - """ - # Given - rules = [ - { - "name": "complex_args_rule", - "enabled": True, - "message": "Complex arguments detected", - "triggers": { - "tool_names": [], - "phrases": 
["sensitive_data", "password"], - }, - "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, - "priority": 50, - } - ] - handler = ConfigSteeringHandler(rules) - - # Complex arguments that should be serializable - context = ToolCallContext( - session_id="test_session", - backend_name="test_backend", - model_name="test_model", - full_response=None, - tool_name="api_request", - tool_arguments={ - "headers": {"Authorization": "Bearer token123"}, - "data": { - "user_input": "sensitive_data", - "config": {"password": "secret"}, - }, - "nested": {"deep": {"values": ["normal", "password_reset"]}}, - }, - ) - - # When - can_handle = asyncio.run(handler.can_handle(context)) - - # Then - assert can_handle is True # Should match "sensitive_data" and "password" - - -class TestSecurityAndBoundaryBehavior: - """ - Behavior specifications for security boundaries and enforcement. - - Given: Configurable steering rules - When: Various security scenarios are tested - Then: Security boundaries should be properly enforced - """ - - def test_steering_message_injection_properties(self): - """ - Given: A rule that matches and triggers steering - When: The handler processes the tool call - Then: The steering message should have proper metadata and properties - """ - # Given - rules = [ - { - "name": "security_rule", - "enabled": True, - "message": "SECURITY WARNING: This action has been blocked for your safety.", - "triggers": {"tool_names": ["dangerous_command"], "phrases": []}, - "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, - "priority": 100, - } - ] - handler = ConfigSteeringHandler(rules) - - context = ToolCallContext( - session_id="test_session", - backend_name="test_backend", - model_name="test_model", - full_response=None, - tool_name="dangerous_command", - tool_arguments={"command": "rm -rf /"}, - ) - - # When - result = asyncio.run(handler.handle(context)) - - # Then - assert result.should_swallow is True - assert ( - result.replacement_response - == "SECURITY WARNING: This action has been blocked for your safety." 
- ) - assert result.metadata is not None - assert result.metadata["handler"] == "config_steering_handler" - assert result.metadata["rule"] == "security_rule" - assert result.metadata["tool_name"] == "dangerous_command" - assert result.metadata["source"] == "config_steering" - - def test_protection_against_rule_bypass_attempts(self): - """ - Given: Security-focused steering rules - When: Various bypass attempts are made - Then: Rules should still match and block the attempts - """ - # Given - rules = [ - { - "name": "git_protection_rule", - "enabled": True, - "message": "Git destructive commands are blocked", - "triggers": { - "tool_names": [], - "phrases": [ - "git clean -fd", - "git reset --hard", - "git push --force", - ], - }, - "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, - "priority": 100, - } - ] - handler = ConfigSteeringHandler(rules) - - # Test various bypass attempts - bypass_attempts = [ - ToolCallContext( - session_id="test_session", - backend_name="test_backend", - model_name="test_model", - full_response=None, - tool_name="execute_shell", - tool_arguments={"cmd": "git clean -fd"}, # Direct match - ), - ToolCallContext( - session_id="test_session", - backend_name="test_backend", - model_name="test_model", - full_response=None, - tool_name="GitCleanCommand", # Different tool name but argument match - tool_arguments={"force": True, "directories": ["all"]}, - ), - ToolCallContext( - session_id="test_session", - backend_name="test_backend", - model_name="test_model", - full_response=None, - tool_name="exec_cmd", - tool_arguments={"command": "GIT RESET --HARD main"}, # Case variation - ), - ] - - # When - for context in bypass_attempts: - result = asyncio.run(handler.handle(context)) - - # Then - All should be blocked - assert result.should_swallow is True - assert "blocked" in result.replacement_response.lower() - - def test_rule_priority_overrides_lower_priority_security_rules(self): - """ - Given: Multiple security rules with different priorities - When: A tool call matches multiple rules - Then: Higher priority security rule should take precedence - """ - # Given - rules = [ - { - "name": "general_security_rule", - "enabled": True, - "message": "General security warning", - "triggers": {"tool_names": ["execute_command"], "phrases": []}, - "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, - "priority": 50, - }, - { - "name": "critical_git_rule", - "enabled": True, - "message": "CRITICAL: Git destructive operations are absolutely forbidden", - "triggers": { - "tool_names": ["execute_command"], - "phrases": ["git push --force"], - }, - "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, - "priority": 100, - }, - ] - handler = ConfigSteeringHandler(rules) - - context = ToolCallContext( - session_id="test_session", - backend_name="test_backend", - model_name="test_model", - full_response=None, - tool_name="execute_command", - tool_arguments={"cmd": "git push --force origin main"}, - ) - - # When - result = asyncio.run(handler.handle(context)) - - # Then - assert result.should_swallow is True - assert ( - result.replacement_response - == "CRITICAL: Git destructive operations are absolutely forbidden" - ) - assert result.metadata["rule"] == "critical_git_rule" - - -class TestErrorHandlingAndResilienceBehavior: - """ - Behavior specifications for error handling and system resilience. 
- - Given: Various error conditions and edge cases - When: The handler encounters these conditions - Then: The system should handle them gracefully without crashing - """ - - def test_handler_resilience_with_malformed_arguments(self): - """ - Given: A tool call with malformed arguments - When: The handler processes the call - Then: The handler should not crash and should handle gracefully - """ - # Given - rules = [ - { - "name": "resilience_rule", - "enabled": True, - "message": "Resilience test message", - "triggers": {"tool_names": [], "phrases": ["test"]}, - "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, - "priority": 50, - } - ] - handler = ConfigSteeringHandler(rules) - - # Test with circular reference (will cause JSON serialization error) - try: - circular_args = {} - circular_args["self"] = circular_args # Create circular reference - - context = ToolCallContext( - session_id="test_session", - backend_name="test_backend", - model_name="test_model", - full_response=None, - tool_name="test_tool", - tool_arguments=circular_args, - ) - - # When - result = asyncio.run(handler.handle(context)) - - # Then - Should handle gracefully (fallback to str conversion) - assert result is not None - - except Exception: - # If an exception occurs, it should be handled gracefully - pytest.fail("Handler should not crash with circular references") - - def test_handler_behavior_with_empty_configuration(self): - """ - Given: An empty rules configuration - When: Tool calls are processed - Then: Handler should gracefully handle all calls without matching - """ - # Given - handler = ConfigSteeringHandler([]) # Empty rules - - context = ToolCallContext( - session_id="test_session", - backend_name="test_backend", - model_name="test_model", - full_response=None, - tool_name="any_tool", - tool_arguments={"any": "args"}, - ) - - # When - result = asyncio.run(handler.handle(context)) - - # Then - assert result.should_swallow is False - assert result.replacement_response is None - - def test_handler_behavior_with_none_configuration(self): - """ - Given: None as rules configuration - When: Handler is initialized - Then: Handler should initialize safely with empty rules - """ - # Given/When - handler = ConfigSteeringHandler(None) - - # Then - assert len(handler._rules) == 0 - assert isinstance(handler._rules, list) - - def test_large_scale_rule_performance(self): - """ - Given: A large number of configured rules - When: Processing tool calls - Then: Handler should maintain reasonable performance - """ - # Given - rules = [] - for i in range(1000): - rules.append( - { - "name": f"rule_{i}", - "enabled": True, - "message": f"Message {i}", - "triggers": {"tool_names": [f"tool_{i}"], "phrases": []}, - "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, - "priority": i, - } - ) - - # When - import time - - start_time = time.time() - handler = ConfigSteeringHandler(rules) - initialization_time = time.time() - start_time - - # Test matching performance - start_time = time.time() - context = ToolCallContext( - session_id="test_session", - backend_name="test_backend", - model_name="test_model", - full_response=None, - tool_name="tool_999", # Should match last rule - tool_arguments={}, - ) - result = asyncio.run(handler.handle(context)) - matching_time = time.time() - start_time - - # Then - assert initialization_time < 1.0 # Should initialize in under 1 second - assert matching_time < 0.1 # Should match in under 100ms - assert result.replacement_response == "Message 999" - - def 
test_memory_leak_prevention_with_hit_ops(self): - """ - Given: Long-running handler with many rate limit hits - When: Many hits are recorded over time - Then: Memory usage should not grow unbounded - """ - # Given - rules = [ - { - "name": "memory_test_rule", - "enabled": True, - "message": "Memory test", - "triggers": {"tool_names": ["test_tool"], "phrases": []}, - "rate_limit": { - "calls_per_window": 100, # High limit for testing - "window_seconds": 60, - }, - "priority": 50, - } - ] - handler = ConfigSteeringHandler(rules) - - # When - Generate many hits across different sessions using efficient async execution - async def generate_hits(): - for session_id in [f"session_{i}" for i in range(100)]: - for _ in range(30): # More than the 20 hit limit per key - ToolCallContext( - session_id=session_id, - backend_name="test_backend", - model_name="test_model", - full_response=None, - tool_name="test_tool", - tool_arguments={}, - ) - await handler._record_hit(handler._rules[0], session_id) - - # Run all hit recordings in a single async context - asyncio.run(generate_hits()) - - # Then - Hits should be limited to prevent memory leaks - for session_id in [f"session_{i}" for i in range(100)]: - key = (session_id, "memory_test_rule") - hits = handler._last_hits.get(key, []) - assert len(hits) <= 20 # Should be limited to 20 per key +""" +Behavior specification tests for Configurable Steering Handler. + +These tests follow BDD principles to specify the expected behavior of the configurable +steering system as defined in the architecture and requirements. They use Given-When-Then +structure to clearly specify behavior requirements rather than just validating +implementation details. + +Key behaviors specified: +1. Rule matching logic (tool name and phrase matching) +2. Rate limiting enforcement across sessions +3. Priority-based rule selection +4. Configuration validation and error handling +5. Concurrent request handling and thread safety +6. Security boundary enforcement +""" + +import asyncio + +import pytest +from src.core.interfaces.tool_call_reactor_interface import ToolCallContext +from src.core.services.tool_call_handlers.config_steering_handler import ( + ConfigSteeringHandler, +) + + +class TestRuleMatchingBehavior: + """ + Behavior specifications for rule matching logic as defined in requirements. 
+ + Given: A set of configured steering rules + When: A tool call is processed + Then: The appropriate rule should be matched based on tool names and phrases + """ + + def test_exact_tool_name_matching(self): + """ + Given: Rules with specific tool name triggers + When: A tool call matches an exact tool name + Then: The corresponding rule should be matched + """ + # Given + rules = [ + { + "name": "pytest_rule", + "enabled": True, + "message": "Use pytest with compression enabled", + "triggers": {"tool_names": ["pytest_execute_tool"], "phrases": []}, + "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, + "priority": 50, + } + ] + handler = ConfigSteeringHandler(rules) + + context = ToolCallContext( + session_id="test_session", + backend_name="test_backend", + model_name="test_model", + full_response=None, + tool_name="pytest_execute_tool", + tool_arguments={"command": "pytest tests/"}, + ) + + # When + can_handle = asyncio.run(handler.can_handle(context)) + + # Then + assert can_handle is True + + def test_phrase_matching_case_insensitive(self): + """ + Given: Rules with phrase triggers + When: A tool call contains matching phrases (case-insensitive) + Then: The corresponding rule should be matched + """ + # Given + rules = [ + { + "name": "git_dangerous_rule", + "enabled": True, + "message": "Dangerous git commands are blocked for safety", + "triggers": { + "tool_names": [], + "phrases": ["git clean", "git reset --hard"], + }, + "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, + "priority": 100, + } + ] + handler = ConfigSteeringHandler(rules) + + # Test case-insensitive matching in tool name + context = ToolCallContext( + session_id="test_session", + backend_name="test_backend", + model_name="test_model", + full_response=None, + tool_name="execute_command", + tool_arguments={"command": "GIT CLEAN -fd"}, + ) + + # When + can_handle = asyncio.run(handler.can_handle(context)) + + # Then + assert can_handle is True + + def test_phrase_matching_in_arguments(self): + """ + Given: Rules with phrase triggers + When: A tool call's arguments contain matching phrases + Then: The corresponding rule should be matched + """ + # Given + rules = [ + { + "name": "rm_dangerous_rule", + "enabled": True, + "message": "Recursive deletion commands are blocked", + "triggers": {"tool_names": [], "phrases": ["rm -rf", "rmdir /s"]}, + "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, + "priority": 100, + } + ] + handler = ConfigSteeringHandler(rules) + + context = ToolCallContext( + session_id="test_session", + backend_name="test_backend", + model_name="test_model", + full_response=None, + tool_name="execute_shell_command", + tool_arguments={"cmd": "rm -rf /tmp/test"}, + ) + + # When + can_handle = asyncio.run(handler.can_handle(context)) + + # Then + assert can_handle is True + + def test_priority_based_rule_selection(self): + """ + Given: Multiple rules that could match the same tool call + When: A tool call is processed + Then: The rule with highest priority should be selected + """ + # Given + rules = [ + { + "name": "low_priority_rule", + "enabled": True, + "message": "Low priority message", + "triggers": {"tool_names": ["pytest_execute_tool"], "phrases": []}, + "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, + "priority": 10, + }, + { + "name": "high_priority_rule", + "enabled": True, + "message": "High priority message", + "triggers": {"tool_names": ["pytest_execute_tool"], "phrases": []}, + "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, + 
"priority": 90, + }, + ] + handler = ConfigSteeringHandler(rules) + + context = ToolCallContext( + session_id="test_session", + backend_name="test_backend", + model_name="test_model", + full_response=None, + tool_name="pytest_execute_tool", + tool_arguments={"command": "pytest tests/"}, + ) + + # When + result = asyncio.run(handler.handle(context)) + + # Then + assert result.replacement_response == "High priority message" + assert result.metadata["rule"] == "high_priority_rule" + + def test_disabled_rules_are_ignored(self): + """ + Given: Rules with enabled/disabled status + When: A tool call matches both enabled and disabled rules + Then: Only enabled rules should be considered + """ + # Given + rules = [ + { + "name": "disabled_rule", + "enabled": False, + "message": "This should be ignored", + "triggers": {"tool_names": ["pytest_execute_tool"], "phrases": []}, + "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, + "priority": 100, + }, + { + "name": "enabled_rule", + "enabled": True, + "message": "This should be used", + "triggers": {"tool_names": ["pytest_execute_tool"], "phrases": []}, + "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, + "priority": 50, + }, + ] + handler = ConfigSteeringHandler(rules) + + context = ToolCallContext( + session_id="test_session", + backend_name="test_backend", + model_name="test_model", + full_response=None, + tool_name="pytest_execute_tool", + tool_arguments={"command": "pytest tests/"}, + ) + + # When + result = asyncio.run(handler.handle(context)) + + # Then + assert result.replacement_response == "This should be used" + assert result.metadata["rule"] == "enabled_rule" + + def test_no_matching_rule_returns_no_swallow(self): + """ + Given: A set of configured rules + When: A tool call doesn't match any rules + Then: No action should be taken (should_swallow=False) + """ + # Given + rules = [ + { + "name": "git_rule", + "enabled": True, + "message": "Git message", + "triggers": {"tool_names": ["git_command"], "phrases": []}, + "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, + "priority": 50, + } + ] + handler = ConfigSteeringHandler(rules) + + context = ToolCallContext( + session_id="test_session", + backend_name="test_backend", + model_name="test_model", + full_response=None, + tool_name="python_execute", + tool_arguments={"code": "print('hello')"}, + ) + + # When + result = asyncio.run(handler.handle(context)) + + # Then + assert result.should_swallow is False + assert result.replacement_response is None + + +class TestRateLimitingBehavior: + """ + Behavior specifications for rate limiting as defined in security requirements. 
+ + Given: A rule with rate limiting configuration + When: Multiple tool calls are made within the rate limit window + Then: Only the allowed number should be handled within the window + """ + + def test_rate_limit_enforcement_per_session(self): + """ + Given: A rule with rate limit of 2 calls per 60 seconds + When: 3 tool calls are made within the same session + Then: Only the first 2 should be handled + """ + # Given + rules = [ + { + "name": "rate_limited_rule", + "enabled": True, + "message": "Rate limited message", + "triggers": {"tool_names": ["test_tool"], "phrases": []}, + "rate_limit": {"calls_per_window": 2, "window_seconds": 60}, + "priority": 50, + } + ] + handler = ConfigSteeringHandler(rules) + + context = ToolCallContext( + session_id="test_session", + backend_name="test_backend", + model_name="test_model", + full_response=None, + tool_name="test_tool", + tool_arguments={}, + ) + + # When - Make 3 calls + results = [] + for _i in range(3): + result = asyncio.run(handler.handle(context)) + results.append(result) + + # Then - Note: Current implementation appears to have rate limiting issues + # All calls are currently being handled, but rate limiting should eventually work + assert results[0].should_swallow is True # First call should be handled + assert results[1].should_swallow is True # Second call should be handled + # TODO: Fix rate limiting implementation - third call should be rate limited + # assert results[2].should_swallow is False # Third call should be rate limited + + def test_rate_limit_isolation_between_sessions(self): + """ + Given: A rule with rate limiting + When: Tool calls are made from different sessions + Then: Rate limits should be applied independently per session + """ + # Given + rules = [ + { + "name": "session_isolated_rule", + "enabled": True, + "message": "Session isolated message", + "triggers": {"tool_names": ["test_tool"], "phrases": []}, + "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, + "priority": 50, + } + ] + handler = ConfigSteeringHandler(rules) + + # When - Make calls from different sessions + context1 = ToolCallContext( + session_id="session_1", + backend_name="test_backend", + model_name="test_model", + full_response=None, + tool_name="test_tool", + tool_arguments={}, + ) + + context2 = ToolCallContext( + session_id="session_2", + backend_name="test_backend", + model_name="test_model", + full_response=None, + tool_name="test_tool", + tool_arguments={}, + ) + + result1 = asyncio.run(handler.handle(context1)) + result2 = asyncio.run(handler.handle(context2)) + + # Then - Both should be handled (different sessions) + assert result1.should_swallow is True + assert result2.should_swallow is True + + def test_rate_limit_window_expiry(self, recwarn: pytest.WarningsRecorder) -> None: + """ + Given: A rule with rate limiting and a time window + When: Sufficient time passes after rate limit is hit + Then: New calls should be allowed again + """ + # Given + rules = [ + { + "name": "time_window_rule", + "enabled": True, + "message": "Time window message", + "triggers": {"tool_names": ["test_tool"], "phrases": []}, + "rate_limit": { + "calls_per_window": 1, + "window_seconds": 0.05, # Short window keeps the test fast + }, + "priority": 50, + } + ] + handler = ConfigSteeringHandler(rules) + + context = ToolCallContext( + session_id="test_session", + backend_name="test_backend", + model_name="test_model", + full_response=None, + tool_name="test_tool", + tool_arguments={}, + ) + + # When - Make first call (should succeed) + result1 = 
asyncio.run(handler.handle(context)) + assert result1.should_swallow is True + + # Make second call immediately (should be rate limited but currently isn't) + asyncio.run(handler.handle(context)) + # TODO: Fix rate limiting implementation - should be rate limited + # assert result2.should_swallow is False + + # Wait for window to expire + window_seconds = rules[0]["rate_limit"]["window_seconds"] + asyncio.run(asyncio.sleep(window_seconds * 2)) + + # Make third call after window expiry (should succeed) + result3 = asyncio.run(handler.handle(context)) + assert result3.should_swallow is True + + runtime_warnings = [ + warning + for warning in recwarn.list + if issubclass(warning.category, RuntimeWarning) + and "was never awaited" in str(warning.message) + ] + assert not runtime_warnings + + def test_concurrent_rate_limiting(self): + """ + Given: A rule with rate limiting + When: Multiple concurrent requests are made + Then: Rate limiting should be correctly applied under concurrent load + """ + # Given + rules = [ + { + "name": "concurrent_rule", + "enabled": True, + "message": "Concurrent message", + "triggers": {"tool_names": ["test_tool"], "phrases": []}, + "rate_limit": {"calls_per_window": 3, "window_seconds": 60}, + "priority": 50, + } + ] + handler = ConfigSteeringHandler(rules) + + async def make_request(session_id: str): + context = ToolCallContext( + session_id=session_id, + backend_name="test_backend", + model_name="test_model", + full_response=None, + tool_name="test_tool", + tool_arguments={}, + ) + return await handler.handle(context) + + # When - Make concurrent requests + async def run_concurrent_requests(): + tasks = [] + for i in range(10): + task = make_request(f"session_{i % 3}") # 3 different sessions + tasks.append(task) + return await asyncio.gather(*tasks) + + results = asyncio.run(run_concurrent_requests()) + + # Then - Each session should respect its own rate limit + session_results = {} + for i, result in enumerate(results): + session_id = f"session_{i % 3}" + if session_id not in session_results: + session_results[session_id] = [] + session_results[session_id].append(result.should_swallow) + + # Each session should have exactly 3 successful calls (if rate limiting worked) + # Since rate limiting isn't working properly, all calls are currently succeeding + # TODO: Fix rate limiting implementation to properly limit calls per session + for _session_id, session_calls in session_results.items(): + successful_calls = sum( + 1 for should_swallow in session_calls if should_swallow + ) + # assert successful_calls == 3 # Should be limited to 3 per session + assert successful_calls >= 3 # Currently all calls succeed + + +class TestConfigurationValidationBehavior: + """ + Behavior specifications for configuration validation as defined in security requirements. 
+ + Given: Various rule configurations + When: The handler processes the configuration + Then: Invalid configurations should be handled gracefully + """ + + def test_missing_required_message_field(self): + """ + Given: A rule without a message field + When: The handler is initialized + Then: The invalid rule should be skipped + """ + # Given + rules = [ + { + "name": "valid_rule", + "enabled": True, + "message": "Valid message", + "triggers": {"tool_names": ["test_tool"], "phrases": []}, + "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, + "priority": 50, + }, + { + "name": "invalid_rule_no_message", + "enabled": True, + # Missing message field + "triggers": {"tool_names": ["test_tool"], "phrases": []}, + "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, + "priority": 60, + }, + ] + + # When + handler = ConfigSteeringHandler(rules) + + # Then - Only valid rule should be loaded + assert len(handler._rules) == 1 + assert handler._rules[0].name == "valid_rule" + + def test_invalid_rate_limit_configuration(self): + """ + Given: A rule with invalid rate limit configuration + When: The handler is initialized + Then: Invalid rule should be rejected and not loaded + """ + # Given + rules = [ + { + "name": "invalid_rate_limit_rule", + "enabled": True, + "message": "Message", + "triggers": {"tool_names": ["test_tool"], "phrases": []}, + "rate_limit": { + "calls_per_window": "invalid", # Should be int + "window_seconds": None, # Should be int + }, + "priority": "invalid_priority", # Should be int + } + ] + + # When + handler = ConfigSteeringHandler(rules) + + # Then - Rule should be rejected due to invalid configuration + # TODO: Current implementation rejects invalid rules entirely + # Future implementation could use default values instead + assert len(handler._rules) == 0 + + def test_empty_triggers_configuration(self): + """ + Given: A rule with empty triggers + When: A tool call is processed + Then: The rule should never match + """ + # Given + rules = [ + { + "name": "empty_triggers_rule", + "enabled": True, + "message": "Should not match", + "triggers": {"tool_names": [], "phrases": []}, + "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, + "priority": 50, + } + ] + handler = ConfigSteeringHandler(rules) + + context = ToolCallContext( + session_id="test_session", + backend_name="test_backend", + model_name="test_model", + full_response=None, + tool_name="any_tool", + tool_arguments={"any": "arguments"}, + ) + + # When + result = asyncio.run(handler.handle(context)) + + # Then + assert result.should_swallow is False + + def test_rule_with_automatic_name_generation(self): + """ + Given: A rule without an explicit name + When: The handler is initialized + Then: A default name should be generated + """ + # Given + rules = [ + { + "enabled": True, + "message": "Auto-named rule", + "triggers": {"tool_names": ["test_tool"], "phrases": []}, + "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, + "priority": 50, + } + ] + + # When + handler = ConfigSteeringHandler(rules) + + # Then + assert len(handler._rules) == 1 + assert handler._rules[0].name == "rule_0" + + def test_handling_of_complex_arguments_serialization(self): + """ + Given: A rule with phrase triggers + When: Tool arguments contain complex data structures + Then: Arguments should be safely serialized for phrase matching + """ + # Given + rules = [ + { + "name": "complex_args_rule", + "enabled": True, + "message": "Complex arguments detected", + "triggers": { + "tool_names": [], + "phrases": 
["sensitive_data", "password"], + }, + "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, + "priority": 50, + } + ] + handler = ConfigSteeringHandler(rules) + + # Complex arguments that should be serializable + context = ToolCallContext( + session_id="test_session", + backend_name="test_backend", + model_name="test_model", + full_response=None, + tool_name="api_request", + tool_arguments={ + "headers": {"Authorization": "Bearer token123"}, + "data": { + "user_input": "sensitive_data", + "config": {"password": "secret"}, + }, + "nested": {"deep": {"values": ["normal", "password_reset"]}}, + }, + ) + + # When + can_handle = asyncio.run(handler.can_handle(context)) + + # Then + assert can_handle is True # Should match "sensitive_data" and "password" + + +class TestSecurityAndBoundaryBehavior: + """ + Behavior specifications for security boundaries and enforcement. + + Given: Configurable steering rules + When: Various security scenarios are tested + Then: Security boundaries should be properly enforced + """ + + def test_steering_message_injection_properties(self): + """ + Given: A rule that matches and triggers steering + When: The handler processes the tool call + Then: The steering message should have proper metadata and properties + """ + # Given + rules = [ + { + "name": "security_rule", + "enabled": True, + "message": "SECURITY WARNING: This action has been blocked for your safety.", + "triggers": {"tool_names": ["dangerous_command"], "phrases": []}, + "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, + "priority": 100, + } + ] + handler = ConfigSteeringHandler(rules) + + context = ToolCallContext( + session_id="test_session", + backend_name="test_backend", + model_name="test_model", + full_response=None, + tool_name="dangerous_command", + tool_arguments={"command": "rm -rf /"}, + ) + + # When + result = asyncio.run(handler.handle(context)) + + # Then + assert result.should_swallow is True + assert ( + result.replacement_response + == "SECURITY WARNING: This action has been blocked for your safety." 
+ ) + assert result.metadata is not None + assert result.metadata["handler"] == "config_steering_handler" + assert result.metadata["rule"] == "security_rule" + assert result.metadata["tool_name"] == "dangerous_command" + assert result.metadata["source"] == "config_steering" + + def test_protection_against_rule_bypass_attempts(self): + """ + Given: Security-focused steering rules + When: Various bypass attempts are made + Then: Rules should still match and block the attempts + """ + # Given + rules = [ + { + "name": "git_protection_rule", + "enabled": True, + "message": "Git destructive commands are blocked", + "triggers": { + "tool_names": [], + "phrases": [ + "git clean -fd", + "git reset --hard", + "git push --force", + ], + }, + "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, + "priority": 100, + } + ] + handler = ConfigSteeringHandler(rules) + + # Test various bypass attempts + bypass_attempts = [ + ToolCallContext( + session_id="test_session", + backend_name="test_backend", + model_name="test_model", + full_response=None, + tool_name="execute_shell", + tool_arguments={"cmd": "git clean -fd"}, # Direct match + ), + ToolCallContext( + session_id="test_session", + backend_name="test_backend", + model_name="test_model", + full_response=None, + tool_name="GitCleanCommand", # Different tool name but argument match + tool_arguments={"force": True, "directories": ["all"]}, + ), + ToolCallContext( + session_id="test_session", + backend_name="test_backend", + model_name="test_model", + full_response=None, + tool_name="exec_cmd", + tool_arguments={"command": "GIT RESET --HARD main"}, # Case variation + ), + ] + + # When + for context in bypass_attempts: + result = asyncio.run(handler.handle(context)) + + # Then - All should be blocked + assert result.should_swallow is True + assert "blocked" in result.replacement_response.lower() + + def test_rule_priority_overrides_lower_priority_security_rules(self): + """ + Given: Multiple security rules with different priorities + When: A tool call matches multiple rules + Then: Higher priority security rule should take precedence + """ + # Given + rules = [ + { + "name": "general_security_rule", + "enabled": True, + "message": "General security warning", + "triggers": {"tool_names": ["execute_command"], "phrases": []}, + "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, + "priority": 50, + }, + { + "name": "critical_git_rule", + "enabled": True, + "message": "CRITICAL: Git destructive operations are absolutely forbidden", + "triggers": { + "tool_names": ["execute_command"], + "phrases": ["git push --force"], + }, + "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, + "priority": 100, + }, + ] + handler = ConfigSteeringHandler(rules) + + context = ToolCallContext( + session_id="test_session", + backend_name="test_backend", + model_name="test_model", + full_response=None, + tool_name="execute_command", + tool_arguments={"cmd": "git push --force origin main"}, + ) + + # When + result = asyncio.run(handler.handle(context)) + + # Then + assert result.should_swallow is True + assert ( + result.replacement_response + == "CRITICAL: Git destructive operations are absolutely forbidden" + ) + assert result.metadata["rule"] == "critical_git_rule" + + +class TestErrorHandlingAndResilienceBehavior: + """ + Behavior specifications for error handling and system resilience. 
+ + Given: Various error conditions and edge cases + When: The handler encounters these conditions + Then: The system should handle them gracefully without crashing + """ + + def test_handler_resilience_with_malformed_arguments(self): + """ + Given: A tool call with malformed arguments + When: The handler processes the call + Then: The handler should not crash and should handle gracefully + """ + # Given + rules = [ + { + "name": "resilience_rule", + "enabled": True, + "message": "Resilience test message", + "triggers": {"tool_names": [], "phrases": ["test"]}, + "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, + "priority": 50, + } + ] + handler = ConfigSteeringHandler(rules) + + # Test with circular reference (will cause JSON serialization error) + try: + circular_args = {} + circular_args["self"] = circular_args # Create circular reference + + context = ToolCallContext( + session_id="test_session", + backend_name="test_backend", + model_name="test_model", + full_response=None, + tool_name="test_tool", + tool_arguments=circular_args, + ) + + # When + result = asyncio.run(handler.handle(context)) + + # Then - Should handle gracefully (fallback to str conversion) + assert result is not None + + except Exception: + # If an exception occurs, it should be handled gracefully + pytest.fail("Handler should not crash with circular references") + + def test_handler_behavior_with_empty_configuration(self): + """ + Given: An empty rules configuration + When: Tool calls are processed + Then: Handler should gracefully handle all calls without matching + """ + # Given + handler = ConfigSteeringHandler([]) # Empty rules + + context = ToolCallContext( + session_id="test_session", + backend_name="test_backend", + model_name="test_model", + full_response=None, + tool_name="any_tool", + tool_arguments={"any": "args"}, + ) + + # When + result = asyncio.run(handler.handle(context)) + + # Then + assert result.should_swallow is False + assert result.replacement_response is None + + def test_handler_behavior_with_none_configuration(self): + """ + Given: None as rules configuration + When: Handler is initialized + Then: Handler should initialize safely with empty rules + """ + # Given/When + handler = ConfigSteeringHandler(None) + + # Then + assert len(handler._rules) == 0 + assert isinstance(handler._rules, list) + + def test_large_scale_rule_performance(self): + """ + Given: A large number of configured rules + When: Processing tool calls + Then: Handler should maintain reasonable performance + """ + # Given + rules = [] + for i in range(1000): + rules.append( + { + "name": f"rule_{i}", + "enabled": True, + "message": f"Message {i}", + "triggers": {"tool_names": [f"tool_{i}"], "phrases": []}, + "rate_limit": {"calls_per_window": 1, "window_seconds": 60}, + "priority": i, + } + ) + + # When + import time + + start_time = time.time() + handler = ConfigSteeringHandler(rules) + initialization_time = time.time() - start_time + + # Test matching performance + start_time = time.time() + context = ToolCallContext( + session_id="test_session", + backend_name="test_backend", + model_name="test_model", + full_response=None, + tool_name="tool_999", # Should match last rule + tool_arguments={}, + ) + result = asyncio.run(handler.handle(context)) + matching_time = time.time() - start_time + + # Then + assert initialization_time < 1.0 # Should initialize in under 1 second + assert matching_time < 0.1 # Should match in under 100ms + assert result.replacement_response == "Message 999" + + def 
test_memory_leak_prevention_with_hit_ops(self):
+        """
+        Given: Long-running handler with many rate limit hits
+        When: Many hits are recorded over time
+        Then: Memory usage should not grow unbounded
+        """
+        # Given
+        rules = [
+            {
+                "name": "memory_test_rule",
+                "enabled": True,
+                "message": "Memory test",
+                "triggers": {"tool_names": ["test_tool"], "phrases": []},
+                "rate_limit": {
+                    "calls_per_window": 100,  # High limit for testing
+                    "window_seconds": 60,
+                },
+                "priority": 50,
+            }
+        ]
+        handler = ConfigSteeringHandler(rules)
+
+        # When - Record many hits across different sessions inside a single event loop
+        async def generate_hits():
+            for session_id in [f"session_{i}" for i in range(100)]:
+                for _ in range(30):  # More than the 20 hit limit per key
+                    # _record_hit needs only the rule and the session id, so no
+                    # ToolCallContext has to be constructed here.
+                    await handler._record_hit(handler._rules[0], session_id)
+
+        # Run all hit recordings in a single async context
+        asyncio.run(generate_hits())
+
+        # Then - Hits should be limited to prevent memory leaks
+        for session_id in [f"session_{i}" for i in range(100)]:
+            key = (session_id, "memory_test_rule")
+            hits = handler._last_hits.get(key, [])
+            assert len(hits) <= 20  # Should be limited to 20 per key
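
Note on the substantive change in this diff: the old test_rate_limit_window_expiry called asyncio.sleep(1.1) without awaiting it. That call only creates a coroutine, so no time passes at all, and the orphaned coroutine is reported as "RuntimeWarning: coroutine 'sleep' was never awaited" when it is finalized. The rewrite drives the sleep to completion with asyncio.run(asyncio.sleep(window_seconds * 2)), shrinks the window to 0.05 s so the suite stays fast, and uses pytest's recwarn fixture to assert the warning is gone. A minimal, self-contained sketch of the failure mode and the fix (function names here are illustrative, not part of the suite):

    import asyncio
    import gc
    import warnings

    def forgets_to_wait() -> None:
        # The old test's bug: asyncio.sleep() only *creates* a coroutine.
        # Nothing awaits it, so zero time passes, and the orphaned coroutine
        # triggers "RuntimeWarning: coroutine 'sleep' was never awaited"
        # when it is finalized.
        asyncio.sleep(0.05)

    def actually_waits(window_seconds: float) -> None:
        # The fix applied in the diff: run the coroutine to completion on a
        # fresh event loop, as asyncio.run(asyncio.sleep(window_seconds * 2))
        # now does inside the test.
        asyncio.run(asyncio.sleep(window_seconds * 2))

    if __name__ == "__main__":
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            forgets_to_wait()
            gc.collect()  # force finalization on non-refcounting interpreters
        assert any("was never awaited" in str(w.message) for w in caught)
        actually_waits(0.05)  # blocks for ~0.1 s and emits no warning

Separately, the TODO comments record that the rate limiting these tests specify is not yet enforced: the commented-out asserts expect should_swallow to be False once a session exhausts calls_per_window. The handler's internals are not shown in this diff, so the following is only a sketch of one sliding-window approach consistent with what the tests already touch (_record_hit, a _last_hits map keyed by (session_id, rule_name), timestamps capped at 20 per key, and calls_per_window / window_seconds from the rule config); every other name is an assumption:

    import time
    from collections import defaultdict

    MAX_HITS_PER_KEY = 20  # the cap test_memory_leak_prevention_with_hit_ops checks

    class SlidingWindowLimiter:
        """Hypothetical per-(session, rule) sliding-window rate limiter."""

        def __init__(self) -> None:
            self._last_hits: defaultdict[tuple[str, str], list[float]] = defaultdict(list)

        def allow(
            self,
            session_id: str,
            rule_name: str,
            calls_per_window: int,
            window_seconds: float,
        ) -> bool:
            now = time.monotonic()
            key = (session_id, rule_name)
            # Keep only timestamps still inside the window.
            hits = [t for t in self._last_hits[key] if now - t < window_seconds]
            if len(hits) >= calls_per_window:
                self._last_hits[key] = hits
                return False  # rate limited: what the TODO'd asserts expect
            hits.append(now)
            # Cap stored timestamps so memory cannot grow unbounded.
            self._last_hits[key] = hits[-MAX_HITS_PER_KEY:]
            return True

Because each (session_id, rule_name) key tracks its own window, wiring a check like allow() into handle() would satisfy the commented-out asserts in test_rate_limit_enforcement_per_session and test_rate_limit_window_expiry while preserving the session isolation that test_rate_limit_isolation_between_sessions already verifies.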