@@ -148,23 +148,33 @@ class TFixture : public NTests::TBaseFixture {
148148 Runtime.Send (new IEventHandle (TopicSession, readActorId, event.release ()));
149149 }
150150
151- void ExpectMessageBatch (NActors::TActorId readActorId, const TBatch& expected, const std::vector<ui64>& expectedLastOffset = {}) {
152- Runtime.Send (new IEventHandle (TopicSession, readActorId, new TEvRowDispatcher::TEvGetNextBatch ()));
151+ void ExpectMessageBatch (NActors::TActorId readActorId, TList<TRow> expected, bool expectNewDataArrived = true , std::vector<ui64> expectedLastOffset = {}) {
152+ while (!expected.empty ()) {
153+ if (expectNewDataArrived) {
154+ ExpectNewDataArrived ({readActorId});
155+ }
156+ Runtime.Send (new IEventHandle (TopicSession, readActorId, new TEvRowDispatcher::TEvGetNextBatch ()));
157+ auto eventHolder = Runtime.GrabEdgeEvent <TEvRowDispatcher::TEvMessageBatch>(RowDispatcherActorId, TDuration::Seconds (GrabTimeoutSec));
158+ UNIT_ASSERT (eventHolder.Get () != nullptr );
159+ UNIT_ASSERT_VALUES_EQUAL (eventHolder->Get ()->ReadActorId , readActorId);
153160
154- auto eventHolder = Runtime.GrabEdgeEvent <TEvRowDispatcher::TEvMessageBatch>(RowDispatcherActorId, TDuration::Seconds (GrabTimeoutSec));
155- UNIT_ASSERT (eventHolder.Get () != nullptr );
156- UNIT_ASSERT_VALUES_EQUAL (eventHolder->Get ()->ReadActorId , readActorId);
157- UNIT_ASSERT_VALUES_EQUAL (1 , eventHolder->Get ()->Record .MessagesSize ());
158-
159- NFq::NRowDispatcherProto::TEvMessage message = eventHolder->Get ()->Record .GetMessages (0 );
160- UNIT_ASSERT_VALUES_EQUAL (message.OffsetsSize (), expected.Rows .size ());
161- if (!expectedLastOffset.empty ()) {
162- UNIT_ASSERT_VALUES_EQUAL (expectedLastOffset.size (), message.OffsetsSize ());
163- for (size_t i =0 ; i < expectedLastOffset.size (); ++i) {
164- UNIT_ASSERT_VALUES_EQUAL (expectedLastOffset[i], message.GetOffsets ().Get (i));
161+ UNIT_ASSERT_VALUES_EQUAL (1 , eventHolder->Get ()->Record .MessagesSize ());
162+ NFq::NRowDispatcherProto::TEvMessage message = eventHolder->Get ()->Record .GetMessages (0 );
163+ UNIT_ASSERT (message.OffsetsSize () <= expected.size ());
164+
165+ if (!expectedLastOffset.empty ()) {
166+ UNIT_ASSERT_VALUES_EQUAL (expectedLastOffset.size (), message.OffsetsSize ());
167+ for (size_t i = 0 ; i < message.OffsetsSize (); ++i) {
168+ UNIT_ASSERT_VALUES_EQUAL (expectedLastOffset[i], message.GetOffsets ().Get (i));
169+ }
170+ expectedLastOffset.erase (expectedLastOffset.begin (), expectedLastOffset.begin () + message.OffsetsSize ());
165171 }
172+ auto itEnd = expected.begin ();
173+ advance (itEnd, message.OffsetsSize ());
174+ TBatch expectedBatch{expected.begin (), itEnd};
175+ expected.erase (expected.begin (), itEnd);
176+ CheckMessageBatch (eventHolder->Get ()->GetPayload (message.GetPayloadId ()), expectedBatch);
166177 }
167- CheckMessageBatch (eventHolder->Get ()->GetPayload (message.GetPayloadId ()), expected);
168178 }
169179
170180 void ExpectSessionError (NActors::TActorId readActorId, TStatusCode statusCode, TString message = " " ) {
@@ -296,17 +306,17 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
296306 PQWrite (data);
297307 ExpectNewDataArrived ({ReadActorId1, ReadActorId2});
298308
299- ExpectMessageBatch (ReadActorId1, { JsonMessage (1 ) });
300- ExpectMessageBatch (ReadActorId2, { JsonMessage (1 ) });
309+ ExpectMessageBatch (ReadActorId1, { JsonMessage (1 ) }, false );
310+ ExpectMessageBatch (ReadActorId2, { JsonMessage (1 ) }, false );
301311 ExpectStatistics ({{ReadActorId1, 1 }, {ReadActorId2, 1 }});
302312
303313 data = { Json2 };
304314 PQWrite (data);
305315 ExpectNewDataArrived ({ReadActorId1, ReadActorId2});
306316
307317 ExpectStatistics ({{ReadActorId1, 1 }, {ReadActorId2, 1 }});
308- ExpectMessageBatch (ReadActorId1, { JsonMessage (2 ) });
309- ExpectMessageBatch (ReadActorId2, { JsonMessage (2 ) });
318+ ExpectMessageBatch (ReadActorId1, { JsonMessage (2 ) }, false );
319+ ExpectMessageBatch (ReadActorId2, { JsonMessage (2 ) }, false );
310320 ExpectStatistics ({{ReadActorId1, 2 }, {ReadActorId2, 2 }});
311321
312322 auto source2 = BuildSource (false , " OtherConsumer" );
@@ -329,10 +339,8 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
329339 const std::vector<TString> data = { Json1 };
330340 PQWrite (data);
331341 ExpectNewDataArrived ({ReadActorId1, ReadActorId2});
332- Runtime.Send (new IEventHandle (TopicSession, ReadActorId1, new TEvRowDispatcher::TEvGetNextBatch ()));
333- Runtime.Send (new IEventHandle (TopicSession, ReadActorId2, new TEvRowDispatcher::TEvGetNextBatch ()));
334- ExpectMessageBatch (ReadActorId1, { JsonMessage (1 ) });
335- ExpectMessageBatch (ReadActorId2, { JsonMessage (1 ) });
342+ ExpectMessageBatch (ReadActorId1, { JsonMessage (1 ) }, false );
343+ ExpectMessageBatch (ReadActorId2, { JsonMessage (1 ) }, false );
336344
337345 StopSession (ReadActorId1, source1);
338346 StopSession (ReadActorId2, source2);
@@ -350,8 +358,8 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
350358 const std::vector<TString> data = { Json1 };
351359 PQWrite (data);
352360 ExpectNewDataArrived ({ReadActorId1, ReadActorId2});
353- ExpectMessageBatch (ReadActorId1, { JsonMessage (1 ) });
354- ExpectMessageBatch (ReadActorId2, { JsonMessage (1 ) });
361+ ExpectMessageBatch (ReadActorId1, { JsonMessage (1 ) }, false );
362+ ExpectMessageBatch (ReadActorId2, { JsonMessage (1 ) }, false );
355363
356364 StopSession (ReadActorId1, source1);
357365 StopSession (ReadActorId2, source2);
@@ -366,7 +374,6 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
366374
367375 const std::vector<TString> data = { Json1 };
368376 PQWrite (data);
369- ExpectNewDataArrived ({ReadActorId1});
370377 ExpectMessageBatch (ReadActorId1, { JsonMessage (1 ) });
371378
372379 StartSession (ReadActorId2, source);
@@ -375,8 +382,8 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
375382 PQWrite (data2);
376383 ExpectNewDataArrived ({ReadActorId1, ReadActorId2});
377384
378- ExpectMessageBatch (ReadActorId1, { JsonMessage (2 ) });
379- ExpectMessageBatch (ReadActorId2, { JsonMessage (2 ) });
385+ ExpectMessageBatch (ReadActorId1, { JsonMessage (2 ) }, false );
386+ ExpectMessageBatch (ReadActorId2, { JsonMessage (2 ) }, false );
380387
381388 StopSession (ReadActorId1, source);
382389 StopSession (ReadActorId2, source);
@@ -394,17 +401,14 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
394401 StartSession (ReadActorId2, source, 2 );
395402
396403 ExpectNewDataArrived ({ReadActorId1, ReadActorId2});
397- TBatch expected1 = { JsonMessage (2 ), JsonMessage (3 ) };
398- ExpectMessageBatch (ReadActorId1, expected1);
399-
400- TBatch expected2 = { JsonMessage (3 ) };
401- ExpectMessageBatch (ReadActorId2, expected2);
404+ ExpectMessageBatch (ReadActorId1, { JsonMessage (2 ), JsonMessage (3 ) }, false );
405+ ExpectMessageBatch (ReadActorId2, { JsonMessage (3 ) }, false );
402406
403407 const std::vector<TString> data2 = { Json4 };
404408 PQWrite (data2);
405409 ExpectNewDataArrived ({ReadActorId1, ReadActorId2});
406- ExpectMessageBatch (ReadActorId1, { JsonMessage (4 ) });
407- ExpectMessageBatch (ReadActorId2, { JsonMessage (4 ) });
410+ ExpectMessageBatch (ReadActorId1, { JsonMessage (4 ) }, false );
411+ ExpectMessageBatch (ReadActorId2, { JsonMessage (4 ) }, false );
408412
409413 StopSession (ReadActorId1, source);
410414 StopSession (ReadActorId2, source);
@@ -437,12 +441,10 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
437441 StartSession (ReadActorId2, source);
438442
439443 PQWrite ({ Json1 });
440- ExpectNewDataArrived ({ReadActorId1});
441444 ExpectMessageBatch (ReadActorId1, { JsonMessage (1 ) });
442445 ExpectSessionError (ReadActorId2, EStatusId::PRECONDITION_FAILED, " Failed to parse json messages, found 1 missing values" );
443446
444447 PQWrite ({ Json2 });
445- ExpectNewDataArrived ({ReadActorId1});
446448 ExpectMessageBatch (ReadActorId1, { JsonMessage (2 ) });
447449 ExpectSessionError (ReadActorId2, EStatusId::PRECONDITION_FAILED, " Failed to parse json messages, found 1 missing values" );
448450
@@ -459,7 +461,6 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
459461
460462 const std::vector<TString> data = { Json1, Json2, Json3 }; // offset 0, 1, 2
461463 PQWrite (data);
462- ExpectNewDataArrived ({ReadActorId1});
463464 ExpectMessageBatch (ReadActorId1, { JsonMessage (1 ), JsonMessage (2 ), JsonMessage (3 ) });
464465
465466 // Restart topic session.
@@ -469,8 +470,8 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
469470 PQWrite ({ Json4 });
470471 ExpectNewDataArrived ({ReadActorId1});
471472
472- ExpectMessageBatch (ReadActorId1, { JsonMessage (4 ) });
473- ExpectMessageBatch (ReadActorId2, { JsonMessage (2 ), JsonMessage (3 ), JsonMessage (4 ) });
473+ ExpectMessageBatch (ReadActorId1, { JsonMessage (4 ) }, false );
474+ ExpectMessageBatch (ReadActorId2, { JsonMessage (2 ), JsonMessage (3 ), JsonMessage (4 ) }, false );
474475
475476 StopSession (ReadActorId1, source);
476477 StopSession (ReadActorId2, source);
@@ -548,8 +549,8 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
548549
549550 PQWrite ({ json1, json2 });
550551 ExpectNewDataArrived ({ReadActorId1, ReadActorId2});
551- ExpectMessageBatch (ReadActorId1, { JsonMessage (1 ), JsonMessage (2 ) });
552- ExpectMessageBatch (ReadActorId2, { JsonMessage (1 ).AddString (" field1" ), JsonMessage (2 ).AddString (" field2" ) });
552+ ExpectMessageBatch (ReadActorId1, { JsonMessage (1 ), JsonMessage (2 ) }, false );
553+ ExpectMessageBatch (ReadActorId2, { JsonMessage (1 ).AddString (" field1" ), JsonMessage (2 ).AddString (" field2" ) }, false );
553554
554555 auto source3 = BuildSource ();
555556 source3.AddColumns (" field2" );
@@ -560,17 +561,16 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
560561 TString json3 = " {\" dt\" :300,\" value\" :\" value3\" , \" field1\" :\" value1_field1\" , \" field2\" :\" value1_field2\" }" ;
561562 PQWrite ({ json3 });
562563 ExpectNewDataArrived ({ReadActorId1, ReadActorId2, readActorId3});
563- ExpectMessageBatch (ReadActorId1, { JsonMessage (3 ) });
564- ExpectMessageBatch (ReadActorId2, { JsonMessage (3 ).AddString (" value1_field1" ) });
565- ExpectMessageBatch (readActorId3, { JsonMessage (3 ).AddString (" value1_field2" ) });
564+ ExpectMessageBatch (ReadActorId1, { JsonMessage (3 ) }, false );
565+ ExpectMessageBatch (ReadActorId2, { JsonMessage (3 ).AddString (" value1_field1" ) }, false );
566+ ExpectMessageBatch (readActorId3, { JsonMessage (3 ).AddString (" value1_field2" ) }, false );
566567
567568 StopSession (ReadActorId1, source3);
568569 StopSession (readActorId3, source3);
569570
570571 TString json4 = " {\" dt\" :400,\" value\" :\" value4\" , \" field1\" :\" value2_field1\" , \" field2\" :\" value2_field2\" }" ;
571572 TString json5 = " {\" dt\" :500,\" value\" :\" value5\" , \" field1\" :\" value3_field1\" , \" field2\" :\" value3_field2\" }" ;
572573 PQWrite ({ json4, json5 });
573- ExpectNewDataArrived ({ReadActorId2});
574574 ExpectMessageBatch (ReadActorId2, { JsonMessage (4 ).AddString (" value2_field1" ), JsonMessage (5 ).AddString (" value3_field1" ) });
575575
576576 StopSession (ReadActorId1, source1);
@@ -589,7 +589,6 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
589589
590590 TString json1 = " {\" dt\" :100,\" field1\" :\" str\" ,\" value\" :\" value1\" }" ;
591591 PQWrite ({ json1 });
592- ExpectNewDataArrived ({ReadActorId1});
593592 ExpectMessageBatch (ReadActorId1, { JsonMessage (1 ).AddString (" str" , true ) });
594593
595594 auto source2 = BuildSource ();
@@ -606,13 +605,11 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
606605 StartSession (ReadActorId1, source);
607606 std::vector<TString> data = { Json1, Json2, Json3 };
608607 PQWrite (data, 1 );
609- ExpectNewDataArrived ({ReadActorId1});
610608 ExpectMessageBatch (ReadActorId1, { JsonMessage (1 ), JsonMessage (2 ), JsonMessage (3 ) });
611609
612610 StartSession (ReadActorId2, source, 1 );
613611 std::vector<TString> data2 = { Json1 };
614612 PQWrite (data2, 1 );
615- ExpectNewDataArrived ({ReadActorId2});
616613 ExpectMessageBatch (ReadActorId2, { JsonMessage (1 )});
617614
618615 StopSession (ReadActorId2, source);
@@ -622,7 +619,6 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
622619
623620 std::vector<TString> data3 = { Json4 };
624621 PQWrite (data3, 4 );
625- ExpectNewDataArrived ({ReadActorId1});
626622 ExpectMessageBatch (ReadActorId1, { JsonMessage (4 ) });
627623
628624 PassAway ();
@@ -634,16 +630,15 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
634630 Init (topicName);
635631 auto source = BuildSource (false , DefaultPqConsumer, true );
636632 StartSession (ReadActorId1, source);
637-
638- auto writeRead = [&](const std::vector<TString>& input, const TBatch & output) {
633+
634+ auto writeRead = [&](const std::vector<TString>& input, const TList<TRow> & output) {
639635 PQWrite (input);
640- if (output.Rows . empty ()) {
636+ if (output.empty ()) {
641637 return ;
642638 }
643- ExpectNewDataArrived ({ReadActorId1});
644639 ExpectMessageBatch (ReadActorId1, output);
645640 };
646-
641+
647642 auto test = [&](const TString& wrongJson) {
648643 writeRead ({ wrongJson }, { });
649644 Sleep (TDuration::MilliSeconds (100 ));
@@ -661,8 +656,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
661656 test (" }" );
662657 test (" {" );
663658 writeRead ({ " {\" dt\" :100}" , " {}\x80 " , Json3 }, { JsonMessage (3 ) });
664- writeRead ({Json1 + Json1, Json3 }, { JsonMessage (1 ), JsonMessage (1 ) }); // not checked
665- writeRead ({Json1.substr (0 , 3 ), Json1.substr (3 ), Json2, Json3 }, { JsonMessage (1 ), JsonMessage (2 ), JsonMessage (3 ) });
659+ writeRead ({Json1 + Json1 }, { JsonMessage (1 ) }); // not checked
666660 PassAway ();
667661 }
668662
@@ -675,8 +669,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
675669
676670 TString wrongJson{" wrong" };
677671 PQWrite ({ Json1, wrongJson, wrongJson, Json3 });
678- ExpectNewDataArrived ({ReadActorId1});
679- ExpectMessageBatch (ReadActorId1, { JsonMessage (1 ), JsonMessage (3 ) }, {0 , 3 });
672+ ExpectMessageBatch (ReadActorId1, { JsonMessage (1 ), JsonMessage (3 ) }, true , {0 , 3 });
680673 PassAway ();
681674 }
682675}
0 commit comments