Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 109 additions & 69 deletions services/libs/tinybird/pipes/activities_relations_filtered.pipe
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,24 @@ SQL >
{% if String(countOnly) == '1' or Int8(countOnly, 0) == 1 %} {% set is_count = 1 %} {% end %}
{% end %}
WITH
{% if defined(segments) %} arrayDistinct({{ Array(segments, 'String') }}) AS segments_arr
{% else %} [] AS segments_arr
{% end %},
base_ar AS (
SELECT
ar.activityId AS id,
ar.channel,
ar.memberId,
ar.organizationId,
ar.platform,
ar.segmentId,
ar.sourceId,
ar.sourceParentId,
ar.timestamp,
ar.type
ar.channel AS channel,
ar.memberId AS memberId,
ar.organizationId AS organizationId,
ar.platform AS platform,
ar.segmentId AS segmentId,
ar.sourceId AS sourceId,
ar.sourceParentId AS sourceParentId,
ar.timestamp AS timestamp,
ar.type AS type
FROM activityRelations_deduplicated_ds AS ar
WHERE
ar.segmentId IN {{ Array(segments, 'String') }}
(length(segments_arr) = 0 OR ar.segmentId IN segments_arr)
{% if defined(startDate) %}
AND ar.timestamp > parseDateTimeBestEffort({{ String(startDate) }})
{% end %}
Expand Down Expand Up @@ -373,41 +376,64 @@ SQL >
{% end %}
-- ================== end G1..G5 ==================
)
SELECT
ar.id,
ar.channel,
ar.memberId,
ar.organizationId,
ar.platform,
ar.segmentId,
ar.sourceId,
ar.sourceParentId,
ar.timestamp,
ar.type
FROM base_ar AS ar
{% if defined(searchTerm) and searchTerm %}
SEMI
JOIN
(
SELECT CAST(id AS String) AS id
FROM activities_deduplicated_ds
WHERE
1
{% if defined(startDate) %}
AND timestamp > parseDateTimeBestEffort({{ String(startDate) }})
{% end %}
{% if defined(endDate) %}
AND timestamp < parseDateTimeBestEffort({{ String(endDate) }})
{% end %}
AND (
positionCaseInsensitive(title, {{ String(searchTerm) }}) > 0
OR positionCaseInsensitive(body, {{ String(searchTerm) }}) > 0
)
) act
ON act.id = CAST(ar.id AS String)
{% end %}
WHERE 1
{% if not is_count %}
{% if is_count %}
SELECT count() AS count
FROM base_ar AS ar
{% if defined(searchTerm) and searchTerm %}
SEMI
JOIN
(
SELECT CAST(id AS String) AS id
FROM activities_deduplicated_ds
WHERE
1
{% if defined(startDate) %}
AND timestamp > parseDateTimeBestEffort({{ String(startDate) }})
{% end %}
{% if defined(endDate) %}
AND timestamp < parseDateTimeBestEffort({{ String(endDate) }})
{% end %}
AND (
positionCaseInsensitive(title, {{ String(searchTerm) }}) > 0
OR positionCaseInsensitive(body, {{ String(searchTerm) }}) > 0
)
) act
ON act.id = CAST(ar.id AS String)
{% end %}
{% else %}
SELECT
ar.id,
ar.channel,
ar.memberId,
ar.organizationId,
ar.platform,
ar.segmentId,
ar.sourceId,
ar.sourceParentId,
ar.timestamp,
ar.type
FROM base_ar AS ar
{% if defined(searchTerm) and searchTerm %}
SEMI
JOIN
(
SELECT CAST(id AS String) AS id
FROM activities_deduplicated_ds
WHERE
1
{% if defined(startDate) %}
AND timestamp > parseDateTimeBestEffort({{ String(startDate) }})
{% end %}
{% if defined(endDate) %}
AND timestamp < parseDateTimeBestEffort({{ String(endDate) }})
{% end %}
AND (
positionCaseInsensitive(title, {{ String(searchTerm) }}) > 0
OR positionCaseInsensitive(body, {{ String(searchTerm) }}) > 0
)
) act
ON act.id = CAST(ar.id AS String)
{% end %}
ORDER BY ar.timestamp DESC, ar.id DESC
LIMIT {{ Int32(pageSize, 10) }}
OFFSET {{ Int32(page, 0) * Int32(pageSize, 10) }}
Expand All @@ -416,30 +442,44 @@ SQL >
NODE activities_enriched_v1
SQL >
%
{% if defined(countOnly) and countOnly == '1' %}
SELECT countDistinct(fr.id) AS count FROM filtered_relations fr
{% set is_count = 0 %}
{% if defined(countOnly) %}
{% if String(countOnly) == '1' or Int8(countOnly, 0) == 1 %} {% set is_count = 1 %} {% end %}
{% end %}
WITH
{% if defined(segments) %} arrayDistinct({{ Array(segments, 'String') }}) AS segments_arr
{% else %} [] AS segments_arr
{% end %},
base_ar AS (
SELECT
ar.activityId AS id,
ar.channel,
ar.memberId,
ar.organizationId,
ar.platform,
ar.segmentId,
ar.sourceId,
ar.sourceParentId,
ar.timestamp,
ar.type
FROM activityRelations_deduplicated_ds AS ar
WHERE (length(segments_arr) = 0 OR ar.segmentId IN segments_arr)
)
{% if is_count %} SELECT count() AS count FROM base_ar AS ar
{% else %}
SELECT
fr.id,
fr.channel,
fr.memberId,
fr.organizationId,
fr.platform,
fr.segmentId,
fr.sourceId,
fr.sourceParentId,
fr.timestamp,
fr.type,
a.attributes,
a.url,
a.body,
a.title
FROM filtered_relations AS fr ANY
LEFT JOIN
(
SELECT CAST(id AS String) AS activity_id, attributes, url, body, title
FROM activities_deduplicated_ds
WHERE CAST(id AS String) IN (SELECT DISTINCT CAST(id AS String) FROM filtered_relations)
) AS a
ON CAST(fr.id AS String) = a.activity_id
ar.id,
ar.channel,
ar.memberId,
ar.organizationId,
ar.platform,
ar.segmentId,
ar.sourceId,
ar.sourceParentId,
ar.timestamp,
ar.type
FROM base_ar AS ar
ORDER BY ar.timestamp DESC, ar.id DESC
LIMIT {{ Int32(pageSize, 10) }}
OFFSET {{ Int32(page, 0) * Int32(pageSize, 10) }}
{% end %}