Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
c7a944a
chore: update submodules
andhreljaKern Oct 24, 2025
f63a8ba
perf(alembic): add etl task table
andhreljaKern Oct 24, 2025
7b5fcbe
chore: update submodules
andhreljaKern Oct 24, 2025
4f57597
perf(alembic): update etl task table
andhreljaKern Oct 24, 2025
aa9f193
chore: update submodules
andhreljaKern Oct 24, 2025
c939fac
perf(alembic): add org_id column
andhreljaKern Oct 24, 2025
4d25841
chore: update submodules
andhreljaKern Oct 24, 2025
217832c
perf(alembic): update etl task table
andhreljaKern Oct 24, 2025
c218594
chore: update submodules
andhreljaKern Oct 26, 2025
cb64bf8
perf: add file_size_bytes to etl_task
andhreljaKern Oct 26, 2025
bd1b633
chore: update submodules
andhreljaKern Oct 27, 2025
92e89cd
perf: add split_config
andhreljaKern Oct 27, 2025
5b18a3c
chore: update submodules
andhreljaKern Oct 27, 2025
97b01bc
perf(etl): fkey alignment
andhreljaKern Oct 27, 2025
7a40c81
chore: update submodules
andhreljaKern Oct 28, 2025
79b05b1
perf: task cancellation
andhreljaKern Oct 28, 2025
2b4ad4e
chore: update submodules
andhreljaKern Oct 29, 2025
6264d28
fix: update submodules merge conflict
andhreljaKern Oct 30, 2025
1aff7a8
perf: add cache_config
andhreljaKern Oct 30, 2025
e3a19db
perf: align /notify to etl provider
andhreljaKern Oct 30, 2025
94af481
chore: update submodules
andhreljaKern Oct 30, 2025
fb71eb2
chore: update submodules
andhreljaKern Oct 30, 2025
0b7c896
perf: update minio_upload for execute_etl
andhreljaKern Oct 30, 2025
5e0aafe
fix: markdown_file update after etl_task creation
andhreljaKern Oct 30, 2025
709d22d
chore: merge dev
andhreljaKern Oct 30, 2025
b07fc7d
perf: add etl task table
andhreljaKern Oct 30, 2025
41c9573
perf: disable CLEANSE as default
andhreljaKern Oct 30, 2025
2554a43
chore: update submodules
andhreljaKern Oct 30, 2025
52f715e
perf: standard cache config keys
andhreljaKern Oct 30, 2025
e6ec935
Merge remote-tracking branch 'origin/dev' into cognition-etl-provider
JWittmeyer Nov 3, 2025
356f38e
Merge with dev
JWittmeyer Nov 3, 2025
2770d25
Alembic new table & submodule fix import
JWittmeyer Nov 3, 2025
49b20d4
Merge with dev
JWittmeyer Nov 12, 2025
b70404e
tmp commit
JWittmeyer Nov 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions alembic/versions/9d5fb67e29f7_config_sets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Config sets'


