Skip to content

Commit b02850d

Browse files
Query for last chat messages (#199)
* Query for last chat messages * Fix in query for last chat messages * Query change for grouping orgs and projects * Query for etl tasks * Query for integrations tasks * Improved query for integrations tasks * Added project name to integrations tasks * Strategy info query * Integrations tasks page improvements * Strategu steps improvements * PR comments * PR comments --------- Co-authored-by: JWittmeyer <jens.wittmeyer@kern.ai>
1 parent 5783654 commit b02850d

File tree

5 files changed

+327
-2
lines changed

5 files changed

+327
-2
lines changed

cognition_objects/integration.py

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
CognitionIntegrationType,
1212
)
1313
from ..util import prevent_sql_injection
14+
from submodules.model import enums
1415

1516
FINISHED_STATES = [
1617
CognitionMarkdownFileState.FINISHED.value,
@@ -329,3 +330,150 @@ def get_distinct_item_ids_for_all_permissions(
329330
return []
330331

331332
return [row[0] for row in results if row and row[0]]
333+
334+
335+
def get_last_integrations_tasks() -> List[Dict[str, Any]]:
336+
query = f"""
337+
WITH embedding_agg AS (
338+
SELECT
339+
project_id,
340+
jsonb_object_agg(
341+
state,
342+
jsonb_build_object(
343+
'count', count,
344+
'embeddings', embeddings
345+
)
346+
) AS embeddings_by_state
347+
FROM (
348+
SELECT
349+
e.project_id,
350+
e.state,
351+
COUNT(*) AS count,
352+
jsonb_agg(
353+
jsonb_build_object(
354+
'createdBy', e.created_by,
355+
'finishedAt', e.finished_at,
356+
'id', e.id,
357+
'name', e.name,
358+
'startedAt', e.started_at,
359+
'state', e.state
360+
) ORDER BY e.started_at DESC
361+
) AS embeddings
362+
FROM embedding e
363+
GROUP BY e.project_id, e.state
364+
) AS x
365+
GROUP BY project_id
366+
),
367+
368+
attribute_agg AS (
369+
SELECT
370+
project_id,
371+
jsonb_object_agg(
372+
state,
373+
jsonb_build_object(
374+
'count', count,
375+
'attributes', attributes
376+
)
377+
) AS attributes_by_state
378+
FROM (
379+
SELECT
380+
a.project_id,
381+
a.state,
382+
COUNT(*) AS count,
383+
jsonb_agg(
384+
jsonb_build_object(
385+
'dataType', a.data_type,
386+
'finishedAt', a.finished_at,
387+
'id', a.id,
388+
'name', a.name,
389+
'startedAt', a.started_at,
390+
'state', a.state
391+
) ORDER BY a.started_at DESC
392+
) AS attributes
393+
FROM attribute a
394+
WHERE a.state NOT IN ('UPLOADED','AUTOMATICALLY_CREATED')
395+
GROUP BY a.project_id, a.state
396+
) AS x
397+
GROUP BY project_id
398+
),
399+
400+
record_tokenization_task_agg AS (
401+
SELECT
402+
project_id,
403+
jsonb_object_agg(
404+
state,
405+
jsonb_build_object(
406+
'count', count,
407+
'record_tokenization_tasks', record_tokenization_tasks
408+
)
409+
) AS record_tokenization_tasks_by_state
410+
FROM (
411+
SELECT
412+
rtt.project_id,
413+
rtt.state,
414+
COUNT(*) AS count,
415+
jsonb_agg(
416+
jsonb_build_object(
417+
'finishedAt', rtt.finished_at,
418+
'id', rtt.id,
419+
'startedAt', rtt.started_at,
420+
'state', rtt.state,
421+
'type', rtt.type
422+
) ORDER BY rtt.started_at DESC
423+
) AS record_tokenization_tasks
424+
FROM record_tokenization_task rtt
425+
GROUP BY rtt.project_id, rtt.state
426+
) AS x
427+
GROUP BY project_id
428+
),
429+
430+
integration_data AS (
431+
SELECT
432+
i.id AS integration_id,
433+
i.name AS integration_name,
434+
i.error_message,
435+
i.started_at,
436+
i.finished_at,
437+
i.state,
438+
i.organization_id,
439+
i.project_id,
440+
i.created_by,
441+
i.type,
442+
o.name AS organization_name,
443+
p.name AS project_name,
444+
jsonb_build_object(
445+
'embeddingsByState', coalesce(ea.embeddings_by_state, '[]'::jsonb),
446+
'attributesByState', coalesce(aa.attributes_by_state, '[]'::jsonb),
447+
'recordTokenizationTasksByState', coalesce(rtt.record_tokenization_tasks_by_state, '[]'::jsonb)
448+
) AS full_data
449+
FROM cognition.integration i
450+
LEFT JOIN embedding_agg ea
451+
ON ea.project_id = i.project_id
452+
LEFT JOIN attribute_agg aa
453+
ON aa.project_id = i.project_id
454+
LEFT JOIN record_tokenization_task_agg rtt
455+
ON rtt.project_id = i.project_id
456+
JOIN organization o
457+
ON o.id = i.organization_id
458+
JOIN project p
459+
ON p.id = i.project_id
460+
)
461+
462+
SELECT
463+
int_data.organization_id as organization_id,
464+
int_data.organization_name as organization_name,
465+
int_data.integration_id,
466+
int_data.integration_name,
467+
int_data.error_message,
468+
int_data.started_at,
469+
int_data.finished_at,
470+
int_data.state,
471+
int_data.full_data,
472+
int_data.created_by,
473+
int_data.type,
474+
int_data.project_name
475+
FROM integration_data int_data
476+
ORDER BY int_data.organization_id, int_data.started_at DESC
477+
"""
478+
479+
return general.execute_all(query)

cognition_objects/markdown_file.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,3 +218,50 @@ def delete_many(org_id: str, md_file_ids: List[str], with_commit: bool = True) -
218218
CognitionMarkdownFile.id.in_(md_file_ids),
219219
).delete(synchronize_session=False)
220220
general.flush_or_commit(with_commit)
221+
222+
223+
def get_last_etl_tasks(
224+
states: List[str],
225+
created_at_from: str,
226+
created_at_to: Optional[str] = None,
227+
) -> List[Any]:
228+
229+
states = [prevent_sql_injection(st, isinstance(st, str)) for st in states]
230+
if len(states) == 0:
231+
return []
232+
233+
created_at_from = prevent_sql_injection(
234+
created_at_from, isinstance(created_at_from, str)
235+
)
236+
if created_at_to:
237+
created_at_to = prevent_sql_injection(
238+
created_at_to, isinstance(created_at_to, str)
239+
)
240+
created_at_to_filter = ""
241+
242+
if created_at_to:
243+
created_at_to_filter = f"AND mf.created_at <= '{created_at_to}'"
244+
245+
states_filter_sql = ", ".join([f"'{state}'" for state in states])
246+
247+
query = f"""
248+
SELECT *
249+
FROM (
250+
SELECT mf.created_at, mf.created_by, mf.started_at, mf.finished_at, mf.file_name, mf.error, mf.state, md.id AS dataset_id, md.name AS dataset_name, md.organization_id, o.name AS organization_name,
251+
ROW_NUMBER() OVER (
252+
PARTITION BY md.organization_id, md.id
253+
ORDER BY mf.created_at DESC
254+
) AS rn
255+
FROM cognition.markdown_file mf
256+
JOIN cognition.markdown_dataset md ON md.id = mf.dataset_id
257+
JOIN organization o ON o.id = md.organization_id
258+
WHERE
259+
mf.created_at >= '{created_at_from}'
260+
AND mf.state IN ({states_filter_sql})
261+
{created_at_to_filter}
262+
) sub
263+
WHERE rn <= 5
264+
ORDER BY organization_id, dataset_id, created_at DESC
265+
"""
266+
267+
return general.execute_all(query)

cognition_objects/message.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from typing import Any, Dict, List, Optional, Union, Tuple
22
from datetime import datetime
3+
4+
from submodules.model.enums import MessageType
35
from ..business_objects import general
46
from ..session import session
57
from ..models import CognitionMessage
@@ -605,3 +607,48 @@ def get_count_by_project_id(project_id: str) -> int:
605607
)
606608
.count()
607609
)
610+
611+
612+
def get_last_chat_messages(
613+
message_type: MessageType,
614+
starting_from: str,
615+
ending_to: Optional[str] = None,
616+
) -> List[Any]:
617+
618+
message_type = prevent_sql_injection(message_type, isinstance(message_type, str))
619+
starting_from = prevent_sql_injection(starting_from, isinstance(starting_from, str))
620+
if ending_to:
621+
ending_to = prevent_sql_injection(ending_to, isinstance(ending_to, str))
622+
623+
message_type_filter = ""
624+
ending_to_filter = ""
625+
626+
if message_type == MessageType.WITH_ERROR:
627+
message_type_filter = "AND c.error IS NOT NULL"
628+
elif message_type == MessageType.WITHOUT_ERROR:
629+
message_type_filter = "AND c.error IS NULL"
630+
if ending_to:
631+
ending_to_filter = f"AND m.created_at <= '{ending_to}'"
632+
633+
query = f"""
634+
SELECT *
635+
FROM (
636+
SELECT m.created_at, m.created_by, m.question, m.answer, m.initiated_via, c.error, cp.id AS project_id, cp.name AS project_name, cp.organization_id, o.name AS organization_name, c.id AS conversation_id,
637+
ROW_NUMBER() OVER (
638+
PARTITION BY cp.organization_id, cp.id
639+
ORDER BY m.created_at DESC
640+
) AS rn
641+
FROM cognition.message m
642+
JOIN cognition.conversation c ON c.id = m.conversation_id
643+
JOIN cognition.project cp ON cp.id = m.project_id
644+
JOIN organization o ON o.id = cp.organization_id
645+
WHERE
646+
m.created_at >= '{starting_from}'
647+
{message_type_filter}
648+
{ending_to_filter}
649+
) sub
650+
WHERE rn <= 5
651+
ORDER BY organization_id, project_id, created_at DESC
652+
"""
653+
654+
return general.execute_all(query)

