55#include < ydb/core/persqueue/ut/common/pq_ut_common.h>
66#include < ydb/core/security/ticket_parser.h>
77
8+ #include < ydb/core/protos/grpc_pq_old.pb.h>
9+
810#include < ydb/core/testlib/fake_scheme_shard.h>
911#include < ydb/core/testlib/tablet_helpers.h>
1012
@@ -1470,10 +1472,22 @@ Y_UNIT_TEST(TestTimeRetention) {
14701472 });
14711473}
14721474
1473- Y_UNIT_TEST (TestCompactifiedWithRetention) {
1474- // TODO(abcdef): temporarily deleted
1475- return ;
1475+ TString GetSerializedData (ui64 seqNo, const TString& payload, const TString& key) {
1476+ NKikimrPQClient::TDataChunk proto;
1477+ proto.SetSeqNo (seqNo);
1478+ proto.SetData (payload);
1479+ if (!key.empty ()) {
1480+ auto *msgMeta = proto.AddMessageMeta ();
1481+ msgMeta->set_key (" __key" );
1482+ msgMeta->set_value (key);
1483+ }
1484+ TString dataChunkStr;
1485+ bool res = proto.SerializeToString (&dataChunkStr);
1486+ Y_ABORT_UNLESS (res);
1487+ return dataChunkStr;
1488+ }
14761489
1490+ Y_UNIT_TEST (TestCompactifiedWithRetention) {
14771491 TTestContext tc;
14781492 RunTestWithReboots (tc.TabletIds , [&]() {
14791493 return tc.InitialEventsFilter .Prepare ();
@@ -1485,24 +1499,29 @@ Y_UNIT_TEST(TestCompactifiedWithRetention) {
14851499
14861500 tc.Runtime ->GetAppData (0 ).PQConfig .MutableCompactionConfig ()->SetBlobsCount (0 );
14871501
1488- TVector<std::pair<ui64, TString>> data;
1489- activeZone = PlainOrSoSlow (true , false );
1490-
1502+ ui64 key = 1 ;
14911503 TString s{32 , ' c' };
14921504 ui32 pp = 8 + 4 + 2 + 9 ;
1493- for (ui32 i = 0 ; i < 10 ; ++i) {
1494- data.push_back ({i + 1 , s.substr (pp)});
1495- }
1505+ auto getData = [&] () {
1506+ TVector<std::pair<ui64, TString>> data;
1507+ for (ui32 i = 0 ; i < 10 ; ++i) {
1508+ data.push_back ({i + 1 , GetSerializedData (i + 1 , s.substr (pp), ToString (key++))});
1509+ }
1510+ return data;
1511+ };
1512+ activeZone = PlainOrSoSlow (true , false );
1513+
1514+
14961515 PQTabletPrepare ({.maxCountInPartition =1000 , .deleteTime =0 , .lowWatermark =100 , .enableCompactificationByKey = true }, {}, tc);
1497- CmdWrite (0 , " sourceid0" , data , tc, false , {}, true );
1498- CmdWrite (0 , " sourceid1" , data , tc, false );
1499- CmdWrite (0 , " sourceid2" , data , tc, false );
1516+ CmdWrite (0 , " sourceid0" , getData () , tc, false , {}, true );
1517+ CmdWrite (0 , " sourceid1" , getData () , tc, false );
1518+ CmdWrite (0 , " sourceid2" , getData () , tc, false );
15001519 PQGetPartInfo (0 , 30 , tc);
15011520
15021521 PQTabletPrepare ({.maxCountInPartition =1000 , .deleteTime =0 , .lowWatermark =100 , .enableCompactificationByKey = false }, {}, tc);
1503- CmdWrite (0 , " sourceid3" , data , tc, false );
1504- CmdWrite (0 , " sourceid4" , data , tc, false );
1505- CmdWrite (0 , " sourceid5" , data , tc, false );
1522+ CmdWrite (0 , " sourceid3" , getData () , tc, false );
1523+ CmdWrite (0 , " sourceid4" , getData () , tc, false );
1524+ CmdWrite (0 , " sourceid5" , getData () , tc, false );
15061525 Cerr << " Get part info with compactification disabled\n " ;
15071526 PQGetPartInfo (50 , 60 , tc);
15081527 });
0 commit comments