feat(backend): add SSE streaming API for task execution #228
base: main
Conversation
- Add POST /api/tasks/stream endpoint for creating and streaming task execution
- Add GET /api/tasks/stream/{task_id} endpoint for streaming existing tasks
- Add SSE event types: workflow_started, node_started, node_finished, workflow_finished, error, ping
- Add task streaming service with real-time subtask status tracking
- Add team_id and task_type filters to GET /api/tasks endpoint
- Add workflow_enabled filter to GET /api/teams endpoint
- Add workflow_enabled and has_parameters fields to team list response
- Compatible with Dify Workflow SSE event format for frontend integration
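For context on the Dify-compatible format, an SSE response is a sequence of `data:` lines, each carrying a JSON event and separated by blank lines. The excerpt below is illustrative only; the exact payload fields are assumptions based on the event types listed above, not the PR's verbatim output:

```text
data: {"event": "workflow_started", "task_id": 123, "created_at": "..."}

data: {"event": "node_started", "task_id": 123, "data": {"title": "bot-1", ...}}

data: {"event": "node_finished", "task_id": 123, "data": {"status": "succeeded", ...}}

data: {"event": "workflow_finished", "task_id": 123, "data": {"status": "succeeded", ...}}
```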
Walkthrough

Introduces Server-Sent Events (SSE) streaming capability for task execution. Adds streaming endpoints to accept and monitor task progress, implements an asynchronous task streaming service with database polling, and extends filtering across task and team listing endpoints with optional parameters (team_id, task_type, workflow_enabled).
Sequence Diagram

```mermaid
sequenceDiagram
    actor Client
    participant Endpoint as POST /stream
    participant Service as TaskStreamingService
    participant Database as DB

    Client->>Endpoint: StreamTaskCreate request
    Endpoint->>Endpoint: Create task from request
    Endpoint->>Service: stream_task_execution(task_id, user_id)
    activate Service
    Service->>Database: Validate task ownership
    Database-->>Service: Task found ✓
    Service-->>Client: data: {"event": "workflow_started", ...}

    loop Poll & Detect Changes
        Service->>Database: Fetch task & subtasks
        Database-->>Service: Current state
        alt Subtask enters RUNNING
            Service-->>Client: data: {"event": "node_started", ...}
        end
        alt Subtask reaches COMPLETED/FAILED
            Service-->>Client: data: {"event": "node_finished", ...}
        end
        alt Periodic heartbeat
            Service-->>Client: data: {"event": "ping"}
        end
        Note over Service: Poll interval (~500ms)
    end

    alt Workflow completes
        Service-->>Client: data: {"event": "workflow_finished", ...}
    end
    alt Error or timeout
        Service-->>Client: data: {"event": "error", ...}
    end
    deactivate Service
    Client->>Client: Close stream
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~65 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
Actionable comments posted: 4
🧹 Nitpick comments (12)
backend/app/schemas/sse.py (3)
75-75: Consider using timezone-aware datetime.

`datetime.now()` returns a naive datetime without timezone info. For SSE events that may be consumed by clients in different timezones, consider using `datetime.now(timezone.utc)` or `datetime.utcnow()` for consistency.

```diff
-from datetime import datetime
+from datetime import datetime, timezone
 from enum import Enum
 from typing import Any, Dict, List, Optional

-    created_at: datetime = Field(default_factory=datetime.now)
+    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
```
77-92: Move `json` import to module level.

The `json` import inside `to_sse_format()` is executed on every call. Move it to the module-level imports for better performance and consistency.

```diff
+import json
 from datetime import datetime
 from enum import Enum
 from typing import Any, Dict, List, Optional

     def to_sse_format(self) -> str:
         """Convert event to SSE format string"""
-        import json
-
         payload = {
```
105-105: Remove unused `inputs` field or implement full support.

The `inputs` field is accepted by `StreamTaskCreate` but discarded during conversion to `TaskCreate`: it is never stored or processed. Either remove this unused parameter or implement full support by adding it to `TaskCreate` and the downstream task processing pipeline.

backend/app/services/adapters/team_kinds.py (2)
622-633: Hardcoded limit of 10000 may be insufficient and is inefficient.

Using `limit=10000` as a "large limit to get all" is fragile:

- Users with more than 10000 teams would get incorrect counts
- This loads all team data into memory just to count

Consider extracting this to a constant and adding a comment, or implementing a dedicated count query:

```diff
+# Maximum teams to consider for workflow_enabled filtering
+MAX_TEAMS_FOR_WORKFLOW_FILTER = 10000
+
         if workflow_enabled is not None:
-            # Get all teams and filter
             all_teams = self.get_user_teams(
                 db=db,
                 user_id=user_id,
                 skip=0,
-                limit=10000,  # Large limit to get all
+                limit=MAX_TEAMS_FOR_WORKFLOW_FILTER,
                 workflow_enabled=workflow_enabled,
             )
             return len(all_teams)
```
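As a rough illustration of the "dedicated count query" alternative, counting at the database level avoids loading rows at all. This is a sketch only: it assumes teams are stored as `Kind` rows with `kind == "Team"`, and that the `workflow_enabled` predicate could eventually be pushed into SQL (in this PR it is evaluated in Python):

```python
from sqlalchemy import func
from sqlalchemy.orm import Session

from app.models.kind import Kind


def count_active_teams(db: Session, user_id: int) -> int:
    """Count a user's active Team kinds without materializing them."""
    return (
        db.query(func.count(Kind.id))
        .filter(
            Kind.user_id == user_id,
            Kind.kind == "Team",
            Kind.is_active.is_(True),
        )
        .scalar()
        or 0
    )
```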
677-687: Use `is_(True)` for SQLAlchemy boolean comparisons.

Per static analysis and SQLAlchemy best practices, use `Kind.is_active.is_(True)` instead of `Kind.is_active == True` to avoid potential issues with Python's identity comparison.

```diff
 bot = (
     db.query(Kind)
     .filter(
         Kind.user_id == user_id,
         Kind.kind == "Bot",
         Kind.name == member.botRef.name,
         Kind.namespace == member.botRef.namespace,
-        Kind.is_active == True,
+        Kind.is_active.is_(True),
     )
     .first()
 )
```

backend/app/services/task_streaming.py (6)
17-17: Unused import: `text` is not used in this module.

The `text` function from SQLAlchemy is imported but never used anywhere in the file.

```diff
-from sqlalchemy import text
-from sqlalchemy.orm import Session
+from sqlalchemy.orm import Session
```
103-116: Blocking database operations in async context.

Using synchronous `db.query()` and `db.expire_all()` calls within an async generator will block the event loop during database I/O. For high concurrency, consider wrapping these in `asyncio.to_thread()` or using an async database driver.

Additionally, per Ruff E712: prefer `Kind.is_active` over `Kind.is_active == True` for boolean checks in SQLAlchemy filters.

```diff
 # Refresh database session to get latest data
 db.expire_all()

 # Get current task status
 task = (
     db.query(Kind)
     .filter(
         Kind.id == task_id,
         Kind.user_id == user_id,
         Kind.kind == "Task",
-        Kind.is_active == True,
+        Kind.is_active,
     )
     .first()
 )
```
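If the service keeps a synchronous session, one way to apply the `asyncio.to_thread()` suggestion is to move each poll's query into a small synchronous helper and await it from the generator. A sketch under that assumption, not the PR's code:

```python
import asyncio
from typing import AsyncIterator, Optional

from sqlalchemy.orm import Session

from app.models.kind import Kind


def _fetch_task_sync(db: Session, task_id: int, user_id: int) -> Optional[Kind]:
    """Blocking query, intended to run on a worker thread."""
    db.expire_all()  # drop cached state so the next read sees fresh rows
    return (
        db.query(Kind)
        .filter(
            Kind.id == task_id,
            Kind.user_id == user_id,
            Kind.kind == "Task",
            Kind.is_active,
        )
        .first()
    )


async def poll_task(db: Session, task_id: int, user_id: int) -> AsyncIterator[Optional[Kind]]:
    """Yield the current task row once per second without blocking the event loop."""
    while True:
        yield await asyncio.to_thread(_fetch_task_sync, db, task_id, user_id)
        await asyncio.sleep(1.0)
```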
159-167: Apply consistent boolean comparison style.

Same as above, use `Kind.is_active` for the boolean filter.

```diff
 bot = (
     db.query(Kind)
     .filter(
         Kind.id == subtask.bot_ids[0],
         Kind.kind == "Bot",
-        Kind.is_active == True,
+        Kind.is_active,
     )
     .first()
 )
```
256-256: Remove unused variable `last_task_status`.

This variable is assigned but never read, as flagged by Ruff F841.

```diff
-            last_task_status = current_task_status
-
             # Send periodic ping to keep connection alive
```
257-267: Ping interval is tightly coupled to POLL_INTERVAL.

The comment states "Every 30 seconds" but this relies on `POLL_INTERVAL = 1.0`. If `POLL_INTERVAL` changes, the ping frequency will silently change. Consider computing the interval dynamically or extracting a constant.

```diff
+# Ping interval in seconds
+PING_INTERVAL = 30
+
 # Polling interval in seconds
 POLL_INTERVAL = 1.0
```

Then update the check:

```diff
-            if iteration % 30 == 0:  # Every 30 seconds
+            if iteration % int(PING_INTERVAL / POLL_INTERVAL) == 0:
```
269-276: Use explicit conversion in f-strings.

Per Ruff RUF010, prefer `{e!s}` over `str(e)` for cleaner f-string formatting.

```diff
 except Exception as e:
-    logger.error(f"Error streaming task {task_id}: {str(e)}", exc_info=True)
+    logger.error(f"Error streaming task {task_id}: {e!s}", exc_info=True)
     error_event = SSEEvent(
         event=SSEEventType.ERROR,
         task_id=task_id,
-        message=f"Streaming error: {str(e)}",
+        message=f"Streaming error: {e!s}",
     )
```

backend/tests/schemas/test_sse.py (1)
8-21: Import order should follow PEP 8 conventions.

Standard library imports (`datetime`) should come before third-party imports (`pytest`). As per coding guidelines, use isort for import ordering.

```diff
-import pytest
 from datetime import datetime

+import pytest
+
 from app.schemas.sse import (
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- backend/app/api/endpoints/adapter/tasks.py (4 hunks)
- backend/app/api/endpoints/adapter/teams.py (2 hunks)
- backend/app/schemas/sse.py (1 hunks)
- backend/app/services/adapters/task_kinds.py (2 hunks)
- backend/app/services/adapters/team_kinds.py (4 hunks)
- backend/app/services/task_streaming.py (1 hunks)
- backend/tests/schemas/test_sse.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.{py,js,ts,tsx}
📄 CodeRabbit inference engine (AGENTS.md)
All code comments MUST be written in English, including inline comments, block comments, docstrings, TODO/FIXME annotations, and type hints descriptions
Files:
- backend/app/services/task_streaming.py
- backend/app/schemas/sse.py
- backend/tests/schemas/test_sse.py
- backend/app/services/adapters/team_kinds.py
- backend/app/api/endpoints/adapter/tasks.py
- backend/app/services/adapters/task_kinds.py
- backend/app/api/endpoints/adapter/teams.py
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Python code must be PEP 8 compliant, formatted with Black (line length: 88), and use isort for imports
Python code must use type hints
Python functions should have descriptive names, include docstrings for public functions/classes, extract magic numbers to constants, and keep functions to a maximum of 50 lines preferred
Files:
- backend/app/services/task_streaming.py
- backend/app/schemas/sse.py
- backend/tests/schemas/test_sse.py
- backend/app/services/adapters/team_kinds.py
- backend/app/api/endpoints/adapter/tasks.py
- backend/app/services/adapters/task_kinds.py
- backend/app/api/endpoints/adapter/teams.py
🧬 Code graph analysis (6)
backend/app/services/task_streaming.py (4)
backend/app/models/kind.py (1)
- `Kind` (25-42)

backend/app/models/subtask.py (1)
- `Subtask` (31-64)

backend/app/schemas/sse.py (8)
- `NodeFinishedData` (47-54), `NodeStartedData` (37-44), `SSEEvent` (68-92), `SSEEventType` (18-26), `StreamTaskCreate` (95-116), `WorkflowFinishedData` (57-65), `WorkflowStartedData` (29-34), `to_sse_format` (77-92)

backend/app/schemas/task.py (1)
- `TaskCreate` (45-64)

backend/tests/schemas/test_sse.py (1)

backend/app/schemas/sse.py (10)
- `SSEEvent` (68-92), `SSEEventType` (18-26), `WorkflowStartedData` (29-34), `NodeStartedData` (37-44), `NodeFinishedData` (47-54), `WorkflowFinishedData` (57-65), `StreamTaskCreate` (95-116), `TeamParameter` (119-128), `TeamParametersResponse` (131-136), `to_sse_format` (77-92)

backend/app/services/adapters/team_kinds.py (3)

backend/app/models/kind.py (1)
- `Kind` (25-42)

backend/app/schemas/kind.py (2)
- `Team` (217-224), `Bot` (170-177)

backend/app/services/adapters/shell_utils.py (1)
- `get_shell_type` (17-55)

backend/app/api/endpoints/adapter/tasks.py (6)

backend/app/api/dependencies.py (1)
- `get_db` (12-21)

backend/app/services/kind_base.py (1)
- `get_db` (30-32)

backend/app/schemas/sse.py (1)
- `StreamTaskCreate` (95-116)

backend/app/schemas/task.py (1)
- `TaskCreate` (45-64)

backend/app/services/adapters/task_kinds.py (1)
- `create_task_or_append` (35-254)

backend/app/services/task_streaming.py (1)
- `stream_task_execution` (48-278)

backend/app/services/adapters/task_kinds.py (1)

backend/app/models/kind.py (1)
- `Kind` (25-42)

backend/app/api/endpoints/adapter/teams.py (1)

backend/app/services/adapters/team_kinds.py (2)
- `get_user_teams` (160-313), `count_user_teams` (607-657)
🪛 Ruff (0.14.6)
backend/app/services/task_streaming.py
113-113: Avoid equality comparisons to `True`; use `Kind.is_active:` for truth checks
Replace with Kind.is_active
(E712)
164-164: Avoid equality comparisons to `True`; use `Kind.is_active:` for truth checks
Replace with Kind.is_active
(E712)
256-256: Local variable last_task_status is assigned to but never used
Remove assignment to unused variable last_task_status
(F841)
270-270: Use explicit conversion flag
Replace with conversion flag
(RUF010)
274-274: Use explicit conversion flag
Replace with conversion flag
(RUF010)
backend/app/services/adapters/team_kinds.py
684-684: Avoid equality comparisons to `True`; use `Kind.is_active:` for truth checks
Replace with Kind.is_active
(E712)
backend/app/api/endpoints/adapter/tasks.py
77-77: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
78-78: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
139-139: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
140-140: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
190-190: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
191-191: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
backend/app/services/adapters/task_kinds.py
282-282: Avoid equality comparisons to `True`; use `Kind.is_active:` for truth checks
Replace with Kind.is_active
(E712)
backend/app/api/endpoints/adapter/teams.py
40-40: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
41-41: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: e2e-tests
🔇 Additional comments (17)
backend/app/schemas/sse.py (1)
119-145: LGTM! The `TeamParameter`, `TeamParametersResponse`, and `TeamWithWorkflow` models are well-structured with appropriate type hints and default values.

backend/app/services/adapters/team_kinds.py (1)

160-179: LGTM! The updated signature and docstring properly document the new `workflow_enabled` parameter.

backend/app/api/endpoints/adapter/teams.py (1)

33-62: LGTM! The `workflow_enabled` query parameter is properly added and threaded through to both `get_user_teams` and `count_user_teams` service calls. The docstring update accurately describes the filtering behavior.

Note: The static analysis hints about `Depends()` in argument defaults are false positives; this is the standard FastAPI dependency injection pattern.

backend/app/api/endpoints/adapter/tasks.py (1)

184-206: LGTM! The `team_id` and `task_type` query parameters are properly added and passed through to the service layer. The docstring is appropriately updated.

backend/app/services/adapters/task_kinds.py (2)

286-293: LGTM on task_type SQL filter. Good implementation using `JSON_EXTRACT` with `bindparams()` for safe parameterization. This filter is correctly applied at the database level before pagination.

256-277: LGTM! The updated function signature and docstring properly document the new `team_id` and `task_type` parameters.

backend/app/services/task_streaming.py (3)

42-46: LGTM! The class initialization is straightforward. The `active_streams` dictionary provides a simple mechanism to track and control active streaming sessions.

280-282: LGTM! The `stop_stream` method correctly signals the streaming loop to terminate by setting the flag to `False`.

285-286: LGTM! The global service instance pattern is appropriate here for sharing state across request handlers.
backend/tests/schemas/test_sse.py (8)
24-34: LGTM! Comprehensive coverage of all SSE event type enum values.

37-51: LGTM! Test properly validates the `WorkflowStartedData` schema construction and field values.

54-83: LGTM! Good test coverage including both full field specification and optional field defaults.

86-114: LGTM! Tests cover both success and failure scenarios with appropriate field assertions.

117-134: LGTM! Test validates the `WorkflowFinishedData` schema with all fields populated. Consider adding tests for `failed` and `cancelled` statuses in future iterations.

137-178: LGTM! Thorough tests covering SSE event creation, format serialization, and error message handling.

181-220: LGTM! Comprehensive tests for both minimal and full field specifications with proper default value validation.

223-279: LGTM! Good coverage of `TeamParameter` with different types (string, select) and `TeamParametersResponse` with both empty and populated states.
```python
@router.post("/stream")
async def create_and_stream_task(
    stream_request: StreamTaskCreate,
    current_user: User = Depends(security.get_current_user),
    db: Session = Depends(get_db),
):
    """
    Create and execute a task with SSE streaming response.

    Returns a Server-Sent Events stream with real-time task execution progress.
    Compatible with Dify Workflow SSE event format.

    Event types:
    - workflow_started: Task execution started
    - node_started: Bot/Subtask started execution
    - node_finished: Bot/Subtask completed
    - workflow_finished: Task execution completed
    - error: Execution error occurred
    - ping: Keep-alive ping
    """
    # Convert StreamTaskCreate to TaskCreate
    task_create = TaskCreate(
        team_id=stream_request.team_id,
        team_name=stream_request.team_name,
        team_namespace=stream_request.team_namespace,
        prompt=stream_request.prompt,
        title=stream_request.title,
        type=stream_request.type,
        task_type=stream_request.task_type,
        model_id=stream_request.model_id,
        force_override_bot_model=stream_request.force_override_bot_model,
        git_url=stream_request.git_url,
        git_repo=stream_request.git_repo,
        git_repo_id=stream_request.git_repo_id,
        git_domain=stream_request.git_domain,
        branch_name=stream_request.branch_name,
    )

    # Create the task
    task_result = task_kinds_service.create_task_or_append(
        db=db, obj_in=task_create, user=current_user, task_id=None
    )
    task_id = task_result["id"]

    logger.info(f"Created streaming task {task_id} for user {current_user.id}")

    # Return SSE streaming response
    return StreamingResponse(
        task_streaming_service.stream_task_execution(
            db=db,
            task_id=task_id,
            user_id=current_user.id,
        ),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )
```
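To sanity-check the endpoint from a script, a streaming HTTP client can read the SSE lines directly. This sketch assumes `httpx` (not part of this PR), a local server at `http://localhost:8000`, bearer-token auth, and illustrative request fields:

```python
import json

import httpx

payload = {
    "team_id": 1,  # illustrative values; real fields follow StreamTaskCreate
    "prompt": "Summarize the open issues",
    "title": "Demo streaming task",
}
headers = {"Authorization": "Bearer <token>"}  # auth scheme assumed

with httpx.Client(timeout=None) as client:
    with client.stream(
        "POST",
        "http://localhost:8000/api/tasks/stream",
        json=payload,
        headers=headers,
    ) as response:
        for line in response.iter_lines():
            if not line.startswith("data: "):
                continue  # skip blank separators between events
            event = json.loads(line[len("data: "):])
            print(event.get("event"))
            if event.get("event") in ("workflow_finished", "error"):
                break  # stream is done once the workflow finishes or errors
```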
Potential database session lifecycle issue in streaming endpoint.
The db session from Depends(get_db) is passed to task_streaming_service.stream_task_execution(), which is an async generator. FastAPI's dependency system will close the session when the request completes, but StreamingResponse continues yielding after the endpoint returns.
This could cause DetachedInstanceError or session-closed errors during streaming.
Verify that task_streaming_service.stream_task_execution creates its own database sessions internally for each poll iteration, rather than relying on the passed session:
```bash
#!/bin/bash
# Check how the streaming service handles database sessions
rg -n -A 20 "async def stream_task_execution" backend/app/services/task_streaming.py

# Check if it creates new sessions internally
rg -n "SessionLocal\|get_db" backend/app/services/task_streaming.py
```

🧰 Tools
🪛 Ruff (0.14.6)
77-77: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
78-78: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
🤖 Prompt for AI Agents
In backend/app/api/endpoints/adapter/tasks.py around lines 74-133, the endpoint
passes the FastAPI dependency DB session (db) into an async generator
(stream_task_execution) which will keep yielding after the request handler
returns, risking use-after-close errors; change this so the streaming service
does not rely on the dependency-scoped session: stop passing the dependency
session into stream_task_execution (pass task_id and user_id only or pass a
session factory), and update task_streaming_service.stream_task_execution to
open and close its own DB sessions (or call get_db / SessionLocal inside each
poll/iteration) so every DB access uses a fresh session that is closed properly.
Ensure the endpoint signature and call are updated accordingly and remove any
reliance on the passed-in dependency session inside the generator.
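A minimal sketch of the fix the prompt describes, assuming the project exposes a `SessionLocal` factory (the import path below is hypothetical) and that only `task_id`/`user_id` are passed into the generator:

```python
import asyncio
from typing import AsyncIterator

from app.db.session import SessionLocal  # hypothetical import path

POLL_INTERVAL = 1.0  # seconds between polls


async def stream_task_execution(task_id: int, user_id: int) -> AsyncIterator[str]:
    """Poll with a fresh session per iteration so streaming does not depend on
    the request-scoped session that FastAPI closes when the handler returns."""
    while True:
        db = SessionLocal()  # new session for this iteration
        try:
            # ... query task and subtasks here and build the real events ...
            yield 'data: {"event": "ping"}\n\n'  # placeholder event
        finally:
            db.close()  # always return the connection to the pool
        await asyncio.sleep(POLL_INTERVAL)
```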
```python
@router.get("/stream/{task_id}")
async def stream_existing_task(
    task_id: int,
    current_user: User = Depends(security.get_current_user),
    db: Session = Depends(get_db),
):
    """
    Stream execution progress for an existing task.

    Useful for reconnecting to a task stream after disconnection.
    """
    # Verify task exists and belongs to user
    task = task_kinds_service.get_task_by_id(
        db=db, task_id=task_id, user_id=current_user.id
    )
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    logger.info(f"Streaming existing task {task_id} for user {current_user.id}")

    return StreamingResponse(
        task_streaming_service.stream_task_execution(
            db=db,
            task_id=task_id,
            user_id=current_user.id,
        ),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
```
Same database session lifecycle concern.
This endpoint has the same potential issue with the db session being passed to the async generator. The initial task verification (lines 148-152) will work since it runs before the response, but subsequent database access during streaming may fail.
See previous comment about verifying session handling in task_streaming_service.
🧰 Tools
🪛 Ruff (0.14.6)
139-139: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
140-140: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
```python
            # Filter by team_id if provided
            if team_id is not None:
                task_team_id = task_related_data.get("team_id")
                if task_team_id != team_id:
                    continue

            result.append(
                self._convert_to_task_dict_optimized(task, task_related_data, task_crd)
            )

        # Adjust total count if filtered by team_id (need to recount)
        if team_id is not None:
            total = len(result)
```
Incorrect pagination when team_id filter is applied.
The team_id filter is applied after `offset(skip).limit(limit)`, resulting in:

- Potentially fewer items than `limit` returned even when more matching tasks exist
- Incorrect `total` count (line 322 returns `len(result)`, which is the filtered subset, not the true total)
For example: if page 1 has 10 tasks but only 2 match team_id, the response would incorrectly show total: 2 instead of the actual count across all pages.
Consider applying the team_id filter at the SQL level:
```diff
 # Build base filters
 filters = [
     Kind.user_id == user_id,
     Kind.kind == "Task",
     Kind.is_active == True,
     text("JSON_EXTRACT(json, '$.status.status') != 'DELETE'"),
 ]

 # Add task_type filter if provided
 if task_type:
     filters.append(
         text(
             "JSON_EXTRACT(json, '$.metadata.labels.taskType') = :task_type"
         ).bindparams(task_type=task_type)
     )

+# Note: team_id filter requires joining with teams table or subquery
+# since team reference is stored by name/namespace, not ID
```

Alternatively, if filtering by team_id at the SQL level is complex, fetch without a limit when team_id is provided (similar to the workflow_enabled approach).
+ # since team reference is stored by name/namespace, not IDAlternatively, if filtering by team_id at SQL level is complex, fetch without limit when team_id is provided (similar to the workflow_enabled approach).
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In backend/app/services/adapters/task_kinds.py around lines 309 to 323, the
team_id filter is applied after offset/limit which yields incorrect paging and
total; modify the data retrieval so team_id is applied at the DB level (add the
team_id filter to the query/where clause and use a proper count() query to
populate total) or, if adding the SQL filter is impractical, fetch all matching
rows when team_id is provided (no offset/limit), apply the team_id filter in
Python, compute total as the count of all filtered rows, then apply offset/limit
to build the paged result; ensure total always represents the full number of
matching tasks (use the DB count when possible).
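One way to realize the "fetch all, filter, then paginate" fallback described in the prompt above is sketched below; the function name and row shape are illustrative stand-ins for the service's existing task-row handling, not the PR's actual code:

```python
from typing import Any, Dict, List, Optional, Tuple


def paginate_after_team_filter(
    rows: List[Dict[str, Any]], team_id: Optional[int], skip: int, limit: int
) -> Tuple[List[Dict[str, Any]], int]:
    """Filter first, count second, slice last.

    `rows` stands in for the fully loaded task rows (each carrying the
    related-data dict the service already builds per task).
    """
    if team_id is not None:
        rows = [row for row in rows if row.get("team_id") == team_id]
    total = len(rows)  # true total across all pages, after filtering
    return rows[skip : skip + limit], total  # paginate only after filtering
```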
```python
            # Check if team has external API bots (workflow enabled)
            team_has_workflow = self._check_team_has_external_api_bots(
                db, temp_team, team_data.context_user_id
            )
            team_dict["workflow_enabled"] = team_has_workflow
            team_dict["has_parameters"] = team_has_workflow  # Same as workflow_enabled

            # Apply workflow_enabled filter if specified
            if workflow_enabled is not None:
                if workflow_enabled and not team_has_workflow:
                    continue
                elif not workflow_enabled and team_has_workflow:
                    continue
```
Incorrect pagination when workflow_enabled filter is applied.
The workflow_enabled filter is applied after pagination (skip/limit), which causes incorrect results. For example, if skip=0, limit=10 and the first 10 teams have workflow_enabled=False, a filter for workflow_enabled=True would return 0 items even if matching teams exist beyond the first 10.
Consider filtering at the query level or fetching all teams when workflow_enabled is specified:
```diff
+    # If workflow_enabled filter is applied, we need to check all teams
+    # and apply pagination after filtering
+    if workflow_enabled is not None:
+        # Remove pagination from initial query
+        final_query = (
+            db.query(
+                combined_query.c.team_id,
+                # ... other columns
+            )
+            .order_by(combined_query.c.team_created_at.desc())
+            # No offset/limit here
+        )
+        # ... process all teams, filter, then apply pagination to result
```

Alternatively, if shell type can be determined via SQL/JSON, push the filter down to the database query.
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In backend/app/services/adapters/team_kinds.py around lines 276 to 288, the
workflow_enabled filter is applied after pagination which yields incorrect
results; change the logic so teams are filtered for workflow_enabled before
skip/limit is applied: either add the workflow_enabled predicate to the DB query
(preferred) by pushing the check into SQL/JSON (detect shell/type or external
API bot flag in the query) or, if that’s not possible, fetch the unpaginated
team list, apply the workflow_enabled filter in memory, then apply skip/limit to
the filtered list so pagination reflects the filter.
Summary

- `POST /api/tasks/stream` endpoint for creating and streaming task execution with SSE
- `GET /api/tasks/stream/{task_id}` endpoint for streaming existing tasks
- SSE event types: `workflow_started`, `node_started`, `node_finished`, `workflow_finished`, `error`, `ping`
- `team_id` and `task_type` filters on the `GET /api/tasks` endpoint
- `workflow_enabled` filter on the `GET /api/teams` endpoint
- `workflow_enabled` and `has_parameters` fields in the team list response

Test plan
Summary by CodeRabbit
Release Notes
New Features
Tests