cognition_objects/strategy.py

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1-
from typing import List, Optional
1+
from typing import Any, List, Optional
22
from datetime import datetime
33

4+
from submodules.model.util import prevent_sql_injection
5+
46
from ..business_objects import general
57
from ..session import session
68
from ..models import CognitionStrategy
7-
from ..enums import StrategyComplexity
9+
from ..enums import StrategyComplexity, StrategyStepType
810

911

1012
def get(project_id: str, strategy_id: str) -> CognitionStrategy:
@@ -107,3 +109,78 @@ def delete_all_by_project_id(project_id: str, with_commit: bool = True) -> None:
107109
CognitionStrategy.project_id == project_id
108110
).delete()
109111
general.flush_or_commit(with_commit)
112+
113+
114+
def get_strategies_info(
115+
step_types: List[str],
116+
created_at_from: str,
117+
created_at_to: Optional[str] = None,
118+
) -> List[Any]:
119+
120+
step_types = [prevent_sql_injection(st, isinstance(st, str)) for st in step_types]
121+
if len(step_types) == 0:
122+
return []
123+
124+
created_at_from = prevent_sql_injection(
125+
created_at_from, isinstance(created_at_from, str)
126+
)
127+
if created_at_to:
128+
created_at_to = prevent_sql_injection(
129+
created_at_to, isinstance(created_at_to, str)
130+
)
131+
created_at_to_filter = ""
132+
133+
if created_at_to:
134+
created_at_to_filter = f"AND ss.created_at <= '{created_at_to}'"
135+
136+
step_types_sql = ", ".join([f"'{st}'" for st in step_types])
137+
138+
query = f"""
139+
WITH step_data AS (
140+
SELECT
141+
s.id AS strategy_id, s.name AS strategy_name,
142+
ss.id AS step_id, ss.created_by,ss.created_at, ss.name AS step_name, ss.step_type,
143+
p.name AS project_name, p.id AS project_id,
144+
o.name AS organization_name, o.id AS organization_id,
145+
st.config::jsonb AS template_config,
146+
CASE
147+
WHEN ss.step_type = '{StrategyStepType.TEMPLATED.value}' AND st.config IS NOT NULL
148+
THEN ARRAY(
149+
SELECT (t->>'stepType') || ':' || (t->>'stepName')
150+
FROM jsonb_array_elements((st.config->'steps')::jsonb) t
151+
)
152+
ELSE NULL
153+
END AS template_step_names,
154+
CASE
155+
WHEN ss.step_type = '{StrategyStepType.TEMPLATED.value}' AND st.config IS NOT NULL
156+
THEN ARRAY(
157+
SELECT t->>'stepType'
158+
FROM jsonb_array_elements((st.config->'steps')::jsonb) t
159+
)
160+
ELSE NULL
161+
END AS template_step_types
162+
FROM cognition.strategy s
163+
JOIN cognition.strategy_step ss
164+
ON ss.strategy_id = s.id
165+
JOIN cognition.project p
166+
ON p.id = s.project_id
167+
JOIN organization o
168+
ON o.id = p.organization_id
169+
LEFT JOIN cognition.step_templates st
170+
ON st.id = (ss.config->>'templateId')::uuid
171+
WHERE ss.created_at >= '{created_at_from}'
172+
{created_at_to_filter}
173+
)
174+
SELECT strategy_id, strategy_name, step_id, created_by, created_at, step_name, step_type, project_name, project_id, organization_name, organization_id,
175+
CASE
176+
WHEN step_type = '{StrategyStepType.TEMPLATED.value}' THEN template_step_names
177+
ELSE ARRAY[step_type || ':' || step_name]
178+
END AS templated_step_names
179+
FROM step_data
180+
WHERE
181+
step_type IN ({step_types_sql})
182+
OR (step_type = '{StrategyStepType.TEMPLATED.value}' AND template_step_types && ARRAY[{step_types_sql}])
183+
ORDER BY strategy_id, created_at DESC
184+
"""
185+
186+
return general.execute_all(query)

enums.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,5 +1017,11 @@ class MessageInitiationType(Enum):
10171017
MACRO = "MACRO"
10181018

10191019

1020+
class MessageType(Enum):
1021+
WITH_ERROR = "WITH_ERROR"
1022+
WITHOUT_ERROR = "WITHOUT_ERROR"
1023+
ALL = "ALL"
1024+
1025+
10201026
class TimedExecutionKey(Enum):
10211027
LAST_RESET_USER_MESSAGE_COUNT = "LAST_RESET_USER_MESSAGE_COUNT"

0 commit comments

Comments
 (0)