From 3d4317aaea9cfd2d8f241e87de67cc39fe98f55a Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Fri, 24 Oct 2025 12:51:11 +0200 Subject: [PATCH 01/36] perf: add etl task table --- enums.py | 6 ++++++ models.py | 45 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/enums.py b/enums.py index 079a6bc..30bb7d0 100644 --- a/enums.py +++ b/enums.py @@ -177,6 +177,7 @@ class Tablenames(Enum): SUMS_TABLE = "sums_table" ADMIN_QUERY_MESSAGE_SUMMARY = "admin_query_message_summary" RELEASE_NOTIFICATION = "release_notification" + ETL_TASK = "etl_task" def snake_case_to_pascal_case(self): # the type name (written in PascalCase) of a table is needed to create backrefs @@ -1012,3 +1013,8 @@ class MessageInitiationType(Enum): UI = "UI" API = "API" MACRO = "MACRO" + + +class ETLFileSplitType(Enum): + SECTION = "SECTION" + SHRINK = "SHRINK" diff --git a/models.py b/models.py index 5932911..732cf36 100644 --- a/models.py +++ b/models.py @@ -2504,3 +2504,48 @@ class ReleaseNotification(Base): ) link = Column(String, nullable=False) config = Column(JSON) # e.g. {"en": {"headline":"", "description":""}, "de": {...}} + + +class EtlTaskQueue(Base): + __tablename__ = Tablenames.ETL_TASK.value + __table_args__ = {"schema": "global"} + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + created_at = Column(DateTime, default=sql.func.now()) + created_by = Column( + UUID(as_uuid=True), + ForeignKey(f"{Tablenames.USER.value}.id", ondelete="SET NULL"), + index=True, + ) + markdown_file_id = Column( + UUID(as_uuid=True), + ForeignKey( + f"cognition.{Tablenames.MARKDOWN_FILE.value}.id", ondelete="CASCADE" + ), + index=True, + nullable=True, + ) + sharepoint_file_id = Column( + UUID(as_uuid=True), + ForeignKey( + f"integration.{Tablenames.INTEGRATION_SHAREPOINT.value}.id", + ondelete="CASCADE", + ), + index=True, + nullable=True, + ) + + extract_config = Column(JSON) # schema depends on the file type + transform_config = Column( + JSON + ) # {"split_strategy": {"type": enums.ETLFileSplitType}, "summarize": "true", "cleanse": true, "text-to-table": true} + load_config = Column(JSON) # {"refinery_project": false, "markdown_file": true} + notify_config = Column( + JSON + ) # {"http": {"url": "http://cognition-gateway:80/etl/complete/{task_id}", "method": "POST"}} + + priority = Column(Boolean, default=False) + is_active = Column(Boolean, default=False) + started_at = Column(DateTime) + finished_at = Column(DateTime) + state = Column(String) # of type enums.CognitionMarkdownFileState + error_message = Column(String) From c002a6e2d612517d88749e967691e38ab82aa7d9 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Fri, 24 Oct 2025 13:03:34 +0200 Subject: [PATCH 02/36] perf(etl_task): add llm_config --- models.py | 1 + 1 file changed, 1 insertion(+) diff --git a/models.py b/models.py index 732cf36..3fa106e 100644 --- a/models.py +++ b/models.py @@ -2542,6 +2542,7 @@ class EtlTaskQueue(Base): notify_config = Column( JSON ) # {"http": {"url": "http://cognition-gateway:80/etl/complete/{task_id}", "method": "POST"}} + llm_config = Column(JSON) priority = Column(Boolean, default=False) is_active = Column(Boolean, default=False) From c12e193dde22f860f0e47f04e91878e858308183 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Fri, 24 Oct 2025 13:10:47 +0200 Subject: [PATCH 03/36] perf: rename EtlTask class --- models.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/models.py b/models.py index 3fa106e..35dabc4 100644 --- a/models.py +++ 
b/models.py @@ -2506,10 +2506,15 @@ class ReleaseNotification(Base): config = Column(JSON) # e.g. {"en": {"headline":"", "description":""}, "de": {...}} -class EtlTaskQueue(Base): +class EtlTask(Base): __tablename__ = Tablenames.ETL_TASK.value __table_args__ = {"schema": "global"} id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + organization_id = Column( + UUID(as_uuid=True), + ForeignKey(f"{Tablenames.ORGANIZATION.value}.id", ondelete="CASCADE"), + index=True, + ) created_at = Column(DateTime, default=sql.func.now()) created_by = Column( UUID(as_uuid=True), @@ -2544,9 +2549,9 @@ class EtlTaskQueue(Base): ) # {"http": {"url": "http://cognition-gateway:80/etl/complete/{task_id}", "method": "POST"}} llm_config = Column(JSON) - priority = Column(Boolean, default=False) - is_active = Column(Boolean, default=False) started_at = Column(DateTime) finished_at = Column(DateTime) state = Column(String) # of type enums.CognitionMarkdownFileState + is_active = Column(Boolean, default=False) + priority = Column(Integer, default=0) error_message = Column(String) From 571073f8770c9618cae12b9dbf67417424c98ee9 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Fri, 24 Oct 2025 13:44:51 +0200 Subject: [PATCH 04/36] perf: add business objects --- enums.py | 1 + global_objects/etl_task.py | 187 +++++++++++++++++++++++++++++++++++++ models.py | 7 +- 3 files changed, 193 insertions(+), 2 deletions(-) create mode 100644 global_objects/etl_task.py diff --git a/enums.py b/enums.py index 30bb7d0..fdc13cb 100644 --- a/enums.py +++ b/enums.py @@ -517,6 +517,7 @@ class TaskType(Enum): RUN_COGNITION_MACRO = "RUN_COGNITION_MACRO" PARSE_COGNITION_FILE = "PARSE_COGNITION_FILE" EXECUTE_INTEGRATION = "EXECUTE_INTEGRATION" + EXECUTE_ETL = "EXECUTE_ETL" class TaskQueueAction(Enum): diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py new file mode 100644 index 0000000..ee2d45b --- /dev/null +++ b/global_objects/etl_task.py @@ -0,0 +1,187 @@ +from typing import List, Optional, Dict, Union +from sqlalchemy.orm.attributes import flag_modified + +import datetime + +from ..business_objects import general +from ..session import session +from ..models import EtlTask +from ..enums import CognitionMarkdownFileState + +FINISHED_STATES = [ + CognitionMarkdownFileState.FINISHED.value, + CognitionMarkdownFileState.FAILED.value, +] + + +def get_by_ids(ids: List[str]) -> List[EtlTask]: + return session.query(EtlTask).filter(EtlTask.id.in_(ids)).all() + + +def get_by_id(id: str) -> EtlTask: + return session.query(EtlTask).filter(EtlTask.id == id).first() + + +def get_all( + markdown_file_id: Optional[str] = None, + sharepoint_file_id: Optional[str] = None, + exclude_failed: Optional[bool] = False, + only_active: Optional[bool] = False, +) -> List[EtlTask]: + query = session.query(EtlTask) + if markdown_file_id is not None and sharepoint_file_id is not None: + raise ValueError( + "get_all: Only one of markdown_file_id or sharepoint_file_id should be provided." 
+ ) + if markdown_file_id: + query = query.filter(EtlTask.markdown_file_id == markdown_file_id) + if sharepoint_file_id: + query = query.filter(EtlTask.sharepoint_file_id == sharepoint_file_id) + + if exclude_failed: + query = query.filter(EtlTask.state != CognitionMarkdownFileState.FAILED.value) + if only_active: + query = query.filter(EtlTask.is_active == True) + return query.order_by(EtlTask.created_at.desc()).all() + + +def get_all_in_org( + org_id: str, + exclude_failed: Optional[bool] = False, + only_active: Optional[bool] = False, +) -> List[EtlTask]: + query = session.query(EtlTask).filter(EtlTask.organization_id == org_id) + if only_active: + query = query.filter(EtlTask.is_active == True) + if exclude_failed: + query = query.filter(EtlTask.state != CognitionMarkdownFileState.FAILED.value) + return query.order_by(EtlTask.created_at.desc()).all() + + +def get_all_in_org_paginated( + org_id: str, + page: int = 1, + page_size: int = 10, +) -> List[EtlTask]: + query = session.query(EtlTask).filter( + EtlTask.organization_id == org_id, + ) + + return ( + query.order_by(EtlTask.created_at.desc()) + .limit(page_size) + .offset(max(0, (page - 1) * page_size)) + .all() + ) + + +def create( + org_id: str, + user_id: str, + markdown_file_id: str, + sharepoint_file_id: str, + extract_config: Dict, + transform_config: Dict, + load_config: Dict, + notify_config: Dict, + llm_config: Dict, + id: Optional[str] = None, + with_commit: bool = True, +) -> EtlTask: + etl_task: EtlTask = EtlTask( + id=id, + organization_id=org_id, + created_by=user_id, + markdown_file_id=markdown_file_id, + sharepoint_file_id=sharepoint_file_id, + extract_config=extract_config, + transform_config=transform_config, + load_config=load_config, + notify_config=notify_config, + llm_config=llm_config, + ) + general.add(etl_task, with_commit) + + return etl_task + + +def update( + id: str, + updated_by: Optional[str] = None, + extract_config: Optional[Dict] = None, + transform_config: Optional[Dict] = None, + load_config: Optional[Dict] = None, + notify_config: Optional[Dict] = None, + llm_config: Optional[Dict] = None, + started_at: Optional[datetime.datetime] = None, + finished_at: Optional[Union[str, datetime.datetime]] = None, + state: Optional[CognitionMarkdownFileState] = None, + is_active: Optional[bool] = None, + priority: Optional[int] = None, + error_message: Optional[str] = None, + with_commit: bool = True, +) -> Optional[EtlTask]: + etl_task: EtlTask = get_by_id(id) + if not etl_task: + return None + + if updated_by is not None: + etl_task.updated_by = updated_by + if extract_config is not None: + etl_task.extract_config = extract_config + flag_modified(etl_task, "config") + if transform_config is not None: + etl_task.transform_config = transform_config + flag_modified(etl_task, "transform_config") + if load_config is not None: + etl_task.load_config = load_config + flag_modified(etl_task, "load_config") + if notify_config is not None: + etl_task.notify_config = notify_config + flag_modified(etl_task, "notify_config") + if llm_config is not None: + etl_task.llm_config = llm_config + flag_modified(etl_task, "llm_config") + if started_at is not None: + etl_task.started_at = started_at + if finished_at is not None: + if finished_at == "NULL": + etl_task.finished_at = None + else: + etl_task.finished_at = finished_at + if state is not None: + etl_task.state = state.value + if is_active is not None: + etl_task.is_active = is_active + if priority is not None: + etl_task.priority = priority + if error_message is not None: 
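+ # "NULL" acts as a sentinel so callers can explicitly clear a previously set error_message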
+ if error_message == "NULL": + etl_task.error_message = None + else: + etl_task.error_message = error_message + + general.add(etl_task, with_commit) + return etl_task + + +def execution_finished(id: str) -> bool: + if not get_by_id(id): + return True + return bool( + session.query(EtlTask) + .filter( + EtlTask.id == id, + EtlTask.state.in_(FINISHED_STATES), + ) + .first() + ) + + +def delete_many(ids: List[str], with_commit: bool = True) -> None: + ( + session.query(EtlTask) + .filter(EtlTask.id.in_(ids)) + .delete(synchronize_session=False) + ) + general.flush_or_commit(with_commit) diff --git a/models.py b/models.py index 35dabc4..eb7bfbd 100644 --- a/models.py +++ b/models.py @@ -20,6 +20,7 @@ TokenSubject, UploadStates, UserRoles, + CognitionMarkdownFileState, ) from sqlalchemy import ( BigInteger, @@ -2542,7 +2543,7 @@ class EtlTask(Base): extract_config = Column(JSON) # schema depends on the file type transform_config = Column( JSON - ) # {"split_strategy": {"type": enums.ETLFileSplitType}, "summarize": "true", "cleanse": true, "text-to-table": true} + ) # {"split_strategy": {"type": enums.ETLFileSplitType}, "summarize": true, "cleanse": true, "text-to-table": true} load_config = Column(JSON) # {"refinery_project": false, "markdown_file": true} notify_config = Column( JSON @@ -2551,7 +2552,9 @@ class EtlTask(Base): started_at = Column(DateTime) finished_at = Column(DateTime) - state = Column(String) # of type enums.CognitionMarkdownFileState + state = Column( + String, default=CognitionMarkdownFileState.QUEUE.value + ) # of type enums.CognitionMarkdownFileState is_active = Column(Boolean, default=False) priority = Column(Integer, default=0) error_message = Column(String) From 6232a627eb92a6750eb8e8eb6e83f2cafc7f2537 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Fri, 24 Oct 2025 15:29:26 +0200 Subject: [PATCH 05/36] fix: optional file ids in etl task create --- global_objects/etl_task.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index ee2d45b..6920b0d 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -78,13 +78,13 @@ def get_all_in_org_paginated( def create( org_id: str, user_id: str, - markdown_file_id: str, - sharepoint_file_id: str, extract_config: Dict, transform_config: Dict, load_config: Dict, notify_config: Dict, llm_config: Dict, + markdown_file_id: Optional[str] = None, + sharepoint_file_id: Optional[str] = None, id: Optional[str] = None, with_commit: bool = True, ) -> EtlTask: From 2e7c924923890f15ff45ef294d0efcb05db0e2d3 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Fri, 24 Oct 2025 15:37:10 +0200 Subject: [PATCH 06/36] perf: add file_path to etl_task --- global_objects/etl_task.py | 2 ++ models.py | 1 + 2 files changed, 3 insertions(+) diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index 6920b0d..2284236 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -78,6 +78,7 @@ def get_all_in_org_paginated( def create( org_id: str, user_id: str, + file_path: str, extract_config: Dict, transform_config: Dict, load_config: Dict, @@ -94,6 +95,7 @@ def create( created_by=user_id, markdown_file_id=markdown_file_id, sharepoint_file_id=sharepoint_file_id, + file_path=file_path, extract_config=extract_config, transform_config=transform_config, load_config=load_config, diff --git a/models.py b/models.py index eb7bfbd..f949a02 100644 --- a/models.py +++ b/models.py @@ -2539,6 +2539,7 @@ class EtlTask(Base): index=True, 
nullable=True, ) + file_path = Column(String) extract_config = Column(JSON) # schema depends on the file type transform_config = Column( From d58550a746d312168f7b02a6965d3d2b36a95aad Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Sat, 25 Oct 2025 01:03:54 +0200 Subject: [PATCH 07/36] perf: file paths --- enums.py | 6 ++++++ global_objects/etl_task.py | 18 +++++++++++++++--- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/enums.py b/enums.py index fdc13cb..2d60e72 100644 --- a/enums.py +++ b/enums.py @@ -1019,3 +1019,9 @@ class MessageInitiationType(Enum): class ETLFileSplitType(Enum): SECTION = "SECTION" SHRINK = "SHRINK" + + +class ETLFileType(Enum): + PDF = "PDF" + WORD = "WORD" + MD = "MD" diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index 2284236..84fdf7b 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -78,12 +78,12 @@ def get_all_in_org_paginated( def create( org_id: str, user_id: str, - file_path: str, extract_config: Dict, transform_config: Dict, load_config: Dict, notify_config: Dict, llm_config: Dict, + file_path: Optional[str] = None, markdown_file_id: Optional[str] = None, sharepoint_file_id: Optional[str] = None, id: Optional[str] = None, @@ -108,8 +108,10 @@ def create( def update( - id: str, + id: Optional[str] = None, + etl_task: Optional[EtlTask] = None, updated_by: Optional[str] = None, + file_path: Optional[str] = None, extract_config: Optional[Dict] = None, transform_config: Optional[Dict] = None, load_config: Optional[Dict] = None, @@ -123,12 +125,22 @@ def update( error_message: Optional[str] = None, with_commit: bool = True, ) -> Optional[EtlTask]: - etl_task: EtlTask = get_by_id(id) + if not id and not etl_task: + return None + if id: + etl_task: EtlTask = get_by_id(id) if not etl_task: return None if updated_by is not None: etl_task.updated_by = updated_by + if file_path is not None and etl_task.file_path is None: + etl_task.file_path = file_path + else: + print( + "WARNING: ETL Task file_path update attempted but file_path is already set", + flush=True, + ) if extract_config is not None: etl_task.extract_config = extract_config flag_modified(etl_task, "config") From cce9ba4d01602dcccbf622c8622f3e2dcbb85fba Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Sat, 25 Oct 2025 20:45:23 +0200 Subject: [PATCH 08/36] perf: add tokenizer column --- global_objects/etl_task.py | 2 ++ models.py | 1 + 2 files changed, 3 insertions(+) diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index 84fdf7b..e73a97b 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -78,6 +78,7 @@ def get_all_in_org_paginated( def create( org_id: str, user_id: str, + tokenizer: str, extract_config: Dict, transform_config: Dict, load_config: Dict, @@ -96,6 +97,7 @@ def create( markdown_file_id=markdown_file_id, sharepoint_file_id=sharepoint_file_id, file_path=file_path, + tokenizer=tokenizer, extract_config=extract_config, transform_config=transform_config, load_config=load_config, diff --git a/models.py b/models.py index f949a02..b7f060d 100644 --- a/models.py +++ b/models.py @@ -2540,6 +2540,7 @@ class EtlTask(Base): nullable=True, ) file_path = Column(String) + tokenizer = Column(String) extract_config = Column(JSON) # schema depends on the file type transform_config = Column( From 2bea8576426ce2a305138e56d45874e46cfdd0b4 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Sat, 25 Oct 2025 22:12:35 +0200 Subject: [PATCH 09/36] perf: add file_size col --- 
cognition_objects/environment_variable.py | 16 ++++++++++++++++ global_objects/etl_task.py | 14 +++++++------- models.py | 1 + 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/cognition_objects/environment_variable.py b/cognition_objects/environment_variable.py index 68e8630..cb6787c 100644 --- a/cognition_objects/environment_variable.py +++ b/cognition_objects/environment_variable.py @@ -64,6 +64,22 @@ def get_by_name_and_org_id( ) +def get_by_id_and_org_id( + org_id: str, + id: str, +) -> CognitionEnvironmentVariable: + + return ( + session.query(CognitionEnvironmentVariable) + .filter( + CognitionEnvironmentVariable.organization_id == org_id, + CognitionEnvironmentVariable.project_id == None, + CognitionEnvironmentVariable.id == id, + ) + .first() + ) + + def get_dataset_env_var_value( dataset_id: str, org_id: str, scope: Literal["extraction", "transformation"] ) -> Union[str, None]: diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index e73a97b..d6c4bfa 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -78,12 +78,13 @@ def get_all_in_org_paginated( def create( org_id: str, user_id: str, - tokenizer: str, + file_size_bytes: int, extract_config: Dict, transform_config: Dict, load_config: Dict, notify_config: Dict, llm_config: Dict, + tokenizer: str, file_path: Optional[str] = None, markdown_file_id: Optional[str] = None, sharepoint_file_id: Optional[str] = None, @@ -97,12 +98,13 @@ def create( markdown_file_id=markdown_file_id, sharepoint_file_id=sharepoint_file_id, file_path=file_path, - tokenizer=tokenizer, + file_size_bytes=file_size_bytes, extract_config=extract_config, transform_config=transform_config, load_config=load_config, notify_config=notify_config, llm_config=llm_config, + tokenizer=tokenizer, ) general.add(etl_task, with_commit) @@ -114,6 +116,7 @@ def update( etl_task: Optional[EtlTask] = None, updated_by: Optional[str] = None, file_path: Optional[str] = None, + file_size_bytes: Optional[int] = None, extract_config: Optional[Dict] = None, transform_config: Optional[Dict] = None, load_config: Optional[Dict] = None, @@ -138,11 +141,8 @@ def update( etl_task.updated_by = updated_by if file_path is not None and etl_task.file_path is None: etl_task.file_path = file_path - else: - print( - "WARNING: ETL Task file_path update attempted but file_path is already set", - flush=True, - ) + if file_size_bytes is not None and etl_task.file_size_bytes is None: + etl_task.file_size_bytes = file_size_bytes if extract_config is not None: etl_task.extract_config = extract_config flag_modified(etl_task, "config") diff --git a/models.py b/models.py index b7f060d..1418737 100644 --- a/models.py +++ b/models.py @@ -2540,6 +2540,7 @@ class EtlTask(Base): nullable=True, ) file_path = Column(String) + file_size_bytes = Column(BigInteger) tokenizer = Column(String) extract_config = Column(JSON) # schema depends on the file type From e6d425b1d6a7da0eeaed016f19960763ba20ee98 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Sun, 26 Oct 2025 21:42:33 +0100 Subject: [PATCH 10/36] perf: add split config --- enums.py | 60 +++++++++++++++++++++++++++++++++++++++++-------------- models.py | 3 ++- 2 files changed, 47 insertions(+), 16 deletions(-) diff --git a/enums.py b/enums.py index 2d60e72..866f462 100644 --- a/enums.py +++ b/enums.py @@ -470,22 +470,18 @@ class TokenScope(Enum): READ = "READ" READ_WRITE = "READ_WRITE" - def all(): - return [ - TokenScope.READ.value, - TokenScope.READ_WRITE.value, - ] + @classmethod + def all(cls): + return 
[e.value for e in cls] class TokenSubject(Enum): PROJECT = Tablenames.PROJECT.value.upper() MARKDOWN_DATASET = Tablenames.MARKDOWN_DATASET.value.upper() - def all(): - return [ - TokenSubject.PROJECT.value, - TokenSubject.MARKDOWN_DATASET.value, - ] + @classmethod + def all(cls): + return [e.value for e in cls] class TokenizationTaskTypes(Enum): @@ -808,11 +804,9 @@ class MacroType(Enum): DOCUMENT_MESSAGE_QUEUE = "DOCUMENT_MESSAGE_QUEUE" FOLDER_MESSAGE_QUEUE = "FOLDER_MESSAGE_QUEUE" - def all(): - return [ - MacroType.DOCUMENT_MESSAGE_QUEUE.value, - MacroType.FOLDER_MESSAGE_QUEUE.value, - ] + @classmethod + def all(cls): + return [e.value for e in cls] # currently only one option, but could be extended in the future @@ -1025,3 +1019,39 @@ class ETLFileType(Enum): PDF = "PDF" WORD = "WORD" MD = "MD" + + +class ETLExtractorPDF(Enum): + VISION = "VISION" + AZURE_DI = "AZURE_DI" + PDF2MD = "PDF2MD" + + @classmethod + def all(cls): + return [e.value for e in cls] + + @staticmethod + def from_string(value: str): + changed_value = value.upper().replace(" ", "_").replace("-", "_") + if changed_value == "VISION": + return ETLExtractorPDF.VISION + elif changed_value == "AZURE_DI": + return ETLExtractorPDF.AZURE_DI + elif changed_value == "PDF2MD": + return ETLExtractorPDF.PDF2MD + raise ValueError("ERROR: Could not parse ETLExtractorPDF from string") + + +class ETLExtractorMD(Enum): + FILESYSTEM = "FILESYSTEM" + + @classmethod + def all(cls): + return [e.value for e in cls] + + @staticmethod + def from_string(value: str): + changed_value = value.upper().replace(" ", "_").replace("-", "_") + if changed_value == "FILESYSTEM": + return ETLExtractorMD.FILESYSTEM + raise ValueError("ERROR: Could not parse ETLExtractorMD from string") diff --git a/models.py b/models.py index 1418737..e71dcec 100644 --- a/models.py +++ b/models.py @@ -2544,9 +2544,10 @@ class EtlTask(Base): tokenizer = Column(String) extract_config = Column(JSON) # schema depends on the file type + split_config = Column(JSON) # {"chunk": true, "shrink": false} transform_config = Column( JSON - ) # {"split_strategy": {"type": enums.ETLFileSplitType}, "summarize": true, "cleanse": true, "text-to-table": true} + ) # {"summarize": true, "cleanse": true, "text_to_table": true} load_config = Column(JSON) # {"refinery_project": false, "markdown_file": true} notify_config = Column( JSON From ac40aa9c2b89603d28dacd1e009a9ad73b9e1032 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Mon, 27 Oct 2025 00:10:13 +0100 Subject: [PATCH 11/36] perf: add split config to etl task --- enums.py | 15 +++++++++++++++ global_objects/etl_task.py | 8 +++++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/enums.py b/enums.py index 866f462..a742cd3 100644 --- a/enums.py +++ b/enums.py @@ -1020,6 +1020,21 @@ class ETLFileType(Enum): WORD = "WORD" MD = "MD" + @classmethod + def all(cls): + return [e.value for e in cls] + + @staticmethod + def from_string(value: str): + changed_value = value.upper().replace(" ", "_").replace("-", "_") + if changed_value == "PDF": + return ETLFileType.PDF + elif changed_value == "WORD": + return ETLFileType.WORD + elif changed_value == "MD": + return ETLFileType.MD + raise ValueError("ERROR: Could not parse ETLFileType from string") + class ETLExtractorPDF(Enum): VISION = "VISION" diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index d6c4bfa..bca95ce 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -80,6 +80,7 @@ def create( user_id: str, file_size_bytes: int, 
extract_config: Dict, + split_config: Dict, transform_config: Dict, load_config: Dict, notify_config: Dict, @@ -100,6 +101,7 @@ def create( file_path=file_path, file_size_bytes=file_size_bytes, extract_config=extract_config, + split_config=split_config, transform_config=transform_config, load_config=load_config, notify_config=notify_config, @@ -118,6 +120,7 @@ def update( file_path: Optional[str] = None, file_size_bytes: Optional[int] = None, extract_config: Optional[Dict] = None, + split_config: Optional[Dict] = None, transform_config: Optional[Dict] = None, load_config: Optional[Dict] = None, notify_config: Optional[Dict] = None, @@ -145,7 +148,10 @@ def update( etl_task.file_size_bytes = file_size_bytes if extract_config is not None: etl_task.extract_config = extract_config - flag_modified(etl_task, "config") + flag_modified(etl_task, "extract_config") + if split_config is not None: + etl_task.split_config = split_config + flag_modified(etl_task, "split_config") if transform_config is not None: etl_task.transform_config = transform_config flag_modified(etl_task, "transform_config") From bd79420636221f13c6854a6c3fd382a82a490e97 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Mon, 27 Oct 2025 00:12:08 +0100 Subject: [PATCH 12/36] fix: set split types to chunk and shrink --- enums.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/enums.py b/enums.py index a742cd3..f84371e 100644 --- a/enums.py +++ b/enums.py @@ -1011,7 +1011,7 @@ class MessageInitiationType(Enum): class ETLFileSplitType(Enum): - SECTION = "SECTION" + CHUNK = "CHUNK" SHRINK = "SHRINK" From cec3c926d28986bb235b6e3a4b36d063cf086c80 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Mon, 27 Oct 2025 00:16:14 +0100 Subject: [PATCH 13/36] perf: update enum name --- enums.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/enums.py b/enums.py index f84371e..ac41f40 100644 --- a/enums.py +++ b/enums.py @@ -1010,7 +1010,7 @@ class MessageInitiationType(Enum): MACRO = "MACRO" -class ETLFileSplitType(Enum): +class ETLSplitStrategy(Enum): CHUNK = "CHUNK" SHRINK = "SHRINK" From ef216ee59b33f1ff8dc507e719b2dfad3d1b1be7 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Mon, 27 Oct 2025 00:33:06 +0100 Subject: [PATCH 14/36] perf: default ETLExtractorPDF to PDF2MD --- enums.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/enums.py b/enums.py index ac41f40..73fa52f 100644 --- a/enums.py +++ b/enums.py @@ -1052,9 +1052,8 @@ def from_string(value: str): return ETLExtractorPDF.VISION elif changed_value == "AZURE_DI": return ETLExtractorPDF.AZURE_DI - elif changed_value == "PDF2MD": + else: return ETLExtractorPDF.PDF2MD - raise ValueError("ERROR: Could not parse ETLExtractorPDF from string") class ETLExtractorMD(Enum): From 1e6dff4d80e135c44816918341cbb36071a58c73 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Mon, 27 Oct 2025 21:53:07 +0100 Subject: [PATCH 15/36] perf: fkey alignment --- cognition_objects/markdown_file.py | 13 +++++++ integration_objects/manager.py | 11 ++++++ models.py | 60 +++++++++++++++++++++--------- 3 files changed, 66 insertions(+), 18 deletions(-) diff --git a/cognition_objects/markdown_file.py b/cognition_objects/markdown_file.py index 8b1e2d2..8c06e2b 100644 --- a/cognition_objects/markdown_file.py +++ b/cognition_objects/markdown_file.py @@ -19,6 +19,17 @@ def get(org_id: str, md_file_id: str) -> CognitionMarkdownFile: ) +def get_by_etl_task_id(org_id: str, etl_task_id: str) -> CognitionMarkdownFile: + return ( + session.query(CognitionMarkdownFile) + 
.filter( + CognitionMarkdownFile.organization_id == org_id, + CognitionMarkdownFile.etl_task_id == etl_task_id, + ) + .first() + ) + + def get_enriched(org_id: str, md_file_id: str) -> Dict[str, Any]: org_id = prevent_sql_injection(org_id, isinstance(org_id, str)) md_file_id = prevent_sql_injection(md_file_id, isinstance(org_id, str)) @@ -147,6 +158,7 @@ def create( meta_data: Optional[Dict[str, Any]] = None, with_commit: bool = True, created_at: Optional[datetime] = None, + etl_task_id: Optional[str] = None, ) -> CognitionMarkdownFile: markdown_file: CognitionMarkdownFile = CognitionMarkdownFile( organization_id=org_id, @@ -159,6 +171,7 @@ def create( category_origin=category_origin, state=enums.CognitionMarkdownFileState.QUEUE.value, meta_data=meta_data, + etl_task_id=etl_task_id, ) general.add(markdown_file, with_commit) diff --git a/integration_objects/manager.py b/integration_objects/manager.py index ea2a53f..a9cd02f 100644 --- a/integration_objects/manager.py +++ b/integration_objects/manager.py @@ -30,6 +30,17 @@ def get_by_id( return session.query(IntegrationModel).filter(IntegrationModel.id == id).first() +def get_by_etl_task_id( + IntegrationModel: Type, + etl_task_id: str, +) -> object: + return ( + session.query(IntegrationModel) + .filter(IntegrationModel.etl_task_id == etl_task_id) + .first() + ) + + def get_by_running_id( IntegrationModel: Type, integration_id: str, diff --git a/models.py b/models.py index e71dcec..b0a79db 100644 --- a/models.py +++ b/models.py @@ -1562,7 +1562,14 @@ class CognitionMarkdownDataset(Base): class CognitionMarkdownFile(Base): __tablename__ = Tablenames.MARKDOWN_FILE.value - __table_args__ = {"schema": "cognition"} + __table_args__ = ( + UniqueConstraint( + "id", + "etl_task_id", + name=f"unique_{__tablename__}_etl_task_id", + ), + {"schema": "cognition"}, + ) id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) organization_id = Column( UUID(as_uuid=True), @@ -1592,6 +1599,12 @@ class CognitionMarkdownFile(Base): is_reviewed = Column(Boolean, default=False) meta_data = Column(JSON) + etl_task_id = Column( + UUID(as_uuid=True), + ForeignKey(f"global.{Tablenames.ETL_TASK.value}.id", ondelete="CASCADE"), + index=True, + ) + class FileTransformationLLMLogs(Base): __tablename__ = Tablenames.FILE_TRANSFORMATION_LLM_LOGS.value @@ -2230,6 +2243,7 @@ class IntegrationGithubFile(Base): "integration_id", "running_id", "source", + "etl_task_id", name=f"unique_{__tablename__}_source", ), {"schema": "integration"}, @@ -2261,6 +2275,12 @@ class IntegrationGithubFile(Base): sha = Column(String) code_language = Column(String) + etl_task_id = Column( + UUID(as_uuid=True), + ForeignKey(f"global.{Tablenames.ETL_TASK.value}.id", ondelete="CASCADE"), + index=True, + ) + class IntegrationGithubIssue(Base): __tablename__ = Tablenames.INTEGRATION_GITHUB_ISSUE.value @@ -2269,6 +2289,7 @@ class IntegrationGithubIssue(Base): "integration_id", "running_id", "source", + "etl_task_id", name=f"unique_{__tablename__}_source", ), {"schema": "integration"}, @@ -2303,6 +2324,12 @@ class IntegrationGithubIssue(Base): milestone = Column(String) number = Column(Integer) + etl_task_id = Column( + UUID(as_uuid=True), + ForeignKey(f"global.{Tablenames.ETL_TASK.value}.id", ondelete="CASCADE"), + index=True, + ) + class IntegrationPdf(Base): __tablename__ = Tablenames.INTEGRATION_PDF.value @@ -2311,6 +2338,7 @@ class IntegrationPdf(Base): "integration_id", "running_id", "source", + "etl_task_id", name=f"unique_{__tablename__}_source", ), {"schema": "integration"}, @@ 
-2343,6 +2371,12 @@ class IntegrationPdf(Base): total_pages = Column(Integer) title = Column(String) + etl_task_id = Column( + UUID(as_uuid=True), + ForeignKey(f"global.{Tablenames.ETL_TASK.value}.id", ondelete="CASCADE"), + index=True, + ) + class IntegrationSharepoint(Base): __tablename__ = Tablenames.INTEGRATION_SHAREPOINT.value @@ -2351,6 +2385,7 @@ class IntegrationSharepoint(Base): "integration_id", "running_id", "source", + "etl_task_id", name=f"unique_{__tablename__}_source", ), {"schema": "integration"}, @@ -2394,6 +2429,12 @@ class IntegrationSharepoint(Base): permissions = Column(JSON) file_properties = Column(JSON) + etl_task_id = Column( + UUID(as_uuid=True), + ForeignKey(f"global.{Tablenames.ETL_TASK.value}.id", ondelete="CASCADE"), + index=True, + ) + class IntegrationSharepointPropertySync(Base): __tablename__ = Tablenames.INTEGRATION_SHAREPOINT_PROPERTY_SYNC.value @@ -2522,23 +2563,6 @@ class EtlTask(Base): ForeignKey(f"{Tablenames.USER.value}.id", ondelete="SET NULL"), index=True, ) - markdown_file_id = Column( - UUID(as_uuid=True), - ForeignKey( - f"cognition.{Tablenames.MARKDOWN_FILE.value}.id", ondelete="CASCADE" - ), - index=True, - nullable=True, - ) - sharepoint_file_id = Column( - UUID(as_uuid=True), - ForeignKey( - f"integration.{Tablenames.INTEGRATION_SHAREPOINT.value}.id", - ondelete="CASCADE", - ), - index=True, - nullable=True, - ) file_path = Column(String) file_size_bytes = Column(BigInteger) tokenizer = Column(String) From 3cbab40386dc4b2d51dd841a7be8313afc3699a0 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Mon, 27 Oct 2025 21:58:45 +0100 Subject: [PATCH 16/36] perf: add ETLTransformer --- enums.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/enums.py b/enums.py index 73fa52f..7aaf9f3 100644 --- a/enums.py +++ b/enums.py @@ -1069,3 +1069,24 @@ def from_string(value: str): if changed_value == "FILESYSTEM": return ETLExtractorMD.FILESYSTEM raise ValueError("ERROR: Could not parse ETLExtractorMD from string") + + +class ETLTransformer(Enum): + SUMMARIZE = "SUMMARIZE" + CLEANSE = "CLEANSE" + TEXT_TO_TABLE = "TEXT_TO_TABLE" + + @classmethod + def all(cls): + return [e.value for e in cls] + + @staticmethod + def from_string(value: str): + changed_value = value.upper().replace(" ", "_").replace("-", "_") + if changed_value == "SUMMARIZE": + return ETLTransformer.SUMMARIZE + elif changed_value == "CLEANSE": + return ETLTransformer.CLEANSE + elif changed_value == "TEXT_TO_TABLE": + return ETLTransformer.TEXT_TO_TABLE + raise ValueError("ERROR: Could not parse ETLTransformer from string") From ccfd2be3aa4808e7e4f939d8774d058a5cbc2c16 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Mon, 27 Oct 2025 23:24:49 +0100 Subject: [PATCH 17/36] perf: remove deleted cols --- cognition_objects/markdown_file.py | 5 +++-- global_objects/etl_task.py | 4 ---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/cognition_objects/markdown_file.py b/cognition_objects/markdown_file.py index 8c06e2b..e872590 100644 --- a/cognition_objects/markdown_file.py +++ b/cognition_objects/markdown_file.py @@ -158,7 +158,6 @@ def create( meta_data: Optional[Dict[str, Any]] = None, with_commit: bool = True, created_at: Optional[datetime] = None, - etl_task_id: Optional[str] = None, ) -> CognitionMarkdownFile: markdown_file: CognitionMarkdownFile = CognitionMarkdownFile( organization_id=org_id, @@ -171,7 +170,6 @@ def create( category_origin=category_origin, state=enums.CognitionMarkdownFileState.QUEUE.value, meta_data=meta_data, - 
etl_task_id=etl_task_id, ) general.add(markdown_file, with_commit) @@ -188,6 +186,7 @@ def update( finished_at: Optional[datetime] = None, error: Optional[str] = None, meta_data: Optional[Dict[str, Any]] = None, + etl_task_id: Optional[str] = None, overwrite_meta_data: bool = True, with_commit: bool = True, ) -> CognitionMarkdownFile: @@ -212,6 +211,8 @@ def update( markdown_file.meta_data = meta_data else: markdown_file.meta_data = {**markdown_file.meta_data, **meta_data} + if etl_task_id is not None and markdown_file.etl_task_id is None: + markdown_file.etl_task_id = etl_task_id general.flush_or_commit(with_commit) return markdown_file diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index bca95ce..787a6f6 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -87,8 +87,6 @@ def create( llm_config: Dict, tokenizer: str, file_path: Optional[str] = None, - markdown_file_id: Optional[str] = None, - sharepoint_file_id: Optional[str] = None, id: Optional[str] = None, with_commit: bool = True, ) -> EtlTask: @@ -96,8 +94,6 @@ def create( id=id, organization_id=org_id, created_by=user_id, - markdown_file_id=markdown_file_id, - sharepoint_file_id=sharepoint_file_id, file_path=file_path, file_size_bytes=file_size_bytes, extract_config=extract_config, From 3dc3020b0badd4a277794bdd5ed8e4279d0fe00f Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Tue, 28 Oct 2025 09:35:44 +0100 Subject: [PATCH 18/36] perf: add monitor.set_etl_task_to_failed --- business_objects/monitor.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/business_objects/monitor.py b/business_objects/monitor.py index 65429c5..c56aa42 100644 --- a/business_objects/monitor.py +++ b/business_objects/monitor.py @@ -5,6 +5,7 @@ from submodules.model.models import TaskQueue, Organization from submodules.model.util import prevent_sql_injection from submodules.model.session import session +from submodules.model.global_objects import etl_task as etl_task_db_bo from submodules.model.cognition_objects import ( macro as macro_db_bo, markdown_file as markdown_file_db_bo, @@ -220,6 +221,26 @@ def set_integration_task_to_failed( ) +def set_etl_task_to_failed( + id: str, + is_active: bool = False, + error_message: Optional[str] = None, + state: Optional[ + enums.CognitionMarkdownFileState + ] = enums.CognitionMarkdownFileState.FAILED, + with_commit: bool = True, +) -> None: + # argument `state` is a workaround for cognition-gateway/api/routes/integrations.delete_many + etl_task_db_bo.update( + id=id, + state=state, + finished_at=datetime.datetime.now(datetime.timezone.utc), + is_active=is_active, + error_message=error_message, + with_commit=with_commit, + ) + + def __select_running_information_source_payloads( project_id: Optional[str] = None, only_running: bool = False, From 1b39c448a0ceb811588410ec79dd2566ef418890 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Tue, 28 Oct 2025 23:17:13 +0100 Subject: [PATCH 19/36] perf: EnumKern --- enums.py | 83 ++++++++++++++++++++------------------------------------ 1 file changed, 30 insertions(+), 53 deletions(-) diff --git a/enums.py b/enums.py index 7aaf9f3..af6535e 100644 --- a/enums.py +++ b/enums.py @@ -2,6 +2,20 @@ from typing import Any +class EnumKern(Enum): + @classmethod + def all(cls): + return [e.value for e in cls] + + @classmethod + def from_string(cls, value: str): + changed_value = value.upper().replace(" ", "_").replace("-", "_") + for member in cls: + if member.value == changed_value: + return member + raise 
ValueError(f"ERROR: Unknown enum {cls.__name__}: {value}") + + class DataTypes(Enum): INTEGER = "INTEGER" FLOAT = "FLOAT" @@ -1015,78 +1029,41 @@ class ETLSplitStrategy(Enum): SHRINK = "SHRINK" -class ETLFileType(Enum): +class ETLFileType(EnumKern): PDF = "PDF" WORD = "WORD" MD = "MD" - @classmethod - def all(cls): - return [e.value for e in cls] - @staticmethod - def from_string(value: str): - changed_value = value.upper().replace(" ", "_").replace("-", "_") - if changed_value == "PDF": - return ETLFileType.PDF - elif changed_value == "WORD": - return ETLFileType.WORD - elif changed_value == "MD": - return ETLFileType.MD - raise ValueError("ERROR: Could not parse ETLFileType from string") +class ETLExtractorMD(Enum): + FILESYSTEM = "FILESYSTEM" -class ETLExtractorPDF(Enum): +class ETLExtractorPDF(EnumKern): VISION = "VISION" AZURE_DI = "AZURE_DI" PDF2MD = "PDF2MD" - @classmethod - def all(cls): - return [e.value for e in cls] - @staticmethod - def from_string(value: str): - changed_value = value.upper().replace(" ", "_").replace("-", "_") - if changed_value == "VISION": - return ETLExtractorPDF.VISION - elif changed_value == "AZURE_DI": - return ETLExtractorPDF.AZURE_DI - else: - return ETLExtractorPDF.PDF2MD +class ETLExtractorWord(Enum): + FILESYSTEM = "FILESYSTEM" -class ETLExtractorMD(Enum): - FILESYSTEM = "FILESYSTEM" +class ETLExtractor: + MD = ETLExtractorMD + PDF = ETLExtractorPDF + WORD = ETLExtractorWord @classmethod - def all(cls): - return [e.value for e in cls] - - @staticmethod - def from_string(value: str): + def from_string(cls, value: str): changed_value = value.upper().replace(" ", "_").replace("-", "_") - if changed_value == "FILESYSTEM": - return ETLExtractorMD.FILESYSTEM - raise ValueError("ERROR: Could not parse ETLExtractorMD from string") + for member in cls: + if member.name == changed_value: + return member + raise ValueError(f"ERROR: Unknown enum {cls.__name__}: {value}") -class ETLTransformer(Enum): +class ETLTransformer(EnumKern): SUMMARIZE = "SUMMARIZE" CLEANSE = "CLEANSE" TEXT_TO_TABLE = "TEXT_TO_TABLE" - - @classmethod - def all(cls): - return [e.value for e in cls] - - @staticmethod - def from_string(value: str): - changed_value = value.upper().replace(" ", "_").replace("-", "_") - if changed_value == "SUMMARIZE": - return ETLTransformer.SUMMARIZE - elif changed_value == "CLEANSE": - return ETLTransformer.CLEANSE - elif changed_value == "TEXT_TO_TABLE": - return ETLTransformer.TEXT_TO_TABLE - raise ValueError("ERROR: Could not parse ETLTransformer from string") From c304bac49459806efe61317d04453273017b1621 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Tue, 28 Oct 2025 23:17:29 +0100 Subject: [PATCH 20/36] perf: add etl_task_id to integration records --- integration_objects/manager.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/integration_objects/manager.py b/integration_objects/manager.py index a9cd02f..aa0e082 100644 --- a/integration_objects/manager.py +++ b/integration_objects/manager.py @@ -166,6 +166,7 @@ def update( running_id: Optional[int] = None, updated_at: Optional[datetime] = None, error_message: Optional[str] = None, + etl_task_id: Optional[str] = None, with_commit: bool = True, **metadata, ) -> Optional[object]: @@ -183,6 +184,8 @@ def update( integration_record.updated_at = updated_at if error_message is not None: integration_record.error_message = error_message + if etl_task_id is not None and integration_record.etl_task_id is None: + integration_record.etl_task_id = etl_task_id record_updated = False for key, value in 
metadata.items(): From 7923e1965f239f28b983398c94d3d08d3a5114b8 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Wed, 29 Oct 2025 01:46:34 +0100 Subject: [PATCH 21/36] perf: enum alignment --- enums.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/enums.py b/enums.py index af6535e..fd8f1bb 100644 --- a/enums.py +++ b/enums.py @@ -1024,7 +1024,7 @@ class MessageInitiationType(Enum): MACRO = "MACRO" -class ETLSplitStrategy(Enum): +class ETLSplitStrategy(EnumKern): CHUNK = "CHUNK" SHRINK = "SHRINK" @@ -1035,7 +1035,7 @@ class ETLFileType(EnumKern): MD = "MD" -class ETLExtractorMD(Enum): +class ETLExtractorMD(EnumKern): FILESYSTEM = "FILESYSTEM" @@ -1045,7 +1045,7 @@ class ETLExtractorPDF(EnumKern): PDF2MD = "PDF2MD" -class ETLExtractorWord(Enum): +class ETLExtractorWord(EnumKern): FILESYSTEM = "FILESYSTEM" From 57f928098d283bd401b5c759736798b4c0863af4 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Wed, 29 Oct 2025 01:46:58 +0100 Subject: [PATCH 22/36] perf: add project update for integration --- cognition_objects/integration.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cognition_objects/integration.py b/cognition_objects/integration.py index 7b6fce7..bed06bd 100644 --- a/cognition_objects/integration.py +++ b/cognition_objects/integration.py @@ -200,6 +200,7 @@ def create( def update( id: str, + project_id: Optional[str] = None, updated_by: Optional[str] = None, name: Optional[str] = None, description: Optional[str] = None, @@ -219,6 +220,8 @@ def update( if not integration: return None + if project_id is not None and integration.project_id is None: + integration.project_id = project_id if updated_by is not None: integration.updated_by = updated_by if name is not None: From 0a5854349696ba65fd239dbf1ac10d4bb14f6d66 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Wed, 29 Oct 2025 11:23:47 +0100 Subject: [PATCH 23/36] perf: etl and integration deletions --- cognition_objects/integration.py | 11 +++++++++++ cognition_objects/markdown_dataset.py | 18 +++++++++++++++++- cognition_objects/markdown_file.py | 21 +++++++++++++++++---- integration_objects/manager.py | 21 ++++++++++++++++++++- 4 files changed, 65 insertions(+), 6 deletions(-) diff --git a/cognition_objects/integration.py b/cognition_objects/integration.py index bed06bd..d9b06ca 100644 --- a/cognition_objects/integration.py +++ b/cognition_objects/integration.py @@ -4,6 +4,7 @@ from sqlalchemy.orm.attributes import flag_modified from ..business_objects import general +from ..integration_objects import manager as integration_manager_db_bo from ..session import session from ..models import CognitionIntegration, CognitionGroup from ..enums import ( @@ -281,6 +282,16 @@ def execution_finished(id: str) -> bool: def delete_many( ids: List[str], delete_cognition_groups: bool = True, with_commit: bool = True ) -> None: + integration_record_ids = [] + for id in ids: + integration_model = set() + recs = integration_manager_db_bo.get_all_by_integration_id(id) + integration_model.update([type(rec) for rec in recs]) + integration_record_ids.extend([rec.id for rec in recs]) + integration_manager_db_bo.delete_many( + integration_model.pop(), ids=integration_record_ids, with_commit=True + ) + ( session.query(CognitionIntegration) .filter(CognitionIntegration.id.in_(ids)) diff --git a/cognition_objects/markdown_dataset.py b/cognition_objects/markdown_dataset.py index a7c2cbf..1035ee0 100644 --- a/cognition_objects/markdown_dataset.py +++ b/cognition_objects/markdown_dataset.py @@ -3,9 +3,10 @@ from 
..business_objects import general from ..session import session -from ..models import CognitionMarkdownDataset, Project +from ..models import CognitionMarkdownDataset, CognitionMarkdownFile, Project from ..enums import Tablenames, MarkdownFileCategoryOrigin from ..util import prevent_sql_injection +from .markdown_file import delete_many as delete_many_md_files def get(org_id: str, id: str) -> CognitionMarkdownDataset: @@ -184,6 +185,21 @@ def delete_many(org_id: str, dataset_ids: List[str], with_commit: bool = True) - ), ).delete(synchronize_session=False) + md_file_ids = ( + session.query(CognitionMarkdownFile.id) + .filter( + CognitionMarkdownFile.organization_id == org_id, + CognitionMarkdownFile.dataset_id.in_(dataset_ids), + ) + .all() + ) + + delete_many_md_files( + org_id=org_id, + md_file_ids=[md_file_id for (md_file_id,) in md_file_ids], + with_commit=True, + ) + session.query(CognitionMarkdownDataset).filter( CognitionMarkdownDataset.organization_id == org_id, CognitionMarkdownDataset.id.in_(dataset_ids), diff --git a/cognition_objects/markdown_file.py b/cognition_objects/markdown_file.py index e872590..80e16d8 100644 --- a/cognition_objects/markdown_file.py +++ b/cognition_objects/markdown_file.py @@ -4,7 +4,7 @@ from .. import enums from ..business_objects import general from ..session import session -from ..models import CognitionMarkdownFile +from ..models import CognitionMarkdownFile, EtlTask from ..util import prevent_sql_injection @@ -82,8 +82,12 @@ def __get_enriched_query( ) else: mf_select = "mf.*" + et_state = "et.state" + mf_state = "mf.state" - query = f"""SELECT {mf_select} FROM cognition.markdown_file mf + query = f"""SELECT {mf_select}, COALESCE({et_state}, {mf_state}) AS etl_state + FROM cognition.markdown_file mf + LEFT JOIN global.etl_task et ON mf.etl_task_id = et.id """ query += f"WHERE mf.organization_id = '{org_id}' {where_add}" query += query_add @@ -219,16 +223,25 @@ def update( def delete(org_id: str, md_file_id: str, with_commit: bool = True) -> None: - session.query(CognitionMarkdownFile).filter( + md_file = session.query(CognitionMarkdownFile).filter( CognitionMarkdownFile.organization_id == org_id, CognitionMarkdownFile.id == md_file_id, + ) + session.query(EtlTask).filter( + EtlTask.organization_id == org_id, EtlTask.id.in_([mf.etl_task_id for mf in md_file]) ).delete() + md_file.delete() general.flush_or_commit(with_commit) def delete_many(org_id: str, md_file_ids: List[str], with_commit: bool = True) -> None: - session.query(CognitionMarkdownFile).filter( + md_files = session.query(CognitionMarkdownFile).filter( CognitionMarkdownFile.organization_id == org_id, CognitionMarkdownFile.id.in_(md_file_ids), + ) + session.query(EtlTask).filter( + EtlTask.organization_id == org_id, + EtlTask.id.in_([mf.etl_task_id for mf in md_files]), ).delete(synchronize_session=False) + md_files.delete(synchronize_session=False) general.flush_or_commit(with_commit) diff --git a/integration_objects/manager.py b/integration_objects/manager.py index aa0e082..23d5519 100644 --- a/integration_objects/manager.py +++ b/integration_objects/manager.py @@ -3,10 +3,18 @@ from sqlalchemy import func from sqlalchemy.orm.attributes import flag_modified +from ..enums import CognitionIntegrationType from ..business_objects import general from ..cognition_objects import integration as integration_db_bo +from ..global_objects import etl_task as etl_task_db_bo from ..session import session from .helper import get_supported_metadata_keys +from ..models import ( + IntegrationSharepoint, + IntegrationPdf,
+ IntegrationGithubIssue, + IntegrationGithubFile, +) def get( @@ -72,9 +80,17 @@ def get_by_source( def get_all_by_integration_id( - IntegrationModel: Type, integration_id: str, ) -> List[object]: + integration = integration_db_bo.get_by_id(integration_id) + if integration.type == CognitionIntegrationType.SHAREPOINT.value: + IntegrationModel = IntegrationSharepoint + elif integration.type == CognitionIntegrationType.PDF.value: + IntegrationModel = IntegrationPdf + elif integration.type == CognitionIntegrationType.GITHUB_FILE.value: + IntegrationModel = IntegrationGithubFile + elif integration.type == CognitionIntegrationType.GITHUB_ISSUE.value: + IntegrationModel = IntegrationGithubIssue return ( session.query(IntegrationModel) .filter(IntegrationModel.integration_id == integration_id) @@ -213,6 +229,9 @@ def delete_many( integration_records = session.query(IntegrationModel).filter( IntegrationModel.id.in_(ids) ) + etl_task_db_bo.delete_many( + ids=[record.etl_task_id for record in integration_records] + ) integration_records.delete(synchronize_session=False) general.flush_or_commit(with_commit) From 9e069eb2e93c50919e05e56d336660b2b20887e0 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Wed, 29 Oct 2025 12:00:03 +0100 Subject: [PATCH 24/36] fix: thread start in general --- business_objects/general.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/business_objects/general.py b/business_objects/general.py index 0475f39..6a5978c 100644 --- a/business_objects/general.py +++ b/business_objects/general.py @@ -77,7 +77,7 @@ def force_remove_and_refresh_session_by_id(session_id: str) -> bool: if session_id not in session_lookup: return False # context vars cant be closed from a different context but we can work around it by using a thread (which creates a new context) with the same id - daemon.run_without_db_token(__close_in_context(session_id)) + daemon.run_without_db_token(__close_in_context, session_id) return True From 99323bda6dc969569a882d729f016b743dc12509 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Wed, 29 Oct 2025 19:09:22 +0100 Subject: [PATCH 25/36] perf: add ETLExtractorPDF from_string --- enums.py | 12 +++++++++++- global_objects/etl_task.py | 2 ++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/enums.py b/enums.py index fd8f1bb..1cf1dd3 100644 --- a/enums.py +++ b/enums.py @@ -1039,11 +1039,21 @@ class ETLExtractorMD(EnumKern): FILESYSTEM = "FILESYSTEM" -class ETLExtractorPDF(EnumKern): +class ETLExtractorPDF(Enum): VISION = "VISION" AZURE_DI = "AZURE_DI" PDF2MD = "PDF2MD" + @classmethod + def from_string(cls, value: str): + changed_value = value.upper().replace(" ", "_").replace("-", "_") + for member in cls: + if member.value == changed_value: + return member + if changed_value == "PDF2MARKDOWN": + return cls.PDF2MD + raise ValueError(f"ERROR: Unknown enum {cls.__name__}: {value}") + class ETLExtractorWord(EnumKern): FILESYSTEM = "FILESYSTEM" diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index 787a6f6..e2da681 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -86,6 +86,7 @@ def create( notify_config: Dict, llm_config: Dict, tokenizer: str, + priority: Optional[int] = -1, file_path: Optional[str] = None, id: Optional[str] = None, with_commit: bool = True, @@ -103,6 +104,7 @@ def create( notify_config=notify_config, llm_config=llm_config, tokenizer=tokenizer, + priority=priority, ) general.add(etl_task, with_commit) From 3ed424f817afd46da3e0522fc3116b156aac1195 Mon Sep 17 00:00:00 2001 From: 
andhreljaKern Date: Thu, 30 Oct 2025 00:32:58 +0100 Subject: [PATCH 26/36] perf: integration_objects cleanup --- integration_objects/manager.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/integration_objects/manager.py b/integration_objects/manager.py index 23d5519..3514dd6 100644 --- a/integration_objects/manager.py +++ b/integration_objects/manager.py @@ -99,6 +99,20 @@ def get_all_by_integration_id( ) +def integration_model(integration_id: str) -> Type: + integration = integration_db_bo.get_by_id(integration_id) + if integration.type == CognitionIntegrationType.SHAREPOINT.value: + return IntegrationSharepoint + elif integration.type == CognitionIntegrationType.PDF.value: + return IntegrationPdf + elif integration.type == CognitionIntegrationType.GITHUB_FILE.value: + return IntegrationGithubFile + elif integration.type == CognitionIntegrationType.GITHUB_ISSUE.value: + return IntegrationGithubIssue + else: + raise ValueError(f"Unsupported integration type: {integration.type}") + + def get_all_by_project_id( IntegrationModel: Type, project_id: str, @@ -115,7 +129,6 @@ def get_all_by_project_id( def get_existing_integration_records( - IntegrationModel: Type, integration_id: str, by: str = "source", ) -> Dict[str, object]: @@ -123,15 +136,15 @@ def get_existing_integration_records( # once an object_id can reference multiple different integration records return { getattr(record, by, record.source): record - for record in get_all_by_integration_id(IntegrationModel, integration_id) + for record in get_all_by_integration_id(integration_id) } def get_running_ids( - IntegrationModel: Type, integration_id: str, by: str = "source", ) -> Dict[str, int]: + IntegrationModel = integration_model(integration_id) return dict( session.query( getattr(IntegrationModel, by, IntegrationModel.source), From 18ab8e362bdafc8f6ea1863c736b8b8e2b9d985a Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 30 Oct 2025 00:52:54 +0100 Subject: [PATCH 27/36] perf: etl_task.cache_config --- global_objects/etl_task.py | 6 ++++++ models.py | 3 +++ 2 files changed, 9 insertions(+) diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index e2da681..dbdd141 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -79,6 +79,7 @@ def create( org_id: str, user_id: str, file_size_bytes: int, + cache_config: Dict, extract_config: Dict, split_config: Dict, transform_config: Dict, @@ -97,6 +98,7 @@ def create( created_by=user_id, file_path=file_path, file_size_bytes=file_size_bytes, + cache_config=cache_config, extract_config=extract_config, split_config=split_config, transform_config=transform_config, @@ -117,6 +119,7 @@ def update( updated_by: Optional[str] = None, file_path: Optional[str] = None, file_size_bytes: Optional[int] = None, + cache_config: Optional[Dict] = None, extract_config: Optional[Dict] = None, split_config: Optional[Dict] = None, transform_config: Optional[Dict] = None, @@ -144,6 +147,9 @@ def update( etl_task.file_path = file_path if file_size_bytes is not None and etl_task.file_size_bytes is None: etl_task.file_size_bytes = file_size_bytes + if cache_config is not None: + etl_task.cache_config = cache_config + flag_modified(etl_task, "cache_config") if extract_config is not None: etl_task.extract_config = extract_config flag_modified(etl_task, "extract_config") diff --git a/models.py b/models.py index b0a79db..520d51c 100644 --- a/models.py +++ b/models.py @@ -2567,6 +2567,9 @@ class EtlTask(Base): file_size_bytes = 
Column(BigInteger) tokenizer = Column(String) + cache_config = Column( + JSON + ) # {"use_file_cache": true, "use_extraction_cache": false, "use_transformation_cache": true} extract_config = Column(JSON) # schema depends on the file type split_config = Column(JSON) # {"chunk": true, "shrink": false} transform_config = Column( From 298b517ced01832f710143c01c9b86ec96a6222a Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 30 Oct 2025 01:26:24 +0100 Subject: [PATCH 28/36] perf: error print --- enums.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/enums.py b/enums.py index 1cf1dd3..a581ae5 100644 --- a/enums.py +++ b/enums.py @@ -13,7 +13,8 @@ def from_string(cls, value: str): for member in cls: if member.value == changed_value: return member - raise ValueError(f"ERROR: Unknown enum {cls.__name__}: {value}") + print(f"ERROR: unknown enum {cls.__name__}: {value}", flush=True) + raise ValueError(f"Unknown enum {cls.__name__}: {value}") class DataTypes(Enum): From 9a6149b7ff8eee34636ebc2b3aee1fb1cab62311 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 30 Oct 2025 01:29:22 +0100 Subject: [PATCH 29/36] fix: overwrite etl_task_id fkey --- cognition_objects/integration.py | 12 ++++++------ cognition_objects/markdown_file.py | 2 +- integration_objects/manager.py | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cognition_objects/integration.py b/cognition_objects/integration.py index d9b06ca..d05eab0 100644 --- a/cognition_objects/integration.py +++ b/cognition_objects/integration.py @@ -282,14 +282,14 @@ def execution_finished(id: str) -> bool: def delete_many( ids: List[str], delete_cognition_groups: bool = True, with_commit: bool = True ) -> None: - integration_record_ids = [] for id in ids: - integration_model = set() - recs = integration_manager_db_bo.get_all_by_integration_id(id) - integration_model.update([type(rec) for rec in recs]) - integration_record_ids.extend([rec.id for rec in recs]) + IntegrationModel, integration_records = ( + integration_manager_db_bo.get_all_by_integration_id(id) + ) integration_manager_db_bo.delete_many( - integration_model.pop(), ids=integration_record_ids, with_commit=True + IntegrationModel, + ids=[rec.id for rec in integration_records], + with_commit=True, ) ( diff --git a/cognition_objects/markdown_file.py b/cognition_objects/markdown_file.py index 80e16d8..91d8e81 100644 --- a/cognition_objects/markdown_file.py +++ b/cognition_objects/markdown_file.py @@ -215,7 +215,7 @@ def update( markdown_file.meta_data = meta_data else: markdown_file.meta_data = {**markdown_file.meta_data, **meta_data} - if etl_task_id is not None and markdown_file.etl_task_id is None: + if etl_task_id is not None: markdown_file.etl_task_id = etl_task_id general.flush_or_commit(with_commit) diff --git a/integration_objects/manager.py b/integration_objects/manager.py index 3514dd6..4782f65 100644 --- a/integration_objects/manager.py +++ b/integration_objects/manager.py @@ -91,7 +91,7 @@ def get_all_by_integration_id( IntegrationModel = IntegrationGithubFile elif integration.type == CognitionIntegrationType.GITHUB_ISSUE.value: IntegrationModel = IntegrationGithubIssue - return ( + return IntegrationModel, ( session.query(IntegrationModel) .filter(IntegrationModel.integration_id == integration_id) .order_by(IntegrationModel.created_at) From c77a0319a69e81ec99cc0bc0bc2bd426422fae1c Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 30 Oct 2025 10:07:41 +0100 Subject: [PATCH 30/36] perf: make get_or_create_etl_task a submodule 
function

---
 enums.py                   |  2 +-
 global_objects/etl_task.py | 91 ++++++++++++++++++++++++++++++++++----
 2 files changed, 83 insertions(+), 10 deletions(-)

diff --git a/enums.py b/enums.py
index a581ae5..59ee101 100644
--- a/enums.py
+++ b/enums.py
@@ -1053,7 +1053,7 @@ def from_string(cls, value: str):
             return member
         if changed_value == "PDF2MARKDOWN":
             return cls.PDF2MD
-        raise ValueError(f"ERROR: Unknown enum {cls.__name__}: {value}")
+        return cls.VISION
 
 
 class ETLExtractorWord(EnumKern):
diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py
index dbdd141..86ebb09 100644
--- a/global_objects/etl_task.py
+++ b/global_objects/etl_task.py
@@ -3,15 +3,33 @@
 
 import datetime
 
-from ..business_objects import general
-from ..session import session
-from ..models import EtlTask
-from ..enums import CognitionMarkdownFileState
+from submodules.model import enums
+from submodules.model.session import session
+from submodules.model.business_objects import general
+from submodules.model.models import (
+    EtlTask,
+    FileReference,
+    CognitionMarkdownFile,
+    CognitionMarkdownDataset,
+)
 
 FINISHED_STATES = [
-    CognitionMarkdownFileState.FINISHED.value,
-    CognitionMarkdownFileState.FAILED.value,
+    enums.CognitionMarkdownFileState.FINISHED.value,
+    enums.CognitionMarkdownFileState.FAILED.value,
 ]
+
+DEFAULT_FILE_TYPE = enums.ETLFileType.PDF
+DEFAULT_EXTRACTORS = {
+    enums.ETLFileType.MD: enums.ETLExtractorMD.FILESYSTEM,
+    enums.ETLFileType.PDF: enums.ETLExtractorPDF.PDF2MD,
+}
+
+DEFAULT_FALLBACK_EXTRACTORS = {
+    enums.ETLFileType.MD: [],
+    enums.ETLFileType.PDF: [
+        enums.ETLExtractorPDF.PDF2MD,
+        enums.ETLExtractorPDF.VISION,
+    ],
+}
 
 
 def get_by_ids(ids: List[str]) -> List[EtlTask]:
@@ -39,7 +57,9 @@ def get_all(
         query = query.filter(EtlTask.sharepoint_file_id == sharepoint_file_id)
 
     if exclude_failed:
-        query = query.filter(EtlTask.state != CognitionMarkdownFileState.FAILED.value)
+        query = query.filter(
+            EtlTask.state != enums.CognitionMarkdownFileState.FAILED.value
+        )
     if only_active:
         query = query.filter(EtlTask.is_active == True)
     return query.order_by(EtlTask.created_at.desc()).all()
@@ -54,7 +74,9 @@ def get_all_in_org(
     if only_active:
         query = query.filter(EtlTask.is_active == True)
     if exclude_failed:
-        query = query.filter(EtlTask.state != CognitionMarkdownFileState.FAILED.value)
+        query = query.filter(
+            EtlTask.state != enums.CognitionMarkdownFileState.FAILED.value
+        )
     return query.order_by(EtlTask.created_at.desc()).all()
 
 
@@ -75,6 +97,56 @@ def get_all_in_org_paginated(
     )
 
 
+def get_or_create_markdown_file_etl_task(
+    org_id: str,
+    file_reference: FileReference,
+    markdown_file: CognitionMarkdownFile,
+    markdown_dataset: CognitionMarkdownDataset,
+    extractor: str,
+    cache_config: Dict,
+    split_config: Dict,
+    transform_config: Dict,
+    load_config: Dict,
+    notify_config: Dict,
+    priority: Optional[int] = -1,
+    fallback_extractors: Optional[list[enums.ETLExtractorPDF]] = [],
+) -> EtlTask:
+    if etl_task := (
+        session.query(EtlTask).filter(EtlTask.id == markdown_file.etl_task_id).first()
+    ):
+        return etl_task
+
+    file_type = enums.ETLFileType.from_string(file_reference.category_origin)
+    extractor = enums.ETLExtractorPDF.from_string(extractor)
+    fallback_extractors = list(
+        filter(
+            lambda x: x != extractor,
+            (fallback_extractors or DEFAULT_FALLBACK_EXTRACTORS.get(file_type, [])),
+        )
+    )
+
+    return create(
+        org_id=org_id,
+        user_id=markdown_file.created_by,
+        file_size_bytes=file_reference.file_size_bytes,
+        cache_config=cache_config,
+        extract_config={
+            "file_type": file_type.value,
+            "extractor": extractor.value,
+            "fallback_extractors": [fe.value for fe in fallback_extractors],
+            "minio_path": file_reference.minio_path,
+            "original_file_name": file_reference.original_file_name,
+        },
+        split_config=split_config,
+        transform_config=transform_config,
+        load_config=load_config,
+        notify_config=notify_config,
+        llm_config=markdown_dataset.llm_config,
+        tokenizer=markdown_dataset.tokenizer,
+        priority=priority,
+    )
+
+
 def create(
     org_id: str,
     user_id: str,
@@ -128,7 +200,7 @@ def update(
     llm_config: Optional[Dict] = None,
     started_at: Optional[datetime.datetime] = None,
     finished_at: Optional[Union[str, datetime.datetime]] = None,
-    state: Optional[CognitionMarkdownFileState] = None,
+    state: Optional[enums.CognitionMarkdownFileState] = None,
     is_active: Optional[bool] = None,
     priority: Optional[int] = None,
     error_message: Optional[str] = None,
@@ -205,6 +277,7 @@ def execution_finished(id: str) -> bool:
 
 
 def delete_many(ids: List[str], with_commit: bool = True) -> None:
+    # TODO: cascade delete cached files
     (
         session.query(EtlTask)
         .filter(EtlTask.id.in_(ids))
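Reviewer note: the fallback handling in `get_or_create_markdown_file_etl_task` is easy to misread, so here is the extractor resolution restated as a standalone sketch (illustrative function; enum and constant names are from the patch). Note also that the `fallback_extractors=[]` default is safe here only because the list is rebound, never mutated; `None` would be the more idiomatic sentinel.

```python
# Illustration of the extractor resolution in PATCH 30, not repo code.
def resolve_extractors(extractor: str, fallback_extractors, file_type):
    chosen = enums.ETLExtractorPDF.from_string(extractor)
    candidates = fallback_extractors or DEFAULT_FALLBACK_EXTRACTORS.get(file_type, [])
    # the chosen extractor is filtered out of its own fallback list
    return chosen, [fe for fe in candidates if fe != chosen]


# resolve_extractors("PDF2MD", [], enums.ETLFileType.PDF)
# -> (ETLExtractorPDF.PDF2MD, [ETLExtractorPDF.VISION])
```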
"extractor": extractor.value, + "fallback_extractors": [fe.value for fe in fallback_extractors], + "minio_path": file_reference.minio_path, + "original_file_name": file_reference.original_file_name, + }, + split_config=split_config, + transform_config=transform_config, + load_config=load_config, + notify_config=notify_config, + llm_config=markdown_dataset.llm_config, + tokenizer=markdown_dataset.tokenizer, + priority=priority, + ) + + def create( org_id: str, user_id: str, @@ -128,7 +200,7 @@ def update( llm_config: Optional[Dict] = None, started_at: Optional[datetime.datetime] = None, finished_at: Optional[Union[str, datetime.datetime]] = None, - state: Optional[CognitionMarkdownFileState] = None, + state: Optional[enums.CognitionMarkdownFileState] = None, is_active: Optional[bool] = None, priority: Optional[int] = None, error_message: Optional[str] = None, @@ -205,6 +277,7 @@ def execution_finished(id: str) -> bool: def delete_many(ids: List[str], with_commit: bool = True) -> None: + # TODO: cascade delete cached files ( session.query(EtlTask) .filter(EtlTask.id.in_(ids)) From f5681d719c62ad33259696274049239910707ed0 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 30 Oct 2025 10:13:07 +0100 Subject: [PATCH 31/36] fix: get_or_create_etl_task args --- global_objects/etl_task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index 86ebb09..277127f 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -116,7 +116,7 @@ def get_or_create_markdown_file_etl_task( ): return etl_task - file_type = enums.ETLFileType.from_string(file_reference.category_origin) + file_type = enums.ETLFileType.from_string(markdown_file.category_origin) extractor = enums.ETLExtractorPDF.from_string(extractor) fallback_extractors = list( filter( From 0396e8231f58326b2a9fdd9bafa638956c182be7 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 30 Oct 2025 11:23:28 +0100 Subject: [PATCH 32/36] perf: align to integrations --- enums.py | 14 +++++++- global_objects/etl_task.py | 64 ++++++++++++++++++++++++++++++++++ integration_objects/manager.py | 6 ++-- 3 files changed, 79 insertions(+), 5 deletions(-) diff --git a/enums.py b/enums.py index 59ee101..051c33e 100644 --- a/enums.py +++ b/enums.py @@ -1030,11 +1030,23 @@ class ETLSplitStrategy(EnumKern): SHRINK = "SHRINK" -class ETLFileType(EnumKern): +class ETLFileType(Enum): PDF = "PDF" WORD = "WORD" MD = "MD" + @classmethod + def from_string(cls, value: str): + changed_value = value.upper().replace(" ", "_").replace("-", "_") + for member in cls: + if member.value == changed_value: + return member + print( + f"WARNING: unknown enum {cls.__name__}: {value}, defaulting to {cls.__name__}.MD", + flush=True, + ) + return cls.MD + class ETLExtractorMD(EnumKern): FILESYSTEM = "FILESYSTEM" diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index 277127f..33f1722 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -11,6 +11,8 @@ FileReference, CognitionMarkdownFile, CognitionMarkdownDataset, + CognitionIntegration, + IntegrationSharepoint, ) FINISHED_STATES = [ @@ -147,6 +149,68 @@ def get_or_create_markdown_file_etl_task( ) +def get_or_create_integration_etl_task( + org_id: str, + integration: CognitionIntegration, + record: IntegrationSharepoint, + file_path: str, + extractor: Optional[str], + cache_config: Dict, + split_config: Dict, + transform_config: Dict, + load_config: Dict, + notify_config: Dict, + priority: Optional[int] = 
From 9c33c7e417eef028fc77c93a96bd371cfdc2c3fb Mon Sep 17 00:00:00 2001
From: andhreljaKern
Date: Thu, 30 Oct 2025 13:03:28 +0100
Subject: [PATCH 33/36] perf: add ETLCacheKeys

---
 enums.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/enums.py b/enums.py
index 051c33e..e9b2f2c 100644
--- a/enums.py
+++ b/enums.py
@@ -1090,3 +1090,9 @@ class ETLTransformer(EnumKern):
     SUMMARIZE = "SUMMARIZE"
     CLEANSE = "CLEANSE"
     TEXT_TO_TABLE = "TEXT_TO_TABLE"
+
+
+class ETLCacheKeys(EnumKern):
+    EXTRACTION = "use_extraction_cache"
+    SPLITTING = "use_splitting_cache"
+    TRANSFORMATION = "use_transformation_cache"

From e103f2f65f4764e2bc19c1e6d6b58d7390547f18 Mon Sep 17 00:00:00 2001
From: andhreljaKern
Date: Thu, 30 Oct 2025 13:17:33 +0100
Subject: [PATCH 34/36] perf: add file-cache to ETLCacheKey enum

---
 enums.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/enums.py b/enums.py
index abe7b73..4dae750 100644
--- a/enums.py
+++ b/enums.py
@@ -1098,6 +1098,7 @@ class ETLTransformer(EnumKern):
 
 
 class ETLCacheKeys(EnumKern):
+    FILE_CACHE = "use_file_cache"
     EXTRACTION = "use_extraction_cache"
     SPLITTING = "use_splitting_cache"
     TRANSFORMATION = "use_transformation_cache"
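Reviewer note: with ETLCacheKeys in place, the free-form keys documented on `EtlTask.cache_config` (PATCH 27) can be spelled through the enum rather than as string literals; note the enum also introduces `use_splitting_cache`, which the models.py comment does not list. A hedged sketch, helper name ours:

```python
from enums import ETLCacheKeys  # module path assumed


def build_cache_config(
    file_cache: bool = True,
    extraction: bool = False,
    splitting: bool = True,
    transformation: bool = True,
) -> dict:
    # Hypothetical helper producing the JSON stored in EtlTask.cache_config.
    return {
        ETLCacheKeys.FILE_CACHE.value: file_cache,
        ETLCacheKeys.EXTRACTION.value: extraction,
        ETLCacheKeys.SPLITTING.value: splitting,
        ETLCacheKeys.TRANSFORMATION.value: transformation,
    }
```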
From 293c14937f8f8d48ac582541cb083988170f98f9 Mon Sep 17 00:00:00 2001
From: andhreljaKern
Date: Fri, 31 Oct 2025 00:57:40 +0100
Subject: [PATCH 35/36] perf: minor fixes

---
 cognition_objects/integration.py |  5 +++-
 global_objects/etl_task.py       |  2 +-
 integration_objects/manager.py   | 48 +++++++++++++++++++++-----------
 3 files changed, 36 insertions(+), 19 deletions(-)

diff --git a/cognition_objects/integration.py b/cognition_objects/integration.py
index d05eab0..ca6390b 100644
--- a/cognition_objects/integration.py
+++ b/cognition_objects/integration.py
@@ -3,6 +3,8 @@
 from sqlalchemy import func
 from sqlalchemy.orm.attributes import flag_modified
 
+from src.util.o365 import ETL_DIR
+
 from ..business_objects import general
 from ..integration_objects import manager as integration_manager_db_bo
 from ..session import session
@@ -283,7 +285,7 @@ def delete_many(
     ids: List[str], delete_cognition_groups: bool = True, with_commit: bool = True
 ) -> None:
     for id in ids:
-        IntegrationModel, integration_records = (
+        integration_records, IntegrationModel = (
             integration_manager_db_bo.get_all_by_integration_id(id)
         )
         integration_manager_db_bo.delete_many(
@@ -303,6 +305,7 @@ def delete_many(
             .filter(CognitionGroup.meta_data.op("->>")("integration_id").in_(ids))
             .delete(synchronize_session=False)
         )
+
     general.flush_or_commit(with_commit)
 
 
diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py
index 33f1722..a5b6dd5 100644
--- a/global_objects/etl_task.py
+++ b/global_objects/etl_task.py
@@ -159,7 +159,7 @@ def get_or_create_integration_etl_task(
     split_config: Dict,
     transform_config: Dict,
     load_config: Dict,
-    notify_config: Dict,
+    notify_config: Optional[Dict] = None,
     priority: Optional[int] = -1,
     fallback_extractors: Optional[list[enums.ETLExtractorPDF]] = [],
 ) -> EtlTask:
diff --git a/integration_objects/manager.py b/integration_objects/manager.py
index 0676f28..2bb302f 100644
--- a/integration_objects/manager.py
+++ b/integration_objects/manager.py
@@ -1,4 +1,4 @@
-from typing import List, Optional, Dict, Union, Type, Any
+from typing import List, Optional, Dict, Tuple, Union, Type, Any
 from datetime import datetime
 from sqlalchemy import func
 from sqlalchemy.orm.attributes import flag_modified
@@ -14,6 +14,7 @@
     IntegrationPdf,
     IntegrationGithubIssue,
     IntegrationGithubFile,
+    EtlTask,
 )
 
 
@@ -81,21 +82,16 @@ def get_by_source(
 
 def get_all_by_integration_id(
     integration_id: str,
-) -> List[object]:
-    integration = integration_db_bo.get_by_id(integration_id)
-    if integration.type == CognitionIntegrationType.SHAREPOINT.value:
-        IntegrationModel = IntegrationSharepoint
-    elif integration.type == CognitionIntegrationType.PDF.value:
-        IntegrationModel = IntegrationPdf
-    elif integration.type == CognitionIntegrationType.GITHUB_FILE.value:
-        IntegrationModel = IntegrationGithubFile
-    elif integration.type == CognitionIntegrationType.GITHUB_ISSUE.value:
-        IntegrationModel = IntegrationGithubIssue
-    return IntegrationModel, (
-        session.query(IntegrationModel)
-        .filter(IntegrationModel.integration_id == integration_id)
-        .order_by(IntegrationModel.created_at)
-        .all()
+) -> Tuple[List[object], Type]:
+    IntegrationModel = integration_model(integration_id)
+    return (
+        (
+            session.query(IntegrationModel)
+            .filter(IntegrationModel.integration_id == integration_id)
+            .order_by(IntegrationModel.created_at)
+            .all()
+        ),
+        IntegrationModel,
     )
@@ -134,10 +130,28 @@ def get_existing_integration_records(
 ) -> Dict[str, object]:
     # TODO(extension): make return type Dict[str, List[object]]
     # once an object_id can reference multiple different integration records
-    _, records = get_all_by_integration_id(integration_id)
+    records, _ = get_all_by_integration_id(integration_id)
     return {getattr(record, by, record.source): record for record in records}
 
 
+def get_active_integration_records(
+    integration_id: str,
+) -> List[object]:
+    IntegrationModel = integration_model(integration_id)
+    return (
+        session.query(IntegrationModel)
+        .join(
+            EtlTask,
+            IntegrationModel.etl_task_id == EtlTask.id,
+        )
+        .filter(
+            IntegrationModel.integration_id == integration_id,
+            EtlTask.is_active == True,
+        )
+        .all()
+    )
+
+
 def get_running_ids(
     integration_id: str,
     by: str = "source",
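Reviewer note on PATCH 35: the helper's tuple order flipped from (model, records) in PATCH 29 to (records, model) here, and every call site had to be touched. A named result would absorb that churn; a sketch under the patch's imports (the NamedTuple is our suggestion, not repo code):

```python
from typing import List, NamedTuple, Type


class IntegrationRecords(NamedTuple):
    records: List[object]
    model: Type


def get_all_by_integration_id(integration_id: str) -> IntegrationRecords:
    IntegrationModel = integration_model(integration_id)
    records = (
        session.query(IntegrationModel)
        .filter(IntegrationModel.integration_id == integration_id)
        .order_by(IntegrationModel.created_at)
        .all()
    )
    # Field access (result.records, result.model) keeps call sites stable
    # even if the tuple layout changes again.
    return IntegrationRecords(records=records, model=IntegrationModel)
```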
From 2d919530f6c3f09c6b9f6b79481c99d520d857dd Mon Sep 17 00:00:00 2001
From: JWittmeyer
Date: Mon, 3 Nov 2025 16:29:24 +0100
Subject: [PATCH 36/36] New table

---
 cognition_objects/integration.py |  1 -
 enums.py                         |  1 +
 models.py                        | 27 +++++++++++++++++++++++++++
 3 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/cognition_objects/integration.py b/cognition_objects/integration.py
index ca6390b..c9f0ca9 100644
--- a/cognition_objects/integration.py
+++ b/cognition_objects/integration.py
@@ -3,7 +3,6 @@
 from sqlalchemy import func
 from sqlalchemy.orm.attributes import flag_modified
 
-from src.util.o365 import ETL_DIR
 
 from ..business_objects import general
 from ..integration_objects import manager as integration_manager_db_bo
diff --git a/enums.py b/enums.py
index 4dae750..f7196d6 100644
--- a/enums.py
+++ b/enums.py
@@ -194,6 +194,7 @@ class Tablenames(Enum):
     RELEASE_NOTIFICATION = "release_notification"
     TIMED_EXECUTIONS = "timed_executions"
     ETL_TASK = "etl_task"
+    ETL_CONFIG_PRESET = "etl_config_preset"
 
     def snake_case_to_pascal_case(self):
         # the type name (written in PascalCase) of a table is needed to create backrefs
diff --git a/models.py b/models.py
index 52573de..2553e72 100644
--- a/models.py
+++ b/models.py
@@ -2019,6 +2019,33 @@ class CognitionGroupMember(Base):
     created_at = Column(DateTime, default=sql.func.now())
 
 
+class ETLConfigPresets(Base):
+    __tablename__ = Tablenames.ETL_CONFIG_PRESET.value
+    __table_args__ = {"schema": "cognition"}
+    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
+    organization_id = Column(
+        UUID(as_uuid=True),
+        ForeignKey(f"{Tablenames.ORGANIZATION.value}.id", ondelete="CASCADE"),
+        index=True,
+    )
+    project_id = Column(
+        UUID(as_uuid=True),
+        ForeignKey(f"cognition.{Tablenames.PROJECT.value}.id", ondelete="CASCADE"),
+        index=True,
+        nullable=True,  # future proofing for organization-wide presets/etl page presets
+    )
+    name = Column(String, unique=True)
+    description = Column(String)
+    created_at = Column(DateTime, default=sql.func.now())
+    created_by = Column(
+        UUID(as_uuid=True),
+        ForeignKey(f"{Tablenames.USER.value}.id", ondelete="SET NULL"),
+        index=True,
+    )
+    etl_config = Column(JSON)  # full ETL config JSON schema for how to run the ETL
+    add_config = Column(JSON)  # additional config for e.g. setting scope dict values
+
+
 # =========================== Global tables ===========================
 class GlobalWebsocketAccess(Base):
     # table to store prepared websocket configuration.
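Reviewer note: to make the two JSON columns on the new preset table concrete, here is a hedged example payload composed from config keys that appear earlier in this series; the patch itself fixes no schema for either column.

```python
# Hypothetical ETLConfigPresets.etl_config payload; keys mirror the
# cache/split/transform/load configs used on EtlTask in PATCHes 26-35.
example_etl_config = {
    "cache_config": {"use_file_cache": True, "use_extraction_cache": False},
    "split_config": {"chunk": True, "shrink": False},
    "transform_config": {"summarize": True, "cleanse": True, "text_to_table": False},
    "load_config": {"refinery_project": False, "markdown_file": True},
}
```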