Revision ID: 9d5fb67e29f7
Revises: f428a22ecdb3
Create Date: 2025-11-03 15:28:47.686657

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = '9d5fb67e29f7'
down_revision = 'f428a22ecdb3'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('etl_config_preset',
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('organization_id', postgresql.UUID(as_uuid=True), nullable=True),
sa.Column('project_id', postgresql.UUID(as_uuid=True), nullable=True),
sa.Column('name', sa.String(), nullable=True),
sa.Column('description', sa.String(), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('created_by', postgresql.UUID(as_uuid=True), nullable=True),
sa.Column('etl_config', sa.JSON(), nullable=True),
sa.Column('add_config', sa.JSON(), nullable=True),
sa.ForeignKeyConstraint(['created_by'], ['user.id'], ondelete='SET NULL'),
sa.ForeignKeyConstraint(['organization_id'], ['organization.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['project_id'], ['cognition.project.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name'),
schema='cognition'
)
op.create_index(op.f('ix_cognition_etl_config_preset_created_by'), 'etl_config_preset', ['created_by'], unique=False, schema='cognition')
op.create_index(op.f('ix_cognition_etl_config_preset_organization_id'), 'etl_config_preset', ['organization_id'], unique=False, schema='cognition')
op.create_index(op.f('ix_cognition_etl_config_preset_project_id'), 'etl_config_preset', ['project_id'], unique=False, schema='cognition')
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_cognition_etl_config_preset_project_id'), table_name='etl_config_preset', schema='cognition')
op.drop_index(op.f('ix_cognition_etl_config_preset_organization_id'), table_name='etl_config_preset', schema='cognition')
op.drop_index(op.f('ix_cognition_etl_config_preset_created_by'), table_name='etl_config_preset', schema='cognition')
op.drop_table('etl_config_preset', schema='cognition')
# ### end Alembic commands ###
318 changes: 318 additions & 0 deletions alembic/versions/f428a22ecdb3_adds_etl_task_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,318 @@
"""adds etl task table

Revision ID: f428a22ecdb3
Revises: 85bb3ebee137
Create Date: 2025-10-30 10:45:20.843280

"""

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "f428a22ecdb3"
down_revision = "85bb3ebee137"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"etl_task",
sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False),
sa.Column("organization_id", postgresql.UUID(as_uuid=True), nullable=True),
sa.Column("created_at", sa.DateTime(), nullable=True),
sa.Column("created_by", postgresql.UUID(as_uuid=True), nullable=True),
sa.Column("file_path", sa.String(), nullable=True),
sa.Column("file_size_bytes", sa.BigInteger(), nullable=True),
sa.Column("tokenizer", sa.String(), nullable=True),
sa.Column("full_config", sa.JSON(), nullable=True),
sa.Column("started_at", sa.DateTime(), nullable=True),
sa.Column("finished_at", sa.DateTime(), nullable=True),
sa.Column("state", sa.String(), nullable=True),
sa.Column("is_active", sa.Boolean(), nullable=True),
sa.Column("priority", sa.Integer(), nullable=True),
sa.Column("error_message", sa.String(), nullable=True),
sa.ForeignKeyConstraint(["created_by"], ["user.id"], ondelete="SET NULL"),
sa.ForeignKeyConstraint(
["organization_id"], ["organization.id"], ondelete="CASCADE"
),
sa.PrimaryKeyConstraint("id"),
schema="global",
)
op.create_index(
op.f("ix_global_etl_task_created_by"),
"etl_task",
["created_by"],
unique=False,
schema="global",
)
op.create_index(
op.f("ix_global_etl_task_organization_id"),
"etl_task",
["organization_id"],
unique=False,
schema="global",
)
op.add_column(
"markdown_file",
sa.Column("etl_task_id", postgresql.UUID(as_uuid=True), nullable=True),
schema="cognition",
)
op.create_index(
op.f("ix_cognition_markdown_file_etl_task_id"),
"markdown_file",
["etl_task_id"],
unique=False,
schema="cognition",
)
op.create_unique_constraint(
"unique_markdown_file_etl_task_id",
"markdown_file",
["id", "etl_task_id"],
schema="cognition",
)
op.create_foreign_key(
None,
"markdown_file",
"etl_task",
["etl_task_id"],
["id"],
source_schema="cognition",
referent_schema="global",
ondelete="CASCADE",
)
op.add_column(
"github_file",
sa.Column("etl_task_id", postgresql.UUID(as_uuid=True), nullable=True),
schema="integration",
)
op.drop_constraint(
"unique_github_file_source", "github_file", schema="integration", type_="unique"
)
op.create_unique_constraint(
"unique_github_file_source",
"github_file",
["integration_id", "running_id", "source", "etl_task_id"],
schema="integration",
)
op.create_index(
op.f("ix_integration_github_file_etl_task_id"),
"github_file",
["etl_task_id"],
unique=False,
schema="integration",
)
op.create_foreign_key(
None,
"github_file",
"etl_task",
["etl_task_id"],
["id"],
source_schema="integration",
referent_schema="global",
ondelete="CASCADE",
)
op.add_column(
"github_issue",
sa.Column("etl_task_id", postgresql.UUID(as_uuid=True), nullable=True),
schema="integration",
)
op.drop_constraint(
"unique_github_issue_source",
"github_issue",
schema="integration",
type_="unique",
)
op.create_unique_constraint(
"unique_github_issue_source",
"github_issue",
["integration_id", "running_id", "source", "etl_task_id"],
schema="integration",
)
op.create_index(
op.f("ix_integration_github_issue_etl_task_id"),
"github_issue",
["etl_task_id"],
unique=False,
schema="integration",
)
op.create_foreign_key(
None,
"github_issue",
"etl_task",
["etl_task_id"],
["id"],
source_schema="integration",
referent_schema="global",
ondelete="CASCADE",
)
op.add_column(
"pdf",
sa.Column("etl_task_id", postgresql.UUID(as_uuid=True), nullable=True),
schema="integration",
)
op.drop_constraint("unique_pdf_source", "pdf", schema="integration", type_="unique")
op.create_unique_constraint(
"unique_pdf_source",
"pdf",
["integration_id", "running_id", "source", "etl_task_id"],
schema="integration",
)
op.create_index(
op.f("ix_integration_pdf_etl_task_id"),
"pdf",
["etl_task_id"],
unique=False,
schema="integration",
)
op.create_foreign_key(
None,
"pdf",
"etl_task",
["etl_task_id"],
["id"],
source_schema="integration",
referent_schema="global",
ondelete="CASCADE",
)
op.add_column(
"sharepoint",
sa.Column("etl_task_id", postgresql.UUID(as_uuid=True), nullable=True),
schema="integration",
)
op.drop_constraint(
"unique_sharepoint_source", "sharepoint", schema="integration", type_="unique"
)
op.create_unique_constraint(
"unique_sharepoint_source",
"sharepoint",
["integration_id", "running_id", "source", "etl_task_id"],
schema="integration",
)
op.create_index(
op.f("ix_integration_sharepoint_etl_task_id"),
"sharepoint",
["etl_task_id"],
unique=False,
schema="integration",
)
op.create_foreign_key(
None,
"sharepoint",
"etl_task",
["etl_task_id"],
["id"],
source_schema="integration",
referent_schema="global",
ondelete="CASCADE",
)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
"sharepoint_etl_task_id_fkey",
"sharepoint",
schema="integration",
type_="foreignkey",
)
op.drop_index(
op.f("ix_integration_sharepoint_etl_task_id"),
table_name="sharepoint",
schema="integration",
)
op.drop_constraint(
"unique_sharepoint_source", "sharepoint", schema="integration", type_="unique"
)
op.create_unique_constraint(
"unique_sharepoint_source",
"sharepoint",
["integration_id", "running_id", "source"],
schema="integration",
)
op.drop_column("sharepoint", "etl_task_id", schema="integration")
op.drop_constraint(
"pdf_etl_task_id_fkey", "pdf", schema="integration", type_="foreignkey"
)
op.drop_index(
op.f("ix_integration_pdf_etl_task_id"), table_name="pdf", schema="integration"
)
op.drop_constraint("unique_pdf_source", "pdf", schema="integration", type_="unique")
op.create_unique_constraint(
"unique_pdf_source",
"pdf",
["integration_id", "running_id", "source"],
schema="integration",
)
op.drop_column("pdf", "etl_task_id", schema="integration")
op.drop_constraint(
"github_issue_etl_task_id_fkey",
"github_issue",
schema="integration",
type_="foreignkey",
)
op.drop_index(
op.f("ix_integration_github_issue_etl_task_id"),
table_name="github_issue",
schema="integration",
)
op.drop_constraint(
"unique_github_issue_source",
"github_issue",
schema="integration",
type_="unique",
)
op.create_unique_constraint(
"unique_github_issue_source",
"github_issue",
["integration_id", "running_id", "source"],
schema="integration",
)
op.drop_column("github_issue", "etl_task_id", schema="integration")
op.drop_constraint(
"github_file_etl_task_id_fkey",
"github_file",
schema="integration",
type_="foreignkey",
)
op.drop_index(
op.f("ix_integration_github_file_etl_task_id"),
table_name="github_file",
schema="integration",
)
op.drop_constraint(
"unique_github_file_source", "github_file", schema="integration", type_="unique"
)
op.create_unique_constraint(
"unique_github_file_source",
"github_file",
["integration_id", "running_id", "source"],
schema="integration",
)
op.drop_column("github_file", "etl_task_id", schema="integration")
op.drop_constraint(
"markdown_file_etl_task_id_fkey",
"markdown_file",
schema="cognition",
type_="foreignkey",
)
op.drop_index(
op.f("ix_cognition_markdown_file_etl_task_id"),
table_name="markdown_file",
schema="cognition",
)
op.drop_column("markdown_file", "etl_task_id", schema="cognition")
op.drop_index(
op.f("ix_global_etl_task_organization_id"),
table_name="etl_task",
schema="global",
)
op.drop_index(
op.f("ix_global_etl_task_created_by"), table_name="etl_task", schema="global"
)
op.drop_table("etl_task", schema="global")
# ### end Alembic commands ###
11 changes: 11 additions & 0 deletions controller/monitor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,14 @@ def cancel_integration_task(
task_monitor.set_integration_task_to_failed(
integration_id, error_message="Cancelled by task manager"
)


def cancel_etl_task(
task_info: Dict[str, Any],
) -> None:

etl_task_id = task_info.get("etlTaskId")

task_monitor.set_etl_task_to_failed(
etl_task_id, error_message="Cancelled by task manager"
)
Loading