Skip to content

Commit 802c6dc

Browse files
authored
clean inserted portions normalizer has been added (#28406)
1 parent 3b24ab6 commit 802c6dc

File tree

4 files changed

+167
-10
lines changed

4 files changed

+167
-10
lines changed
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
#include "clean_inserted_portions.h"
2+
3+
#include <ydb/core/formats/arrow/arrow_helpers.h>
4+
#include <ydb/core/tx/columnshard/columnshard_schema.h>
5+
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
6+
#include <ydb/core/tx/columnshard/engines/portions/read_with_blobs.h>
7+
#include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h>
8+
#include <ydb/core/tx/columnshard/tables_manager.h>
9+
10+
#include <util/string/vector.h>
11+
12+
namespace NKikimr::NOlap::NNormalizer::NCleanInsertedPortions {
13+
14+
// Change set produced by TCleanInsertedPortionsNormalizer: holds the portions that were
// selected for cleaning and, when applied, rewrites each of them with a remove snapshot set.
class TCleanInsertedPortionsNormalizer::TNormalizerResult: public INormalizerChanges {
    // Portions to be marked as removed; owned by this object.
    std::vector<TPortionDataAccessor> InsertedPortions;

public:
    // Takes ownership of `portions`. Explicit so that a vector is never silently converted
    // into a change set.
    explicit TNormalizerResult(std::vector<TPortionDataAccessor>&& portions)
        : InsertedPortions(std::move(portions)) {
    }

    // Writes a copy of every stored portion with its remove snapshot set to TSnapshot(1, 1)
    // and, if at least one portion was processed, records a "REMOVE_PORTIONS" normalizer event.
    // Always returns true.
    bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& normController) const override {
        TDbWrapper portionsDb(txc.DB, nullptr);
        for (const auto& accessor : InsertedPortions) {
            // The stored portion info is not modified: the remove snapshot is set on a copy.
            auto copy = accessor.GetPortionInfo().MakeCopy();
            // NOTE(review): TSnapshot(1, 1) is presumably chosen as a very early snapshot so the
            // portion is treated as removed long ago — confirm against the cleanup logic.
            copy->SetRemoveSnapshot(TSnapshot(1, 1));
            portionsDb.WritePortion(accessor.GetBlobIds(), *copy);
        }
        if (!InsertedPortions.empty()) {
            NIceDb::TNiceDb eventsDb(txc.DB);
            normController.AddNormalizerEvent(eventsDb, "REMOVE_PORTIONS", DebugString());
        }
        return true;
    }

    // Nothing to do after the transaction completes.
    void ApplyOnComplete(const TNormalizationController& /* normController */) const override {
    }

    // Number of portions contained in this change set.
    ui64 GetSize() const override {
        return InsertedPortions.size();
    }

    // Human-readable summary: path id of every portion (each id is followed by a comma,
    // including the last one), the total records count and the number of portions.
    TString DebugString() const override {
        TStringBuilder sb;
        ui64 recordsCount = 0;
        sb << "path_ids=[";
        for (const auto& accessor : InsertedPortions) {
            const auto& info = accessor.GetPortionInfo();
            sb << info.GetPathId() << ",";
            recordsCount += info.GetRecordsCount();
        }
        sb << "]";
        sb << ";records_count=" << recordsCount;
        sb << ";inserted_portions_count=" << InsertedPortions.size();
        return sb;
    }
};
58+
59+
// Unconditionally returns false; neither argument is inspected.
// NOTE(review): presumably `false` tells the base portions normalizer that the portion needs
// processing, so every portion reaches BuildTask — confirm against TPortionsNormalizerBase.
bool TCleanInsertedPortionsNormalizer::CheckPortion(
    const NColumnShard::TTablesManager& /*tablesManager*/, const TPortionDataAccessor& /*portionInfo*/) const {
    return false;
}
62+
63+
// Builds the task for one batch of portions: keeps only the portions whose producer is
// NPortion::EProduced::INSERTED, wraps them into a TNormalizerResult and returns a
// TTrivialNormalizerTask over it. The schemas argument is not used.
INormalizerTask::TPtr TCleanInsertedPortionsNormalizer::BuildTask(
    std::vector<TPortionDataAccessor>&& portions, std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>>) const {
    std::vector<TPortionDataAccessor> toClean;
    for (auto& accessor : portions) {
        if (accessor.GetPortionInfo().GetProduced() != NPortion::EProduced::INSERTED) {
            continue;
        }
        // Selected accessors are moved out of the incoming batch.
        toClean.emplace_back(std::move(accessor));
    }

    auto changes = std::make_shared<TNormalizerResult>(std::move(toClean));
    ACFL_WARN("normalizer", "TCleanInsertedPortionsNormalizer")("message", changes->DebugString());
    // Only the size of the incoming batch is read here; moving elements out does not change it.
    ACFL_WARN("normalizer", "TCleanInsertedPortionsNormalizer")("all portions", portions.size());
    return std::make_shared<TTrivialNormalizerTask>(changes);
}
76+
77+
// No initialization work is performed: both arguments are ignored and the result is always true.
TConclusion<bool> TCleanInsertedPortionsNormalizer::DoInitImpl(
    const TNormalizationController& /*controller*/, NTabletFlatExecutor::TTransactionContext& /*txc*/) {
    return true;
}
80+
81+
} // namespace NKikimr::NOlap::NNormalizer::NCleanInsertedPortions
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
#pragma once
2+
3+
#include "normalizer.h"
4+
5+
#include <ydb/core/tx/columnshard/defs.h>
6+
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
7+
8+
// Forward declaration: this header only refers to TTablesManager by reference.
namespace NKikimr::NColumnShard {
class TTablesManager;
}   // namespace NKikimr::NColumnShard
11+
12+
namespace NKikimr::NOlap::NNormalizer::NCleanInsertedPortions {
13+
14+
class TCleanInsertedPortionsNormalizer: public TPortionsNormalizerBase {
15+
public:
16+
static TString GetClassNameStatic() {
17+
return "CleanInsertedPortions";
18+
}
19+
20+
private:
21+
static inline TFactory::TRegistrator<TCleanInsertedPortionsNormalizer> Registrator = TFactory::TRegistrator<TCleanInsertedPortionsNormalizer>(GetClassNameStatic());
22+
23+
public:
24+
class TNormalizerResult;
25+
26+
public:
27+
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
28+
return {};
29+
}
30+
31+
virtual TString GetClassName() const override {
32+
return GetClassNameStatic();
33+
}
34+
35+
TCleanInsertedPortionsNormalizer(const TNormalizationController::TInitContext& info)
36+
: TPortionsNormalizerBase(info) {
37+
}
38+
39+
virtual std::set<ui32> GetColumnsFilter(const ISnapshotSchema::TPtr& /*schema*/) const override {
40+
return {};
41+
}
42+
43+
virtual INormalizerTask::TPtr BuildTask(
44+
std::vector<TPortionDataAccessor>&& portions, std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> schemas) const override;
45+
virtual TConclusion<bool> DoInitImpl(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
46+
47+
virtual bool CheckPortion(const NColumnShard::TTablesManager& tablesManager, const TPortionDataAccessor& portionInfo) const override;
48+
};
49+
50+
} // namespace NKikimr::NOlap::NNormalizer::NCleanInsertedPortions

