Skip to content

Commit 84e0ea6

Browse files
authored
Merge sl on empty inputs stable 25 3 1 (#28671)
2 parents 8f9bd3e + 9576a8e commit 84e0ea6

File tree

2 files changed

+75
-0
lines changed

2 files changed

+75
-0
lines changed

ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,13 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
606606
auto guard = BindAllocator();
607607

608608
NUdf::TUnboxedValue row;
609+
610+
YQL_ENSURE(!Input.IsInvalid());
611+
if (Input.IsFinish() || !Input.HasValue()) {
612+
LastFetchStatus = NUdf::EFetchStatus::Finish;
613+
return;
614+
}
615+
609616
while ((LastFetchStatus = Input.Fetch(row)) == NUdf::EFetchStatus::Ok) {
610617
StreamLookupWorker->AddInputRow(std::move(row));
611618
}

ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,26 @@ void PrepareTables(TSession session) {
8585
INDEX idx GLOBAL ON (idx_a, idx_b)
8686
);
8787
88+
CREATE TABLE `Level1` ( `Id` Int32 NOT NULL, `Name` Utf8, `Date` Timestamp NOT NULL, `Level2_Name` Utf8, `OneToOne_Required_PK_Date` Timestamp, `Level1_Required_Id` Int32, `Level1_Optional_Id` Int32, `Level3_Name` Utf8,
89+
`Level2_Required_Id` Int32, `Level2_Optional_Id` Int32, `Level4_Name` Utf8, `Level3_Required_Id` Int32, `Level3_Optional_Id` Int32, `OneToOne_Optional_PK_Inverse4Id` Int32, `OneToMany_Required_Inverse4Id` Int32,
90+
`OneToMany_Optional_Inverse4Id` Int32, `OneToOne_Optional_PK_Inverse3Id` Int32, `OneToMany_Required_Inverse3Id` Int32, `OneToMany_Optional_Inverse3Id` Int32, `OneToOne_Optional_PK_Inverse2Id` Int32, `OneToMany_Required_Inverse2Id` Int32, `OneToMany_Optional_Inverse2Id` Int32,
91+
INDEX `IX_Level1_Level1_Optional_Id` GLOBAL SYNC ON (`Level1_Optional_Id`),
92+
INDEX `IX_Level1_Level1_Required_Id` GLOBAL SYNC ON (`Level1_Required_Id`),
93+
INDEX `IX_Level1_Level2_Optional_Id` GLOBAL SYNC ON (`Level2_Optional_Id`),
94+
INDEX `IX_Level1_Level2_Required_Id` GLOBAL SYNC ON (`Level2_Required_Id`),
95+
INDEX `IX_Level1_Level3_Optional_Id` GLOBAL SYNC ON (`Level3_Optional_Id`),
96+
INDEX `IX_Level1_Level3_Required_Id` GLOBAL SYNC ON (`Level3_Required_Id`),
97+
INDEX `IX_Level1_OneToMany_Optional_Inverse2Id` GLOBAL SYNC ON (`OneToMany_Optional_Inverse2Id`),
98+
INDEX `IX_Level1_OneToMany_Optional_Inverse3Id` GLOBAL SYNC ON (`OneToMany_Optional_Inverse3Id`),
99+
INDEX `IX_Level1_OneToMany_Optional_Inverse4Id` GLOBAL SYNC ON (`OneToMany_Optional_Inverse4Id`),
100+
INDEX `IX_Level1_OneToMany_Required_Inverse2Id` GLOBAL SYNC ON (`OneToMany_Required_Inverse2Id`),
101+
INDEX `IX_Level1_OneToMany_Required_Inverse3Id` GLOBAL SYNC ON (`OneToMany_Required_Inverse3Id`),
102+
INDEX `IX_Level1_OneToMany_Required_Inverse4Id` GLOBAL SYNC ON (`OneToMany_Required_Inverse4Id`),
103+
INDEX `IX_Level1_OneToOne_Optional_PK_Inverse2Id` GLOBAL SYNC ON (`OneToOne_Optional_PK_Inverse2Id`),
104+
INDEX `IX_Level1_OneToOne_Optional_PK_Inverse3Id` GLOBAL SYNC ON (`OneToOne_Optional_PK_Inverse3Id`),
105+
INDEX `IX_Level1_OneToOne_Optional_PK_Inverse4Id` GLOBAL SYNC ON (`OneToOne_Optional_PK_Inverse4Id`),
106+
PRIMARY KEY (`Id`)
107+
);
88108
CREATE TABLE X (x_id Int32, a Int32, b Int32, PRIMARY KEY(x_id));
89109
CREATE TABLE Y (y_id Int32, a Int32, b Int32, c Int32, PRIMARY KEY(y_id), INDEX ix_a GLOBAL ON (a));
90110
@@ -1117,6 +1137,54 @@ Y_UNIT_TEST_TWIN(LeftJoinOnRightTableOverIndex, StreamLookupJoin) {
11171137
tester.Run();
11181138
}
11191139

1140+
Y_UNIT_TEST_TWIN(TestEntityFramework, StreamLookupJoin) {
1141+
auto tester = TTester{
1142+
.Query=R"(
1143+
SELECT
1144+
`l1`.`Id`,
1145+
`l1`.`OneToOne_Required_PK_Date`,
1146+
`l1`.`Level1_Optional_Id`,
1147+
`l1`.`Level1_Required_Id`,
1148+
`l1`.`Level2_Name`,
1149+
`l1`.`OneToMany_Optional_Inverse2Id`,
1150+
`l1`.`OneToMany_Required_Inverse2Id`,
1151+
`l1`.`OneToOne_Optional_PK_Inverse2Id`
1152+
FROM `Level1` AS `l`
1153+
LEFT JOIN (
1154+
SELECT
1155+
`l0`.`Id` AS `Id`,
1156+
`l0`.`OneToOne_Required_PK_Date` AS `OneToOne_Required_PK_Date`,
1157+
`l0`.`Level1_Optional_Id` AS `Level1_Optional_Id`,
1158+
`l0`.`Level1_Required_Id` AS `Level1_Required_Id`,
1159+
`l0`.`Level2_Name` AS `Level2_Name`,
1160+
`l0`.`OneToMany_Optional_Inverse2Id` AS `OneToMany_Optional_Inverse2Id`,
1161+
`l0`.`OneToMany_Required_Inverse2Id` AS `OneToMany_Required_Inverse2Id`,
1162+
`l0`.`OneToOne_Optional_PK_Inverse2Id` AS `OneToOne_Optional_PK_Inverse2Id`
1163+
FROM `Level1` AS `l0`
1164+
WHERE
1165+
`l0`.`OneToOne_Required_PK_Date` IS NOT NULL AND
1166+
`l0`.`Level1_Required_Id` IS NOT NULL AND
1167+
`l0`.`OneToMany_Required_Inverse2Id` IS NOT NULL
1168+
) AS `l1`
1169+
ON
1170+
`l`.`Id` = CASE
1171+
WHEN `l1`.`OneToOne_Required_PK_Date` IS NOT NULL AND `l1`.`Level1_Required_Id` IS NOT NULL AND `l1`.`OneToMany_Required_Inverse2Id` IS NOT NULL THEN `l1`.`Id`
1172+
ELSE NULL
1173+
END
1174+
LEFT JOIN `Level1` AS `l2` ON `l1`.`Level1_Required_Id` = `l2`.`Id`
1175+
WHERE
1176+
`l1`.`OneToOne_Required_PK_Date` IS NOT NULL AND
1177+
`l1`.`Level1_Required_Id` IS NOT NULL AND
1178+
`l1`.`OneToMany_Required_Inverse2Id` IS NOT NULL AND `l2`.`Id` IN (1, 2)
1179+
)",
1180+
.Answer=R"([
1181+
])",
1182+
.StreamLookup=StreamLookupJoin,
1183+
.DoValidateStats=false,
1184+
};
1185+
tester.Run();
1186+
}
1187+
11201188
Y_UNIT_TEST_TWIN(JoinLeftJoinPostJoinFilterTest, StreamLookupJoin) {
11211189
auto tester = TTester{
11221190
.Query=R"(

0 commit comments

Comments
 (0)