Skip to content

Commit 7fb9294

Browse files
author
Mateusz
committed
Rename loop detector to token-window variant
1 parent 44b61c8 commit 7fb9294

File tree

8 files changed

+431
-411
lines changed

8 files changed

+431
-411
lines changed

src/core/app/lifecycle.py

Lines changed: 134 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -1,126 +1,134 @@
1-
from __future__ import annotations
2-
3-
import asyncio
4-
import logging
5-
from typing import Any
6-
7-
from fastapi import FastAPI
8-
9-
from src.core.interfaces.session_service_interface import ISessionService
10-
11-
logger = logging.getLogger(__name__)
12-
13-
14-
class AppLifecycle:
15-
"""Handles application lifecycle events.
16-
17-
This class manages startup and shutdown tasks for the application.
18-
"""
19-
20-
def __init__(self, app: FastAPI, config: dict[str, Any]) -> None:
21-
"""Initialize the lifecycle manager.
22-
23-
Args:
24-
app: The FastAPI application
25-
config: The application configuration
26-
"""
27-
self.app = app
28-
self.config = config
29-
self._background_tasks: list[asyncio.Task] = []
30-
31-
async def startup(self) -> None:
32-
"""Perform startup tasks.
33-
34-
This method is called during application startup.
35-
"""
36-
logger.info("Starting application lifecycle...")
37-
38-
# Start background tasks
39-
self._start_background_tasks()
40-
41-
async def shutdown(self) -> None:
42-
"""Perform shutdown tasks.
43-
44-
This method is called during application shutdown.
45-
"""
46-
logger.info("Shutting down application lifecycle...")
47-
48-
# Stop background tasks
49-
await self._stop_background_tasks()
50-
51-
# Close any remaining connections
52-
await self._close_connections()
53-
54-
def _start_background_tasks(self) -> None:
55-
"""Start background tasks."""
56-
# Start session cleanup task
57-
if self.config.get("session_cleanup_enabled", False):
58-
interval = self.config.get(
59-
"session_cleanup_interval", 3600
60-
) # 1 hour default
61-
max_age = self.config.get("session_max_age", 86400) # 1 day default
62-
63-
task = asyncio.create_task(
64-
self._session_cleanup_task(interval, max_age), name="session_cleanup"
65-
)
66-
self._background_tasks.append(task)
67-
logger.info(
68-
f"Started session cleanup task (interval: {interval}s, max age: {max_age}s)"
69-
)
70-
71-
async def _stop_background_tasks(self) -> None:
72-
"""Stop background tasks."""
73-
for task in self._background_tasks:
74-
if not task.done():
75-
task.cancel()
76-
try:
77-
await task
78-
except asyncio.CancelledError:
79-
logger.info(f"Cancelled background task: {task.get_name()}")
80-
81-
async def _close_connections(self) -> None:
82-
"""Close any remaining connections."""
83-
# Any connection cleanup code would go here
84-
85-
async def _session_cleanup_task(self, interval: int, max_age: int) -> None:
86-
"""Background task for cleaning up expired sessions.
87-
88-
Args:
89-
interval: The interval in seconds between cleanup runs
90-
max_age: The maximum age in seconds for sessions
91-
"""
92-
try:
93-
while True:
94-
await asyncio.sleep(interval)
95-
96-
try:
97-
# Get service provider
98-
provider = self.app.state.service_provider
99-
if not provider:
100-
logger.warning(
101-
"Service provider not available for session cleanup"
102-
)
103-
continue
104-
105-
# Get session service
106-
session_service = provider.get_service(ISessionService)
107-
if not session_service:
108-
logger.warning("Session service not available for cleanup")
109-
continue
110-
111-
# Perform cleanup
112-
deleted_count = 0
113-
if hasattr(session_service, "cleanup_expired_sessions"):
114-
deleted_count = await session_service.cleanup_expired_sessions(
115-
max_age
116-
)
117-
118-
if deleted_count > 0 and logger.isEnabledFor(logging.INFO):
119-
logger.info(f"Cleaned up {deleted_count} expired sessions")
120-
121-
except Exception as e:
122-
logger.error(f"Error during session cleanup: {e!s}")
123-
124-
except asyncio.CancelledError:
125-
logger.debug("Session cleanup task cancelled")
126-
raise
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import logging
5+
from contextlib import suppress
6+
from typing import Any
7+
8+
from fastapi import FastAPI
9+
10+
from src.core.interfaces.session_service_interface import ISessionService
11+
from src.core.interfaces.wire_capture_interface import IWireCapture
12+
13+
logger = logging.getLogger(__name__)
14+
15+
16+
class AppLifecycle:
17+
"""Handles application lifecycle events.
18+
19+
This class manages startup and shutdown tasks for the application.
20+
"""
21+
22+
def __init__(self, app: FastAPI, config: dict[str, Any]) -> None:
23+
"""Initialize the lifecycle manager.
24+
25+
Args:
26+
app: The FastAPI application
27+
config: The application configuration
28+
"""
29+
self.app = app
30+
self.config = config
31+
self._background_tasks: list[asyncio.Task] = []
32+
33+
async def startup(self) -> None:
34+
"""Perform startup tasks.
35+
36+
This method is called during application startup.
37+
"""
38+
logger.info("Starting application lifecycle...")
39+
40+
# Start background tasks
41+
self._start_background_tasks()
42+
43+
async def shutdown(self) -> None:
44+
"""Perform shutdown tasks.
45+
46+
This method is called during application shutdown.
47+
"""
48+
logger.info("Shutting down application lifecycle...")
49+
50+
# Stop background tasks
51+
await self._stop_background_tasks()
52+
53+
# Close any remaining connections
54+
await self._close_connections()
55+
56+
def _start_background_tasks(self) -> None:
57+
"""Start background tasks."""
58+
# Start session cleanup task
59+
if self.config.get("session_cleanup_enabled", False):
60+
interval = self.config.get(
61+
"session_cleanup_interval", 3600
62+
) # 1 hour default
63+
max_age = self.config.get("session_max_age", 86400) # 1 day default
64+
65+
task = asyncio.create_task(
66+
self._session_cleanup_task(interval, max_age), name="session_cleanup"
67+
)
68+
self._background_tasks.append(task)
69+
logger.info(
70+
f"Started session cleanup task (interval: {interval}s, max age: {max_age}s)"
71+
)
72+
73+
async def _stop_background_tasks(self) -> None:
74+
"""Stop background tasks."""
75+
for task in self._background_tasks:
76+
if not task.done():
77+
task.cancel()
78+
try:
79+
await task
80+
except asyncio.CancelledError:
81+
logger.info(f"Cancelled background task: {task.get_name()}")
82+
83+
async def _close_connections(self) -> None:
84+
"""Close any remaining connections."""
85+
# Get service provider
86+
provider = getattr(self.app.state, "service_provider", None)
87+
if not provider:
88+
return
89+
90+
# Get wire capture service and shut it down
91+
wire_capture_service = provider.get_service(IWireCapture)
92+
if wire_capture_service and hasattr(wire_capture_service, "shutdown"):
93+
await wire_capture_service.shutdown()
94+
95+
async def _session_cleanup_task(self, interval: int, max_age: int) -> None:
96+
"""Background task for cleaning up expired sessions.
97+
98+
Args:
99+
interval: The interval in seconds between cleanup runs
100+
max_age: The maximum age in seconds for sessions
101+
"""
102+
try:
103+
while True:
104+
await asyncio.sleep(interval)
105+
106+
try:
107+
# Get service provider
108+
provider = self.app.state.service_provider
109+
if not provider:
110+
logger.warning(
111+
"Service provider not available for session cleanup"
112+
)
113+
continue
114+
115+
# Get session service
116+
session_service = provider.get_service(ISessionService)
117+
if not session_service:
118+
logger.warning("Session service not available for cleanup")
119+
continue
120+
121+
# Perform cleanup
122+
deleted_count = 0
123+
with suppress(AttributeError):
124+
deleted_count = await session_service.cleanup_expired(max_age)
125+
126+
if deleted_count > 0 and logger.isEnabledFor(logging.INFO):
127+
logger.info(f"Cleaned up {deleted_count} expired sessions")
128+
129+
except Exception as e:
130+
logger.error(f"Error during session cleanup: {e!s}")
131+
132+
except asyncio.CancelledError:
133+
logger.debug("Session cleanup task cancelled")
134+
raise

