Skip to content

Commit 19c0a4d

Browse files
andhreljaKernlumburovskalinaLennartSchmidtKernJWittmeyer
authored
Cognition integration provider (#167)
* build: third party integration first commit * chore: update enums * perf: add integration acess * perf: rename to integration * perf: add last_extraction column to integration * perf: update integration delta * perf: update integration access to list types * perf: add integration_types to integration access * perf: add integration_types to integration access * perf: rename last_extraction to extract_history * fix: store enum.value instead of enum * fix: integration.project_id nullable * fix: nulable column instead of foreignkey * fix: enum values * perf: task cancellation * fix: keyword arguments * perf: integration record * perf: add tokenizer * perf: add update integration access * perf: update integration endpoints * perf: add get endpoint * Oidc field in the users table * perf: add org_id to integration provider * perf: add org_id support to integration * perf: add record delta criteria * fix: task execution finish on failed integration * perf: add integration finished_at * perf: add started_at * fix: started_at - finished_at syntax error * perf: add integration records * perf: add integration tables * perf: update integrations delta * perf: add sharepoint integration * perf: update integration objects * perf: expand IntegrationSharepoint * fix: integration.started_at perf: add unique constraints to integrations * perf: integration data types * perf: unique constraint names * Reset finished at for new integrations * perf: update integration objects * perf: add integration delta deletion * basic models * perf: last_synced_at integration column * perf: add is_synced column * chore: add typing * perf: add sync columns * fix model * perf: add get_all integrations * chore: add todo comment * permission test * projects with access management * perf: add sharepoint db bo * deactivate mock up * get by user id * enable list payload * perf: integration update * perf: tech discussion feedback * perf: get integrations updates * perf: integration updates * perf: introduce managers * chore: typing * perf: access + check for updates * perf: update integration * perf: add delta url to sharepoint integration * fix: move delta_url to cognitionintegration * perf: integration updates * perf: add updated_by + delta_criteria * perf: add delta_criteria field * perf: check for status improvement * add meta_data group * format * model * sync action * rename action * perf: dynamic 'by' record grouping * style: arguments newlines in function definitions * new getters * delete groups * fix: rm get project by name and org id * perf: pr review comments * perf: pr review comments * fix: paginated query * perf: update integrations model * perf: update to_snake_case regex compilation * access management * perf: db model update * perf: pr comments * perf: remove unnecessary checks * fix: nullable error message * perf: pr comments * delete * perf: move IntegrationMetadata enum to integration_objects.helper * Oidc field in the users table (#170) Co-authored-by: andhreljaKern <andrea.hrelja@kern.ai> * fix: metadata helper function * perf: update integration task type name * style: formatting * perf: pr review comments * perf: add REFINERY_ATTRIBUTE_ACCESS constants * fix: query builder for record_ids * perf: monitor.set_integration_task_to_failed with state * perf: add early exit for deleted integrations * perf: add early exit for task execution * sharepoint active queue * unique by name and integration * typing * improve sql * merge * model * fix group * remove unique * perf: add file_properties integration column * perf: update default state for set_integration_task_to_failed * Adds option filter for pid * PR comments --------- Co-authored-by: Lina <lina.lumburovska@kern.ai> Co-authored-by: LennartSchmidtKern <lennart.schmidt@kern.ai> Co-authored-by: lumburovskalina <104008550+lumburovskalina@users.noreply.github.com> Co-authored-by: JWittmeyer <jens.wittmeyer@kern.ai>
1 parent 4cdfbd2 commit 19c0a4d

File tree

14 files changed

+1327
-13
lines changed

14 files changed

+1327
-13
lines changed

business_objects/embedding.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,8 +321,11 @@ def __build_payload_selector(
321321
if (
322322
data_type != enums.DataTypes.TEXT.value
323323
and data_type != enums.DataTypes.LLM_RESPONSE.value
324+
and data_type != enums.DataTypes.PERMISSION.value
324325
):
325326
payload_selector += f"'{attr}', (r.\"data\"->>'{attr}')::{data_type}"
327+
if data_type == enums.DataTypes.PERMISSION.value:
328+
payload_selector += f"'{attr}', r.\"data\"->'{attr}'"
326329
else:
327330
payload_selector += f"'{attr}', r.\"data\"->>'{attr}'"
328331
payload_selector = f"json_build_object({payload_selector}) payload"
@@ -391,7 +394,8 @@ def get_tensors_and_attributes_for_qdrant(
391394
WHERE et.project_id = '{project_id}' AND et.embedding_id = '{embedding_id}'
392395
"""
393396
if record_ids:
394-
query += f" AND r.id IN ('{','.join(record_ids)}')"
397+
_record_ids = "','".join(record_ids)
398+
query += f" AND r.id IN ('{_record_ids}')"
395399

396400
return general.execute_all(query)
397401

business_objects/monitor.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from typing import Any, List, Optional
2+
import datetime
23
from . import general
34
from .. import enums
45
from ..models import TaskQueue, Organization
@@ -9,6 +10,7 @@
910
markdown_file as markdown_file_db_bo,
1011
file_extraction as file_extraction_db_bo,
1112
file_transformation as file_transformation_db_bo,
13+
integration as integration_db_bo,
1214
)
1315

1416
FILE_CACHING_IN_PROGRESS_STATES = [
@@ -197,6 +199,27 @@ def set_parse_cognition_file_task_to_failed(
197199
general.commit()
198200

199201

202+
def set_integration_task_to_failed(
203+
integration_id: str,
204+
is_synced: bool = False,
205+
error_message: Optional[str] = None,
206+
state: Optional[
207+
enums.CognitionMarkdownFileState
208+
] = enums.CognitionMarkdownFileState.FAILED,
209+
with_commit: bool = True,
210+
) -> None:
211+
# argument `state` is a workaround for cognition-gateway/api/routes/integrations.delete_many
212+
integration_db_bo.update(
213+
id=integration_id,
214+
state=state,
215+
finished_at=datetime.datetime.now(datetime.timezone.utc),
216+
is_synced=is_synced,
217+
error_message=error_message,
218+
last_synced_at=datetime.datetime.now(datetime.timezone.utc),
219+
with_commit=with_commit,
220+
)
221+
222+
200223
def __select_running_information_source_payloads(
201224
project_id: Optional[str] = None,
202225
only_running: bool = False,

business_objects/project.py

Lines changed: 64 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
from typing import List, Optional, Any, Dict, Union, Set
2-
from sqlalchemy.sql import func
3-
from sqlalchemy import cast, Integer
2+
from sqlalchemy.sql import func, cast
43
from sqlalchemy.sql.functions import coalesce
5-
6-
4+
from sqlalchemy import Integer
75
from . import general, attribute
8-
96
from .. import enums
107
from ..session import session
11-
from ..models import (
12-
Project,
13-
Record,
8+
from ..models import Project, Record, Attribute
9+
from ..integration_objects.helper import (
10+
REFINERY_ATTRIBUTE_ACCESS_GROUPS,
11+
REFINERY_ATTRIBUTE_ACCESS_USERS,
1412
)
1513
from ..util import prevent_sql_injection
1614

@@ -118,12 +116,18 @@ def __build_sql_data_slices_by_project(project_id: str) -> str:
118116
project.id = '{project_id}'::UUID; """
119117

120118

121-
def get_dropdown_list_project_list(org_id: str) -> List[Dict[str, str]]:
119+
def get_dropdown_list_project_list(
120+
org_id: str, project_id: Optional[str] = None
121+
) -> List[Dict[str, str]]:
122122
org_id = prevent_sql_injection(org_id, isinstance(org_id, str))
123+
prj_filter = ""
124+
if project_id:
125+
project_id = prevent_sql_injection(project_id, isinstance(project_id, str))
126+
prj_filter = f"AND p.id = '{project_id}'"
123127
query = f"""
124128
SELECT array_agg(jsonb_build_object('value', p.id,'name',p.NAME))
125129
FROM public.project p
126-
WHERE p.organization_id = '{org_id}' AND p.status != '{enums.ProjectStatus.HIDDEN.value}'
130+
WHERE p.organization_id = '{org_id}' AND p.status != '{enums.ProjectStatus.HIDDEN.value}' {prj_filter}
127131
"""
128132
values = general.execute_first(query)
129133

@@ -155,6 +159,56 @@ def get_all(organization_id: str) -> List[Project]:
155159
)
156160

157161

162+
def get_all_with_access_management(org_id: str) -> List[Dict[str, Any]]:
163+
org_id_safe = prevent_sql_injection(org_id, isinstance(org_id, str))
164+
165+
hidden_status = enums.ProjectStatus.HIDDEN.value
166+
permission_data_type = enums.DataTypes.PERMISSION.value
167+
automatically_created_state = enums.AttributeState.AUTOMATICALLY_CREATED.value
168+
access_groups_attr = REFINERY_ATTRIBUTE_ACCESS_GROUPS
169+
access_users_attr = REFINERY_ATTRIBUTE_ACCESS_USERS
170+
171+
query = f"""
172+
SELECT DISTINCT
173+
p.*,
174+
COALESCE((ci.config -> 'extract_kwargs' ->> 'sync_sharepoint_permissions')::BOOLEAN,FALSE) AS is_sharepoint_sync_active
175+
FROM
176+
public.project p
177+
JOIN
178+
public.attribute a ON p.id = a.project_id
179+
LEFT JOIN
180+
cognition.integration ci ON p.id = ci.project_id
181+
WHERE
182+
p.organization_id = '{org_id_safe}'
183+
AND p.status != '{hidden_status}'
184+
AND a.name IN ('{access_groups_attr}', '{access_users_attr}')
185+
AND a.user_created = FALSE
186+
AND a.data_type = '{permission_data_type}'
187+
AND a.state = '{automatically_created_state}';
188+
"""
189+
190+
values = general.execute_all(query)
191+
return values
192+
193+
194+
def check_access_management_active(project_id: str) -> bool:
195+
return (
196+
session.query(Project)
197+
.join(Attribute, Project.id == Attribute.project_id)
198+
.filter(
199+
Project.id == project_id,
200+
Attribute.name.in_(
201+
[REFINERY_ATTRIBUTE_ACCESS_GROUPS, REFINERY_ATTRIBUTE_ACCESS_USERS]
202+
),
203+
Attribute.user_created == False,
204+
Attribute.data_type == enums.DataTypes.PERMISSION.value,
205+
Attribute.state == enums.AttributeState.AUTOMATICALLY_CREATED.value,
206+
)
207+
.count()
208+
> 0
209+
)
210+
211+
158212
def get_all_by_user_organization_id(organization_id: str) -> List[Project]:
159213
projects = (
160214
session.query(Project).filter(Project.organization_id == organization_id).all()

business_objects/record.py

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from __future__ import with_statement
22
from typing import List, Dict, Any, Optional, Tuple, Iterable
3-
from sqlalchemy import cast, Text
3+
from sqlalchemy import cast, Text, String
44
from sqlalchemy.orm.attributes import flag_modified
55
from sqlalchemy.sql.expression import bindparam
66
from sqlalchemy import update
@@ -15,6 +15,10 @@
1515
Attribute,
1616
RecordTokenized,
1717
)
18+
from ..integration_objects.helper import (
19+
REFINERY_ATTRIBUTE_ACCESS_GROUPS,
20+
REFINERY_ATTRIBUTE_ACCESS_USERS,
21+
)
1822
from ..session import session
1923
from ..util import prevent_sql_injection
2024

@@ -609,7 +613,7 @@ def count_missing_tokenized_records(project_id: str) -> int:
609613
query = f"""
610614
SELECT COUNT(*)
611615
FROM (
612-
{get_records_without_tokenization(project_id, None, query_only = True)}
616+
{get_records_without_tokenization(project_id, None, query_only=True)}
613617
) record_query
614618
"""
615619
return general.execute_first(query)[0]
@@ -807,6 +811,29 @@ def delete_user_created_attribute(
807811
general.flush_or_commit(with_commit)
808812

809813

814+
def delete_access_management_attributes(
815+
project_id: str, with_commit: bool = True
816+
) -> None:
817+
access_groups_attribute_item = attribute.get_by_name(
818+
project_id, REFINERY_ATTRIBUTE_ACCESS_GROUPS
819+
)
820+
access_users_attribute_item = attribute.get_by_name(
821+
project_id, REFINERY_ATTRIBUTE_ACCESS_USERS
822+
)
823+
824+
if access_users_attribute_item and access_groups_attribute_item:
825+
record_items = get_all(project_id=project_id)
826+
for i, record_item in enumerate(record_items):
827+
if record_item.data.get(access_groups_attribute_item.name):
828+
del record_item.data[access_groups_attribute_item.name]
829+
if record_item.data.get(access_users_attribute_item.name):
830+
del record_item.data[access_users_attribute_item.name]
831+
flag_modified(record_item, "data")
832+
if (i + 1) % 1000 == 0:
833+
general.flush_or_commit(with_commit)
834+
general.flush_or_commit(with_commit)
835+
836+
810837
def delete_duplicated_rats(with_commit: bool = False) -> None:
811838
# no project so run for all to prevent expensive join with record table
812839
query = """
@@ -925,3 +952,19 @@ def get_first_no_text_column(project_id: str, record_id: str) -> str:
925952
WHERE r.project_id = '{project_id}' AND r.id = '{record_id}'
926953
"""
927954
return general.execute_first(query)[0]
955+
956+
957+
def get_record_ids_by_running_ids(project_id: str, running_ids: List[int]) -> List[str]:
958+
return [
959+
row[0]
960+
for row in (
961+
session.query(cast(Record.id, String))
962+
.filter(
963+
Record.project_id == project_id,
964+
Record.data[attribute.get_running_id_name(project_id)]
965+
.as_integer()
966+
.in_(running_ids),
967+
)
968+
.all()
969+
)
970+
]

cognition_objects/group.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
from datetime import datetime
2+
from typing import List, Optional
3+
from ..business_objects import general
4+
from ..session import session
5+
from ..models import CognitionGroup
6+
7+
8+
def get(group_id: str) -> CognitionGroup:
9+
return session.query(CognitionGroup).filter(CognitionGroup.id == group_id).first()
10+
11+
12+
def get_with_organization_id(organization_id: str, group_id: str) -> CognitionGroup:
13+
return (
14+
session.query(CognitionGroup)
15+
.filter(
16+
CognitionGroup.organization_id == organization_id,
17+
CognitionGroup.id == group_id,
18+
)
19+
.first()
20+
)
21+
22+
23+
def get_all(organization_id: str) -> List[CognitionGroup]:
24+
return (
25+
session.query(CognitionGroup)
26+
.filter(CognitionGroup.organization_id == organization_id)
27+
.order_by(CognitionGroup.name.asc())
28+
.all()
29+
)
30+
31+
32+
def get_all_by_integration_id(
33+
organization_id: str, integration_id: str
34+
) -> List[CognitionGroup]:
35+
integration_id_json = CognitionGroup.meta_data.op("->>")("integration_id")
36+
37+
return (
38+
session.query(CognitionGroup)
39+
.filter(
40+
CognitionGroup.organization_id == organization_id,
41+
integration_id_json == integration_id,
42+
)
43+
.order_by(CognitionGroup.name.asc())
44+
.all()
45+
)
46+
47+
48+
def get_all_by_integration_id_permission_grouped(
49+
organization_id: str, integration_id: str
50+
) -> List[CognitionGroup]:
51+
integration_id_json = CognitionGroup.meta_data.op("->>")("integration_id")
52+
53+
integration_groups = (
54+
session.query(CognitionGroup)
55+
.filter(
56+
CognitionGroup.organization_id == organization_id,
57+
integration_id_json == integration_id,
58+
)
59+
.all()
60+
)
61+
62+
return {group.meta_data.get("permission_id"): group for group in integration_groups}
63+
64+
65+
def get_by_name_and_integration(
66+
organization_id: str, integration_id: str, name: str
67+
) -> CognitionGroup:
68+
integration_id_json = CognitionGroup.meta_data.op("->>")("integration_id")
69+
return (
70+
session.query(CognitionGroup)
71+
.filter(
72+
CognitionGroup.organization_id == organization_id,
73+
CognitionGroup.name == name,
74+
integration_id_json == integration_id,
75+
)
76+
.first()
77+
)
78+
79+
80+
def create_group(
81+
organization_id: str,
82+
name: str,
83+
description: str,
84+
created_by: str,
85+
created_at: Optional[datetime] = None,
86+
with_commit: bool = True,
87+
meta_data: Optional[dict] = None,
88+
) -> CognitionGroup:
89+
group = CognitionGroup(
90+
organization_id=organization_id,
91+
name=name,
92+
description=description,
93+
created_by=created_by,
94+
created_at=created_at,
95+
meta_data=meta_data,
96+
)
97+
general.add(group, with_commit)
98+
return group
99+
100+
101+
def update_group(
102+
group_id: str,
103+
name: Optional[str] = None,
104+
description: Optional[str] = None,
105+
with_commit: bool = True,
106+
meta_data: Optional[dict] = None,
107+
) -> CognitionGroup:
108+
group = get(group_id)
109+
110+
if name is not None:
111+
group.name = name
112+
if description is not None:
113+
group.description = description
114+
if meta_data is not None:
115+
group.meta_data = meta_data
116+
general.flush_or_commit(with_commit)
117+
118+
return group
119+
120+
121+
def delete(organization_id: str, group_id: str, with_commit: bool = True) -> None:
122+
group = get_with_organization_id(organization_id, group_id)
123+
if group:
124+
general.delete(group, with_commit)

0 commit comments

Comments
 (0)