ydb/core/tx/columnshard/normalizer/portion/ya.make

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,25 @@ LIBRARY()
22

33
SRCS(
44
normalizer.cpp
5-
GLOBAL portion.cpp
6-
GLOBAL chunks.cpp
7-
GLOBAL clean.cpp
8-
GLOBAL clean_empty.cpp
95
GLOBAL broken_blobs.cpp
10-
GLOBAL special_cleaner.cpp
6+
GLOBAL chunks.cpp
117
GLOBAL chunks_actualization.cpp
12-
GLOBAL restore_v1_chunks.cpp
13-
GLOBAL restore_v2_chunks.cpp
14-
GLOBAL leaked_blobs.cpp
15-
GLOBAL clean_deprecated_snapshot.cpp
168
GLOBAL chunks_v0_meta.cpp
9+
GLOBAL clean.cpp
10+
GLOBAL clean_deprecated_snapshot.cpp
11+
GLOBAL clean_empty.cpp
1712
GLOBAL clean_index_columns.cpp
18-
GLOBAL clean_unused_tables_template.cpp
13+
GLOBAL clean_inserted_portions.cpp
1914
GLOBAL clean_ttl_preset_setting_info.cpp
2015
GLOBAL clean_ttl_preset_setting_version_info.cpp
16+
GLOBAL clean_unused_tables_template.cpp
2117
GLOBAL copy_blob_ids_to_v2.cpp
18+
GLOBAL leaked_blobs.cpp
19+
GLOBAL portion.cpp
2220
GLOBAL restore_appearance_snapshot.cpp
21+
GLOBAL restore_v1_chunks.cpp
22+
GLOBAL restore_v2_chunks.cpp
23+
GLOBAL special_cleaner.cpp
2324
)
2425

2526
PEERDIR(

ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,12 @@ class TPortionsCleaner: public NYDBTest::ILocalDBModifier {
100100
}
101101
};
102102

103+
// Local DB modifier used by the InsertedPortionsCleanerNormalizer test.
// Apply() is intentionally a no-op: the database is left exactly as it is.
class TInsertedPortionsCleaner: public NYDBTest::ILocalDBModifier {
public:
    void Apply(NTabletFlatExecutor::TTransactionContext& /*txc*/) const override {
        // Nothing to modify.
    }
};
108+
103109
class TEmptyPortionsCleaner: public NYDBTest::ILocalDBModifier {
104110
public:
105111
virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
@@ -329,6 +335,25 @@ Y_UNIT_TEST_SUITE(Normalizers) {
329335
};
330336
TestNormalizerImpl<TPortionsCleaner>(TLocalNormalizerChecker());
331337
}
338+
339+
// Runs the "CleanInsertedPortions" repair through TestNormalizerImpl and expects
// zero records to remain after the reboot.
Y_UNIT_TEST(InsertedPortionsCleanerNormalizer) {
    class TLocalNormalizerChecker: public TNormalizerChecker {
    public:
        // No records are expected after the reboot, whatever the initial count was.
        ui64 RecordsCountAfterReboot(const ui64 /*initialRecordsCount*/) const override {
            return 0;
        }

        // Feature flags are left untouched.
        void CorrectFeatureFlagsOnStart(TFeatureFlags& /* featuresFlags */) const override {
        }

        // Registers a single repair entry that selects the normalizer by its class name.
        void CorrectConfigurationOnStart(NKikimrConfig::TColumnShardConfig& columnShardConfig) const override {
            auto* repairEntry = columnShardConfig.MutableRepairs()->Add();
            repairEntry->SetClassName("CleanInsertedPortions");
            repairEntry->SetDescription("Removing inserted portions");
        }
    };

    TestNormalizerImpl<TInsertedPortionsCleaner>(TLocalNormalizerChecker());
}
332357

333358
Y_UNIT_TEST(SchemaVersionsNormalizer) {
334359
class TLocalNormalizerChecker: public TNormalizerChecker {

0 commit comments

Comments
 (0)