src/core/services/buffered_wire_capture_service.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -856,9 +856,19 @@ def force_shutdown_sync(self) -> None:
856856
if self._flush_task and not self._flush_task.done():
857857
with contextlib.suppress(Exception):
858858
task = self._flush_task
859+
# Suppress the 'task was destroyed but it is pending!' message
859860
if hasattr(task, "_log_destroy_pending"):
860861
cast(Any, task)._log_destroy_pending = False
861862
loop = task.get_loop()
862-
if not loop.is_closed():
863-
task.cancel()
863+
if loop.is_running() and not loop.is_closed():
864+
loop.call_soon_threadsafe(task.cancel)
865+
elif not loop.is_closed():
866+
# This is a fallback and might not always work if the loop is closing
867+
loop.run_until_complete(self.shutdown())
868+
864869
self._flush_task = None
870+
871+
def __del__(self) -> None:
872+
"""Ensure cleanup is attempted on garbage collection."""
873+
if self.enabled():
874+
self.force_shutdown_sync()

src/loop_detection/hybrid_detector.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
LoopDetectionResult,
2020
)
2121
from src.loop_detection.event import LoopDetectionEvent
22-
from src.loop_detection.gemini_cli_detector import GeminiCliLoopDetector
22+
from src.loop_detection.token_window_loop_detector import TokenWindowLoopDetector
2323

