
Conversation


@Micro66 Micro66 commented Nov 30, 2025

Summary

  • Add POST /api/tasks/stream endpoint for creating and streaming task execution with SSE
  • Add GET /api/tasks/stream/{task_id} endpoint for streaming existing tasks
  • Add SSE event types compatible with Dify Workflow format: workflow_started, node_started, node_finished, workflow_finished, error, ping
  • Add team_id and task_type filters to GET /api/tasks endpoint
  • Add workflow_enabled filter to GET /api/teams endpoint
  • Add workflow_enabled and has_parameters fields to team list response

Test plan

  • Test SSE streaming endpoint with a valid team and prompt (see the client sketch after this list)
  • Verify SSE event format matches Dify Workflow specification
  • Test task list filtering by team_id and task_type
  • Test team list filtering by workflow_enabled
  • Verify backward compatibility with existing endpoints
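
For the first two items, a minimal client sketch like the following can be used to exercise the stream and eyeball the event format. It is not part of this PR: the base URL, bearer-token auth, and the exact payload fields are assumptions based on the endpoints described above.

"""Manual smoke check for POST /api/tasks/stream (sketch, not from this PR)."""
import json

import httpx

BASE_URL = "http://localhost:8000"  # assumed local dev server
TOKEN = "<access token>"  # assumed bearer auth; adjust to the real scheme

payload = {"team_id": 1, "prompt": "Summarize the repository README"}  # assumed minimal body

with httpx.stream(
    "POST",
    f"{BASE_URL}/api/tasks/stream",
    json=payload,
    headers={"Authorization": f"Bearer {TOKEN}", "Accept": "text/event-stream"},
    timeout=None,  # keep the connection open for long-running tasks
) as response:
    response.raise_for_status()
    for line in response.iter_lines():
        # Each SSE frame arrives as a "data: {...}" line followed by a blank line
        if not line.startswith("data: "):
            continue
        event = json.loads(line[len("data: "):])
        print(event.get("event"), event)
        if event.get("event") in ("workflow_finished", "error"):
            break

The same loop works against GET /api/tasks/stream/{task_id} for the reconnection case.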

Summary by CodeRabbit

Release Notes

  • New Features

    • Stream task execution progress in real-time with live event updates
    • Create and stream new tasks directly
    • Stream progress of existing tasks
    • Filter tasks by team and task type
    • Filter teams by workflow capability status
  • Tests

    • Added comprehensive test coverage for streaming event schemas


- Add POST /api/tasks/stream endpoint for creating and streaming task execution
- Add GET /api/tasks/stream/{task_id} endpoint for streaming existing tasks
- Add SSE event types: workflow_started, node_started, node_finished, workflow_finished, error, ping
- Add task streaming service with real-time subtask status tracking
- Add team_id and task_type filters to GET /api/tasks endpoint
- Add workflow_enabled filter to GET /api/teams endpoint
- Add workflow_enabled and has_parameters fields to team list response
- Compatible with Dify Workflow SSE event format for frontend integration

coderabbitai bot commented Nov 30, 2025

Walkthrough

Introduces Server-Sent Events (SSE) streaming capability for task execution. Adds streaming endpoints to accept and monitor task progress, implements an asynchronous task streaming service with database polling, and extends filtering across task and team listing endpoints with optional parameters (team_id, task_type, workflow_enabled).

Changes

Cohort / File(s) Summary
SSE Schema Definitions
backend/app/schemas/sse.py
Introduces Pydantic models for SSE events: SSEEventType enum, event data payloads (WorkflowStartedData, NodeStartedData, NodeFinishedData, WorkflowFinishedData), base SSEEvent model with to_sse_format() serializer, StreamTaskCreate request schema, and team-related schemas (TeamParameter, TeamParametersResponse, TeamWithWorkflow). A rough sketch of these shapes follows the table.
Task Streaming Service
backend/app/services/task_streaming.py
Adds TaskStreamingService with asynchronous stream_task_execution() generator that polls database, tracks subtask state changes, and emits SSE events (WORKFLOW_STARTED, NODE_STARTED, NODE_FINISHED, WORKFLOW_FINISHED, error, ping) with permission validation and timeout handling. Includes stop_stream() method and global task_streaming_service instance.
Task Endpoint Streaming
backend/app/api/endpoints/adapter/tasks.py
Adds POST /stream endpoint to create and stream tasks in real-time and GET /stream/{task_id} endpoint to stream existing task progress with ownership validation. Extends get_tasks() with optional team_id and task_type query filters.
Task Service Filtering
backend/app/services/adapters/task_kinds.py
Extends get_user_tasks_with_pagination() signature with optional team_id and task_type parameters; applies filtering via JSON_EXTRACT on metadata.labels.taskType and team-level filtering post-retrieval.
Team Endpoint & Service Filtering
backend/app/api/endpoints/adapter/teams.py, backend/app/services/adapters/team_kinds.py
Adds optional workflow_enabled query parameter to list_teams endpoint and threads it through service layer. Extends get_user_teams() and count_user_teams() with workflow_enabled filtering. Introduces private _check_team_has_external_api_bots() helper to detect workflow capability via bot shell inspection.
SSE Schema Tests
backend/tests/schemas/test_sse.py
Comprehensive unit tests for all SSE schemas, event types, and data models, covering typical instantiation, edge cases, format validation, and SSE payload serialization.
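
For orientation, here is a rough sketch of the event shape the SSE Schema Definitions row describes. It is illustrative only: the authoritative models live in backend/app/schemas/sse.py, and any field or default not named in this walkthrough is an assumption.

import json
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, Optional

from pydantic import BaseModel, Field


class SSEEventType(str, Enum):
    WORKFLOW_STARTED = "workflow_started"
    NODE_STARTED = "node_started"
    NODE_FINISHED = "node_finished"
    WORKFLOW_FINISHED = "workflow_finished"
    ERROR = "error"
    PING = "ping"


class SSEEvent(BaseModel):
    event: SSEEventType
    task_id: Optional[int] = None  # assumed optional; see the real schema
    message: Optional[str] = None
    data: Optional[Dict[str, Any]] = None
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    def to_sse_format(self) -> str:
        # One SSE frame: "data: <json>\n\n"; clients split frames on blank lines.
        payload = {
            "event": self.event.value,
            "task_id": self.task_id,
            "message": self.message,
            "data": self.data,
            "created_at": self.created_at.isoformat(),
        }
        return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"


print(SSEEvent(event=SSEEventType.PING).to_sse_format())

The real to_sse_format() may differ in field names; the relevant part is the data: ...\n\n framing that the streaming endpoints emit.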

Sequence Diagram

sequenceDiagram
    actor Client
    participant Endpoint as POST /stream
    participant Service as TaskStreamingService
    participant Database as DB
    
    Client->>Endpoint: StreamTaskCreate request
    Endpoint->>Endpoint: Create task from request
    Endpoint->>Service: stream_task_execution(task_id, user_id)
    activate Service
    
    Service->>Database: Validate task ownership
    Database-->>Service: Task found ✓
    Service-->>Client: data: {"event": "workflow_started", ...}
    
    loop Poll & Detect Changes
        Service->>Database: Fetch task & subtasks
        Database-->>Service: Current state
        
        alt Subtask enters RUNNING
            Service-->>Client: data: {"event": "node_started", ...}
        end
        
        alt Subtask reaches COMPLETED/FAILED
            Service-->>Client: data: {"event": "node_finished", ...}
        end
        
        alt Periodic heartbeat
            Service-->>Client: data: {"event": "ping"}
        end
        
        Note over Service: Poll interval (~500ms)
    end
    
    alt Workflow completes
        Service-->>Client: data: {"event": "workflow_finished", ...}
    end
    
    alt Error or timeout
        Service-->>Client: data: {"event": "error", ...}
    end
    
    deactivate Service
    Client->>Client: Close stream

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~65 minutes

  • backend/app/services/task_streaming.py: Async generator with database polling state machine, permission validation, and SSE event emission logic requires careful verification of: polling interval tuning, subtask state transition detection, error handling for database/timeout scenarios, and proper cleanup.
  • backend/app/services/adapters/team_kinds.py: New workflow detection logic (_check_team_has_external_api_bots) and filtering logic spanning multiple team assembly paths (own and shared teams) needs validation for correctness across different team configurations.
  • Filtering integration across layers: team_id/task_type filtering in task_kinds.py uses JSON_EXTRACT on metadata and applies post-retrieval filtering; workflow_enabled threading across endpoints and services requires tracing data flow.
  • Schema-to-service contract: Verify SSEEvent payload construction matches event types and task_streaming service emits correct data models.

Poem

🐰 A streaming adventure awaits today,
Events flowing in the SSE way,
Polls and pings with workflows in flight,
Tasks now streaming—what a delight!
Filters aplenty, workflow's on fire,
Hop along, code review's most dire!

Pre-merge checks

✅ Passed checks (3 passed)
  • Description Check: ✅ Passed. Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check: ✅ Passed. The title 'feat(backend): add SSE streaming API for task execution' directly and clearly describes the main change of adding SSE streaming capability to the backend API for task execution.
  • Docstring Coverage: ✅ Passed. Docstring coverage is 94.74%, which is sufficient; the required threshold is 80.00%.


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

🧹 Nitpick comments (12)
backend/app/schemas/sse.py (3)

75-75: Consider using timezone-aware datetime.

datetime.now() returns a naive datetime without timezone info. For SSE events that may be consumed by clients in different timezones, consider using datetime.now(timezone.utc); note that datetime.utcnow() also returns a naive datetime and is deprecated, so it does not solve this.

-from datetime import datetime
+from datetime import datetime, timezone
 from enum import Enum
 from typing import Any, Dict, List, Optional

-    created_at: datetime = Field(default_factory=datetime.now)
+    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

77-92: Move json import to module level.

The json import inside to_sse_format() is executed on every call. Move it to the module-level imports for better performance and consistency.

+import json
 from datetime import datetime
 from enum import Enum
 from typing import Any, Dict, List, Optional

     def to_sse_format(self) -> str:
         """Convert event to SSE format string"""
-        import json
-
         payload = {

105-105: Remove unused inputs field or implement full support.

The inputs field is accepted by StreamTaskCreate but discarded during conversion to TaskCreate—it's never stored or processed. Either remove this unused parameter or implement full support by adding it to TaskCreate and the downstream task processing pipeline.

backend/app/services/adapters/team_kinds.py (2)

622-633: Hardcoded limit of 10000 may be insufficient and is inefficient.

Using limit=10000 as a "large limit to get all" is fragile:

  1. Users with >10000 teams would get incorrect counts
  2. This loads all team data into memory just to count

Consider extracting this to a constant and adding a comment, or implementing a dedicated count query:

+# Maximum teams to consider for workflow_enabled filtering
+MAX_TEAMS_FOR_WORKFLOW_FILTER = 10000

         if workflow_enabled is not None:
-            # Get all teams and filter
             all_teams = self.get_user_teams(
                 db=db,
                 user_id=user_id,
                 skip=0,
-                limit=10000,  # Large limit to get all
+                limit=MAX_TEAMS_FOR_WORKFLOW_FILTER,
                 workflow_enabled=workflow_enabled,
             )
             return len(all_teams)

677-687: Use is_(True) for SQLAlchemy boolean comparisons.

Per static analysis and SQLAlchemy best practices, use Kind.is_active.is_(True) instead of Kind.is_active == True to avoid potential issues with Python's identity comparison.

             bot = (
                 db.query(Kind)
                 .filter(
                     Kind.user_id == user_id,
                     Kind.kind == "Bot",
                     Kind.name == member.botRef.name,
                     Kind.namespace == member.botRef.namespace,
-                    Kind.is_active == True,
+                    Kind.is_active.is_(True),
                 )
                 .first()
             )
backend/app/services/task_streaming.py (6)

17-17: Unused import: text is not used in this module.

The text function from SQLAlchemy is imported but never used anywhere in the file.

-from sqlalchemy import text
-from sqlalchemy.orm import Session
+from sqlalchemy.orm import Session

103-116: Blocking database operations in async context.

Using synchronous db.query() and db.expire_all() calls within an async generator will block the event loop during database I/O. For high concurrency, consider wrapping these in asyncio.to_thread() or using an async database driver.

Additionally, per Ruff E712: prefer Kind.is_active over Kind.is_active == True for boolean checks in SQLAlchemy filters.

                 # Refresh database session to get latest data
                 db.expire_all()
 
                 # Get current task status
                 task = (
                     db.query(Kind)
                     .filter(
                         Kind.id == task_id,
                         Kind.user_id == user_id,
                         Kind.kind == "Task",
-                        Kind.is_active == True,
+                        Kind.is_active,
                     )
                     .first()
                 )

159-167: Apply consistent boolean comparison style.

Same as above, use Kind.is_active for the boolean filter.

                         bot = (
                             db.query(Kind)
                             .filter(
                                 Kind.id == subtask.bot_ids[0],
                                 Kind.kind == "Bot",
-                                Kind.is_active == True,
+                                Kind.is_active,
                             )
                             .first()
                         )

256-256: Remove unused variable last_task_status.

This variable is assigned but never read, as flagged by Ruff F841.

-                last_task_status = current_task_status
-
                 # Send periodic ping to keep connection alive

257-267: Ping interval is tightly coupled to POLL_INTERVAL.

The comment states "Every 30 seconds" but this relies on POLL_INTERVAL = 1.0. If POLL_INTERVAL changes, the ping frequency will silently change. Consider computing the interval dynamically or extracting a constant.

+# Ping interval in seconds
+PING_INTERVAL = 30
+
 # Polling interval in seconds
 POLL_INTERVAL = 1.0

Then update the check:

-                if iteration % 30 == 0:  # Every 30 seconds
+                if iteration % int(PING_INTERVAL / POLL_INTERVAL) == 0:

269-276: Use explicit conversion in f-strings.

Per Ruff RUF010, prefer {e!s} over str(e) for cleaner f-string formatting.

         except Exception as e:
-            logger.error(f"Error streaming task {task_id}: {str(e)}", exc_info=True)
+            logger.error(f"Error streaming task {task_id}: {e!s}", exc_info=True)
             error_event = SSEEvent(
                 event=SSEEventType.ERROR,
                 task_id=task_id,
-                message=f"Streaming error: {str(e)}",
+                message=f"Streaming error: {e!s}",
             )
backend/tests/schemas/test_sse.py (1)

8-21: Import order should follow PEP 8 conventions.

Standard library imports (datetime) should come before third-party imports (pytest). As per coding guidelines, use isort for import ordering.

-import pytest
 from datetime import datetime
 
+import pytest
+
 from app.schemas.sse import (
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bcec0fa and ed8739b.

📒 Files selected for processing (7)
  • backend/app/api/endpoints/adapter/tasks.py (4 hunks)
  • backend/app/api/endpoints/adapter/teams.py (2 hunks)
  • backend/app/schemas/sse.py (1 hunks)
  • backend/app/services/adapters/task_kinds.py (2 hunks)
  • backend/app/services/adapters/team_kinds.py (4 hunks)
  • backend/app/services/task_streaming.py (1 hunks)
  • backend/tests/schemas/test_sse.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.{py,js,ts,tsx}

📄 CodeRabbit inference engine (AGENTS.md)

All code comments MUST be written in English, including inline comments, block comments, docstrings, TODO/FIXME annotations, and type hints descriptions

Files:

  • backend/app/services/task_streaming.py
  • backend/app/schemas/sse.py
  • backend/tests/schemas/test_sse.py
  • backend/app/services/adapters/team_kinds.py
  • backend/app/api/endpoints/adapter/tasks.py
  • backend/app/services/adapters/task_kinds.py
  • backend/app/api/endpoints/adapter/teams.py
**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

**/*.py: Python code must be PEP 8 compliant, formatted with Black (line length: 88), and use isort for imports
Python code must use type hints
Python functions should have descriptive names, include docstrings for public functions/classes, extract magic numbers to constants, and keep functions to a maximum of 50 lines preferred

Files:

  • backend/app/services/task_streaming.py
  • backend/app/schemas/sse.py
  • backend/tests/schemas/test_sse.py
  • backend/app/services/adapters/team_kinds.py
  • backend/app/api/endpoints/adapter/tasks.py
  • backend/app/services/adapters/task_kinds.py
  • backend/app/api/endpoints/adapter/teams.py
🧬 Code graph analysis (6)
backend/app/services/task_streaming.py (4)
backend/app/models/kind.py (1)
  • Kind (25-42)
backend/app/models/subtask.py (1)
  • Subtask (31-64)
backend/app/schemas/sse.py (8)
  • NodeFinishedData (47-54)
  • NodeStartedData (37-44)
  • SSEEvent (68-92)
  • SSEEventType (18-26)
  • StreamTaskCreate (95-116)
  • WorkflowFinishedData (57-65)
  • WorkflowStartedData (29-34)
  • to_sse_format (77-92)
backend/app/schemas/task.py (1)
  • TaskCreate (45-64)
backend/tests/schemas/test_sse.py (1)
backend/app/schemas/sse.py (10)
  • SSEEvent (68-92)
  • SSEEventType (18-26)
  • WorkflowStartedData (29-34)
  • NodeStartedData (37-44)
  • NodeFinishedData (47-54)
  • WorkflowFinishedData (57-65)
  • StreamTaskCreate (95-116)
  • TeamParameter (119-128)
  • TeamParametersResponse (131-136)
  • to_sse_format (77-92)
backend/app/services/adapters/team_kinds.py (3)
backend/app/models/kind.py (1)
  • Kind (25-42)
backend/app/schemas/kind.py (2)
  • Team (217-224)
  • Bot (170-177)
backend/app/services/adapters/shell_utils.py (1)
  • get_shell_type (17-55)
backend/app/api/endpoints/adapter/tasks.py (6)
backend/app/api/dependencies.py (1)
  • get_db (12-21)
backend/app/services/kind_base.py (1)
  • get_db (30-32)
backend/app/schemas/sse.py (1)
  • StreamTaskCreate (95-116)
backend/app/schemas/task.py (1)
  • TaskCreate (45-64)
backend/app/services/adapters/task_kinds.py (1)
  • create_task_or_append (35-254)
backend/app/services/task_streaming.py (1)
  • stream_task_execution (48-278)
backend/app/services/adapters/task_kinds.py (1)
backend/app/models/kind.py (1)
  • Kind (25-42)
backend/app/api/endpoints/adapter/teams.py (1)
backend/app/services/adapters/team_kinds.py (2)
  • get_user_teams (160-313)
  • count_user_teams (607-657)
🪛 Ruff (0.14.6)
backend/app/services/task_streaming.py

113-113: Avoid equality comparisons to True; use if Kind.is_active: for truth checks

Replace with Kind.is_active

(E712)


164-164: Avoid equality comparisons to True; use if Kind.is_active: for truth checks

Replace with Kind.is_active

(E712)


256-256: Local variable last_task_status is assigned to but never used

Remove assignment to unused variable last_task_status

(F841)


270-270: Use explicit conversion flag

Replace with conversion flag

(RUF010)


274-274: Use explicit conversion flag

Replace with conversion flag

(RUF010)

backend/app/services/adapters/team_kinds.py

684-684: Avoid equality comparisons to True; use if Kind.is_active: for truth checks

Replace with Kind.is_active

(E712)

backend/app/api/endpoints/adapter/tasks.py

77-77: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


78-78: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


139-139: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


140-140: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


190-190: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


191-191: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)

backend/app/services/adapters/task_kinds.py

282-282: Avoid equality comparisons to True; use if Kind.is_active: for truth checks

Replace with Kind.is_active

(E712)

backend/app/api/endpoints/adapter/teams.py

40-40: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


41-41: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: e2e-tests
🔇 Additional comments (17)
backend/app/schemas/sse.py (1)

119-145: LGTM!

The TeamParameter, TeamParametersResponse, and TeamWithWorkflow models are well-structured with appropriate type hints and default values.

backend/app/services/adapters/team_kinds.py (1)

160-179: LGTM!

The updated signature and docstring properly document the new workflow_enabled parameter.

backend/app/api/endpoints/adapter/teams.py (1)

33-62: LGTM!

The workflow_enabled query parameter is properly added and threaded through to both get_user_teams and count_user_teams service calls. The docstring update accurately describes the filtering behavior.

Note: The static analysis hints about Depends() in argument defaults are false positives - this is the standard FastAPI dependency injection pattern.

backend/app/api/endpoints/adapter/tasks.py (1)

184-206: LGTM!

The team_id and task_type query parameters are properly added and passed through to the service layer. The docstring is appropriately updated.

backend/app/services/adapters/task_kinds.py (2)

286-293: LGTM on task_type SQL filter.

Good implementation using JSON_EXTRACT with bindparams() for safe parameterization. This filter is correctly applied at the database level before pagination.


256-277: LGTM!

The updated function signature and docstring properly document the new team_id and task_type parameters.

backend/app/services/task_streaming.py (3)

42-46: LGTM!

The class initialization is straightforward. The active_streams dictionary provides a simple mechanism to track and control active streaming sessions.


280-282: LGTM!

The stop_stream method correctly signals the streaming loop to terminate by setting the flag to False.


285-286: LGTM!

The global service instance pattern is appropriate here for sharing state across request handlers.

backend/tests/schemas/test_sse.py (8)

24-34: LGTM!

Comprehensive coverage of all SSE event type enum values.


37-51: LGTM!

Test properly validates the WorkflowStartedData schema construction and field values.


54-83: LGTM!

Good test coverage including both full field specification and optional field defaults.


86-114: LGTM!

Tests cover both success and failure scenarios with appropriate field assertions.


117-134: LGTM!

Test validates the WorkflowFinishedData schema with all fields populated. Consider adding tests for failed and cancelled statuses in future iterations.


137-178: LGTM!

Thorough tests covering SSE event creation, format serialization, and error message handling.


181-220: LGTM!

Comprehensive tests for both minimal and full field specifications with proper default value validation.


223-279: LGTM!

Good coverage of TeamParameter with different types (string, select) and TeamParametersResponse with both empty and populated states.

Comment on lines +74 to +133
@router.post("/stream")
async def create_and_stream_task(
stream_request: StreamTaskCreate,
current_user: User = Depends(security.get_current_user),
db: Session = Depends(get_db),
):
"""
Create and execute a task with SSE streaming response.
Returns a Server-Sent Events stream with real-time task execution progress.
Compatible with Dify Workflow SSE event format.
Event types:
- workflow_started: Task execution started
- node_started: Bot/Subtask started execution
- node_finished: Bot/Subtask completed
- workflow_finished: Task execution completed
- error: Execution error occurred
- ping: Keep-alive ping
"""
# Convert StreamTaskCreate to TaskCreate
task_create = TaskCreate(
team_id=stream_request.team_id,
team_name=stream_request.team_name,
team_namespace=stream_request.team_namespace,
prompt=stream_request.prompt,
title=stream_request.title,
type=stream_request.type,
task_type=stream_request.task_type,
model_id=stream_request.model_id,
force_override_bot_model=stream_request.force_override_bot_model,
git_url=stream_request.git_url,
git_repo=stream_request.git_repo,
git_repo_id=stream_request.git_repo_id,
git_domain=stream_request.git_domain,
branch_name=stream_request.branch_name,
)

# Create the task
task_result = task_kinds_service.create_task_or_append(
db=db, obj_in=task_create, user=current_user, task_id=None
)
task_id = task_result["id"]

logger.info(f"Created streaming task {task_id} for user {current_user.id}")

# Return SSE streaming response
return StreamingResponse(
task_streaming_service.stream_task_execution(
db=db,
task_id=task_id,
user_id=current_user.id,
),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
},
)

⚠️ Potential issue | 🟠 Major

Potential database session lifecycle issue in streaming endpoint.

The db session from Depends(get_db) is passed to task_streaming_service.stream_task_execution(), which is an async generator. FastAPI's dependency system will close the session when the request completes, but StreamingResponse continues yielding after the endpoint returns.

This could cause DetachedInstanceError or session-closed errors during streaming.

Verify that task_streaming_service.stream_task_execution creates its own database sessions internally for each poll iteration, rather than relying on the passed session:

#!/bin/bash
# Check how the streaming service handles database sessions
rg -n -A 20 "async def stream_task_execution" backend/app/services/task_streaming.py

# Check if it creates new sessions internally
rg -n "SessionLocal\|get_db" backend/app/services/task_streaming.py
🧰 Tools
🪛 Ruff (0.14.6)

77-77: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


78-78: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)

🤖 Prompt for AI Agents
In backend/app/api/endpoints/adapter/tasks.py around lines 74-133, the endpoint
passes the FastAPI dependency DB session (db) into an async generator
(stream_task_execution) which will keep yielding after the request handler
returns, risking use-after-close errors; change this so the streaming service
does not rely on the dependency-scoped session: stop passing the dependency
session into stream_task_execution (pass task_id and user_id only or pass a
session factory), and update task_streaming_service.stream_task_execution to
open and close its own DB sessions (or call get_db / SessionLocal inside each
poll/iteration) so every DB access uses a fresh session that is closed properly.
Ensure the endpoint signature and call are updated accordingly and remove any
reliance on the passed-in dependency session inside the generator.
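
One way to make this concrete, sketched only to illustrate the suggested fix: let the generator own its sessions and confine synchronous queries to a worker thread. SessionLocal and its import path are assumptions about this project's layout (the real factory may differ); Kind is the model referenced elsewhere in this review, and termination/timeout handling is elided.

import asyncio
from typing import AsyncGenerator, Optional

from app.db.session import SessionLocal  # assumed sessionmaker factory; adjust to the real one
from app.models.kind import Kind

POLL_INTERVAL = 1.0  # seconds between polls


async def stream_task_execution(task_id: int, user_id: int) -> AsyncGenerator[str, None]:
    """Yield SSE frames while opening a fresh DB session for every poll."""

    def poll_status() -> Optional[str]:
        # Short-lived session: opened, read, and closed within one iteration,
        # so nothing outlives the request-scoped session FastAPI closes.
        with SessionLocal() as db:
            task = (
                db.query(Kind)
                .filter(Kind.id == task_id, Kind.user_id == user_id, Kind.kind == "Task")
                .first()
            )
            return None if task is None else "RUNNING"  # placeholder for the real status lookup

    while True:
        status = await asyncio.to_thread(poll_status)  # keep sync DB I/O off the event loop
        if status is None:
            yield 'data: {"event": "error", "message": "Task not found"}\n\n'
            return
        yield 'data: {"event": "ping"}\n\n'
        await asyncio.sleep(POLL_INTERVAL)

The endpoints would then pass only task_id and user_id into the generator and keep their injected db for the pre-stream ownership check.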

Comment on lines +136 to +168
@router.get("/stream/{task_id}")
async def stream_existing_task(
task_id: int,
current_user: User = Depends(security.get_current_user),
db: Session = Depends(get_db),
):
"""
Stream execution progress for an existing task.
Useful for reconnecting to a task stream after disconnection.
"""
# Verify task exists and belongs to user
task = task_kinds_service.get_task_by_id(
db=db, task_id=task_id, user_id=current_user.id
)
if not task:
raise HTTPException(status_code=404, detail="Task not found")

logger.info(f"Streaming existing task {task_id} for user {current_user.id}")

return StreamingResponse(
task_streaming_service.stream_task_execution(
db=db,
task_id=task_id,
user_id=current_user.id,
),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)

⚠️ Potential issue | 🟠 Major

Same database session lifecycle concern.

This endpoint has the same potential issue with the db session being passed to the async generator. The initial task verification (lines 148-152) will work since it runs before the response, but subsequent database access during streaming may fail.

See previous comment about verifying session handling in task_streaming_service.

🧰 Tools
🪛 Ruff (0.14.6)

139-139: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


140-140: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)

Comment on lines +309 to +323

            # Filter by team_id if provided
            if team_id is not None:
                task_team_id = task_related_data.get("team_id")
                if task_team_id != team_id:
                    continue

            result.append(
                self._convert_to_task_dict_optimized(task, task_related_data, task_crd)
            )

        # Adjust total count if filtered by team_id (need to recount)
        if team_id is not None:
            total = len(result)


⚠️ Potential issue | 🟠 Major

Incorrect pagination when team_id filter is applied.

The team_id filter is applied after offset(skip).limit(limit), resulting in:

  1. Potentially fewer items than limit returned even when more matching tasks exist
  2. Incorrect total count (line 322 returns len(result) which is the filtered subset, not the true total)

For example: if page 1 has 10 tasks but only 2 match team_id, the response would incorrectly show total: 2 instead of the actual count across all pages.

Consider applying the team_id filter at the SQL level:

         # Build base filters
         filters = [
             Kind.user_id == user_id,
             Kind.kind == "Task",
             Kind.is_active == True,
             text("JSON_EXTRACT(json, '$.status.status') != 'DELETE'"),
         ]

         # Add task_type filter if provided
         if task_type:
             filters.append(
                 text(
                     "JSON_EXTRACT(json, '$.metadata.labels.taskType') = :task_type"
                 ).bindparams(task_type=task_type)
             )

+        # Note: team_id filter requires joining with teams table or subquery
+        # since team reference is stored by name/namespace, not ID

Alternatively, if filtering by team_id at SQL level is complex, fetch without limit when team_id is provided (similar to the workflow_enabled approach).

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In backend/app/services/adapters/task_kinds.py around lines 309 to 323, the
team_id filter is applied after offset/limit which yields incorrect paging and
total; modify the data retrieval so team_id is applied at the DB level (add the
team_id filter to the query/where clause and use a proper count() query to
populate total) or, if adding the SQL filter is impractical, fetch all matching
rows when team_id is provided (no offset/limit), apply the team_id filter in
Python, compute total as the count of all filtered rows, then apply offset/limit
to build the paged result; ensure total always represents the full number of
matching tasks (use the DB count when possible).
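
If pushing team_id into SQL turns out to be awkward (the team reference is stored by name/namespace), the fallback ordering matters: filter first, then count, then slice. A minimal sketch with hypothetical names, operating on already-loaded task rows rather than the real query objects:

from typing import Any, Dict, List, Optional, Tuple


def paginate_with_team_filter(
    rows: List[Dict[str, Any]],
    team_id: Optional[int],
    skip: int,
    limit: int,
) -> Tuple[List[Dict[str, Any]], int]:
    """Apply the team_id filter first, then compute total and slice the page."""
    if team_id is not None:
        rows = [r for r in rows if r.get("team_id") == team_id]
    total = len(rows)  # true total across all pages of the filtered set
    page = rows[skip : skip + limit]
    return page, total


# Example: 3 tasks match team_id=7 even though only 2 fit on page 1 (limit=2)
tasks = [{"id": i, "team_id": 7 if i % 2 else 3} for i in range(1, 7)]
page, total = paginate_with_team_filter(tasks, team_id=7, skip=0, limit=2)
print(total, [t["id"] for t in page])  # 3 [1, 3]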

Comment on lines +276 to +288
            # Check if team has external API bots (workflow enabled)
            team_has_workflow = self._check_team_has_external_api_bots(
                db, temp_team, team_data.context_user_id
            )
            team_dict["workflow_enabled"] = team_has_workflow
            team_dict["has_parameters"] = team_has_workflow  # Same as workflow_enabled

            # Apply workflow_enabled filter if specified
            if workflow_enabled is not None:
                if workflow_enabled and not team_has_workflow:
                    continue
                elif not workflow_enabled and team_has_workflow:
                    continue

⚠️ Potential issue | 🟠 Major

Incorrect pagination when workflow_enabled filter is applied.

The workflow_enabled filter is applied after pagination (skip/limit), which causes incorrect results. For example, if skip=0, limit=10 and the first 10 teams have workflow_enabled=False, a filter for workflow_enabled=True would return 0 items even if matching teams exist beyond the first 10.

Consider filtering at the query level or fetching all teams when workflow_enabled is specified:

+        # If workflow_enabled filter is applied, we need to check all teams
+        # and apply pagination after filtering
+        if workflow_enabled is not None:
+            # Remove pagination from initial query
+            final_query = (
+                db.query(
+                    combined_query.c.team_id,
+                    # ... other columns
+                )
+                .order_by(combined_query.c.team_created_at.desc())
+                # No offset/limit here
+            )
+            # ... process all teams, filter, then apply pagination to result

Alternatively, if shell type can be determined via SQL/JSON, push the filter down to the database query.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In backend/app/services/adapters/team_kinds.py around lines 276 to 288, the
workflow_enabled filter is applied after pagination which yields incorrect
results; change the logic so teams are filtered for workflow_enabled before
skip/limit is applied: either add the workflow_enabled predicate to the DB query
(preferred) by pushing the check into SQL/JSON (detect shell/type or external
API bot flag in the query) or, if that’s not possible, fetch the unpaginated
team list, apply the workflow_enabled filter in memory, then apply skip/limit to
the filtered list so pagination reflects the filter.
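
The same filter-before-paginate ordering applies here, with the extra wrinkle that the predicate needs a per-team bot inspection. A small sketch under assumed names (is_workflow_team stands in for the existing _check_team_has_external_api_bots); count_user_teams could reuse the same filtered list instead of the limit=10000 workaround flagged earlier:

from typing import Callable, List, Optional, Tuple


def filter_and_paginate_teams(
    teams: List[dict],
    workflow_enabled: Optional[bool],
    is_workflow_team: Callable[[dict], bool],
    skip: int,
    limit: int,
) -> Tuple[List[dict], int]:
    """Apply the workflow predicate before slicing so page contents and totals agree."""
    if workflow_enabled is not None:
        teams = [t for t in teams if is_workflow_team(t) == workflow_enabled]
    total = len(teams)  # true total for the filtered set, reusable by count_user_teams
    return teams[skip : skip + limit], total


# Example: 3 of 5 teams are workflow-enabled; page 1 with limit=1 still reports total=3
demo = [{"name": f"team-{i}", "workflow": i % 2 == 0} for i in range(5)]
page, total = filter_and_paginate_teams(demo, True, lambda t: t["workflow"], skip=0, limit=1)
print(total, [t["name"] for t in page])  # 3 ['team-0']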