2424
logger = logging.getLogger(__name__)
2525

@@ -208,7 +208,7 @@ def __init__(
208208
"""
209209
# Initialize short pattern detector (gemini-cli)
210210
short_config = short_detector_config or {}
211-
self.short_detector = GeminiCliLoopDetector(
211+
self.short_detector = TokenWindowLoopDetector(
212212
content_loop_threshold=short_config.get("content_loop_threshold", 10),
213213
content_chunk_size=short_config.get("content_chunk_size", 50),
214214
max_history_length=short_config.get("max_history_length", 1000),

src/loop_detection/gemini_cli_detector.py renamed to src/loop_detection/token_window_loop_detector.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
"""
2-
Loop detection implementation ported from Google's gemini-cli.
2+
Sliding-window loop detector originally ported from the Gemini CLI.
3+
4+
This detector is backend-agnostic and can be used for any streaming LLM output.
35
46
This is a direct port of the loop detection algorithm from:
57
https://github.com/google/generative-ai-docs/tree/main/gemini-cli
@@ -45,9 +47,9 @@
4547
)
4648

4749

48-
class GeminiCliLoopDetector(ILoopDetector):
50+
class TokenWindowLoopDetector(ILoopDetector):
4951
"""
50-
Loop detector ported from Google's gemini-cli.
52+
Backend-agnostic loop detector using a sliding token window with markdown-aware resets.
5153
5254
This implementation uses a sliding window approach with intelligent context tracking
5355
to detect repetitive patterns in LLM responses without manual parameter tuning.
@@ -83,7 +85,7 @@ def __init__(
8385
self._loop_events: list[LoopDetectionEvent] = []
8486

8587
logger.debug(
86-
"GeminiCliLoopDetector initialized: chunk_size=%d, threshold=%d, max_history=%d",
88+
"TokenWindowLoopDetector initialized: chunk_size=%d, threshold=%d, max_history=%d",
8789
self.content_chunk_size,
8890
self.content_loop_threshold,
8991
self.max_history_length,

0 commit comments

Comments
 (0)