Skip to content

Commit 23c1bec

Browse files
committed
[COMMON] [ICRDMA] Fix unrealizable IContiguousChunk EXT-1594 (#27371)
The issue was: Caller has a pointer to IContiguousChunk If caller calls GetDataMut() and the object is not "private" (ref count is not equal to 1) the implementation had to perform "Detach()" and return pointer to copyed data. But we have no place to store newly allocated object. Ooops... The solution: Replace IContiguousChunk::GetDataMut() with IContiguousChunk::Clone() + IContiguousChunk::UnsafeGetDataMut()
1 parent 4f6b034 commit 23c1bec

File tree

12 files changed

+166
-40
lines changed

12 files changed

+166
-40
lines changed

ydb/core/blobstorage/crypto/ut/ut_helpers.h

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ class TAlignedBuf {
5252
return BufSize;
5353
}
5454

55+
size_t GetAlign() const {
56+
return Align;
57+
}
58+
5559
~TAlignedBuf() {
5660
delete[] Buf;
5761
}
@@ -69,17 +73,19 @@ class TRopeAlignedBufferBackend : public IContiguousChunk {
6973
return {reinterpret_cast<const char *>(Buffer.Data()), Buffer.Size()};
7074
}
7175

72-
TMutableContiguousSpan GetDataMut() override {
73-
return {reinterpret_cast<char *>(Buffer.Data()), Buffer.Size()};
74-
}
75-
7676
TMutableContiguousSpan UnsafeGetDataMut() override {
7777
return {reinterpret_cast<char *>(Buffer.Data()), Buffer.Size()};
7878
}
7979

8080
size_t GetOccupiedMemorySize() const override {
8181
return Buffer.Size();
8282
}
83+
84+
IContiguousChunk::TPtr Clone() override {
85+
auto newBackend = MakeIntrusive<TRopeAlignedBufferBackend>(Buffer.Size(), Buffer.GetAlign());
86+
::memcpy(newBackend->UnsafeGetDataMut().data(), GetData().data(), GetData().size());
87+
return newBackend;
88+
}
8389
};
8490

8591
void inline Print(const ui8* out, size_t size) {

ydb/core/blobstorage/vdisk/hulldb/base/hullds_arena.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,20 @@ namespace NKikimr {
2222
return Capacity;
2323
}
2424

25-
TMutableContiguousSpan GetDataMut() override {
25+
TMutableContiguousSpan UnsafeGetDataMut() override {
2626
return {Data, Capacity};
2727
}
2828

2929
static TIntrusivePtr<IContiguousChunk> Allocate() {
3030
return MakeIntrusive<TRopeArenaBackend>();
3131
}
32+
33+
IContiguousChunk::TPtr Clone() override {
34+
IContiguousChunk::TPtr buf = Allocate();
35+
TContiguousSpan src = GetData();
36+
::memcpy(buf->UnsafeGetDataMut().GetData(), src.Data(), src.GetSize());
37+
return buf;
38+
}
3239
};
3340

3441
}

ydb/core/erasure/erasure_split.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,17 @@ namespace NKikimr {
1010
return {ZeroData, sizeof(ZeroData)};
1111
}
1212

13-
TMutableContiguousSpan GetDataMut() override {
14-
return {const_cast<char*>(ZeroData), sizeof(ZeroData)};
15-
}
16-
1713
TMutableContiguousSpan UnsafeGetDataMut() override {
1814
return {const_cast<char*>(ZeroData), sizeof(ZeroData)};
1915
}
2016

2117
size_t GetOccupiedMemorySize() const override {
2218
return sizeof(ZeroData);
2319
}
20+
21+
IContiguousChunk::TPtr Clone() noexcept override {
22+
return this;
23+
}
2424
};
2525

2626
void ErasureSplitBlock42Prepare(const TRope& whole, std::span<TRope> parts) {

ydb/library/actors/interconnect/rdma/mem_pool.cpp

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ namespace NInterconnect::NRdma {
6262
return MRs.empty();
6363
}
6464

65+
TMemRegionPtr AllocMr(int size, ui32 flags) noexcept {
66+
return MemPool->Alloc(size, flags);
67+
}
68+
6569
private:
6670
std::vector<ibv_mr*> MRs;
6771
IMemPool* MemPool;
@@ -115,16 +119,28 @@ namespace NInterconnect::NRdma {
115119
TContiguousSpan TMemRegion::GetData() const {
116120
return TContiguousSpan(static_cast<const char*>(GetAddr()), GetSize());
117121
}
118-
TMutableContiguousSpan TMemRegion::GetDataMut() {
122+
123+
TMutableContiguousSpan TMemRegion::UnsafeGetDataMut() {
119124
return TMutableContiguousSpan(static_cast<char*>(GetAddr()), GetSize());
120125
}
126+
121127
size_t TMemRegion::GetOccupiedMemorySize() const {
122128
return GetSize();
123129
}
130+
124131
IContiguousChunk::EInnerType TMemRegion::GetInnerType() const noexcept {
125132
return EInnerType::RDMA_MEM_REG;
126133
}
127134

135+
IContiguousChunk::TPtr TMemRegion::Clone() noexcept {
136+
static const ui64 pageAlign = NSystemInfo::GetPageSize() - 1;
137+
const IMemPool::Flags flag = (((ui64)GetAddr() & pageAlign) == 0) ? IMemPool::PAGE_ALIGNED : IMemPool::EMPTY;
138+
TMemRegionPtr newRegion = Chunk->AllocMr(GetSize(), flag);
139+
auto span = newRegion->UnsafeGetDataMut();
140+
::memcpy(span.GetData(), GetAddr(), GetSize());
141+
return newRegion;
142+
}
143+
128144
TMemRegionSlice::TMemRegionSlice(TIntrusivePtr<TMemRegion> memRegion, uint32_t offset, uint32_t size) noexcept
129145
: MemRegion(std::move(memRegion))
130146
, Offset(offset)

ydb/library/actors/interconnect/rdma/mem_pool.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,10 @@ namespace NInterconnect::NRdma {
3838

3939
public: // IContiguousChunk
4040
TContiguousSpan GetData() const override;
41-
TMutableContiguousSpan GetDataMut() override;
41+
TMutableContiguousSpan UnsafeGetDataMut() override;
4242
size_t GetOccupiedMemorySize() const override;
4343
EInnerType GetInnerType() const noexcept override;
44+
IContiguousChunk::TPtr Clone() noexcept override;
4445
protected:
4546
TChunkPtr Chunk;
4647
const uint32_t Offset;
@@ -73,6 +74,7 @@ namespace NInterconnect::NRdma {
7374

7475
class IMemPool {
7576
public:
77+
// WARN: In case of addition new flags consider "Clone()" method implementation
7678
enum Flags : ui32 {
7779
EMPTY = 0,
7880
PAGE_ALIGNED = 1, // Page alignment allocation

ydb/library/actors/interconnect/rdma/ut/allocator_ut.cpp

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,73 @@ TEST_P(WithAllPools, PageAlligned) {
304304
}
305305
}
306306

307+
TEST_P(WithAllPools, RcBufDetach) {
308+
auto memPool = GetParam();
309+
TRcBuf data = memPool->AllocRcBuf(4, 0).value();
310+
{
311+
::memcpy(data.UnsafeGetDataMut(), "test", 4);
312+
}
313+
TRcBuf data2 = data;
314+
UNIT_ASSERT_EQUAL(data.GetData(), data2.GetData());
315+
char* res = data2.Detach();
316+
UNIT_ASSERT_UNEQUAL(data.GetData(), data2.GetData());
317+
UNIT_ASSERT_EQUAL(res, data2.GetData());
318+
UNIT_ASSERT_EQUAL(::memcmp(res, "test", 4), 0);
319+
UNIT_ASSERT_EQUAL(::memcmp(data.GetData(), "test", 4), 0);
320+
}
321+
322+
TEST_P(WithAllPools, RcBufDetachAfterMut) {
323+
auto memPool = GetParam();
324+
TRcBuf data = memPool->AllocRcBuf(4, 0).value();
325+
{
326+
::memcpy(data.UnsafeGetDataMut(), "test", 4);
327+
}
328+
// Check GetDataMut doesn't change backend in case of single ref
329+
UNIT_ASSERT_EQUAL(data.GetData(), data.GetDataMut());
330+
TRcBuf data2 = data;
331+
UNIT_ASSERT_EQUAL(data.GetData(), data2.GetData());
332+
char* res = data2.Detach();
333+
UNIT_ASSERT_UNEQUAL(data.GetData(), data2.GetData());
334+
UNIT_ASSERT_EQUAL(res, data2.GetData());
335+
UNIT_ASSERT_EQUAL(::memcmp(res, "test", 4), 0);
336+
UNIT_ASSERT_EQUAL(::memcmp(data.GetData(), "test", 4), 0);
337+
}
338+
339+
TEST_P(WithAllPools, RcBufDetachPageAlign) {
340+
using namespace NInterconnect::NRdma;
341+
static const ui64 pageAlign = NSystemInfo::GetPageSize() - 1;
342+
auto memPool = GetParam();
343+
TRcBuf data = memPool->AllocRcBuf(4, IMemPool::PAGE_ALIGNED).value();
344+
{
345+
::memcpy(data.UnsafeGetDataMut(), "test", 4);
346+
}
347+
TRcBuf data2 = data;
348+
UNIT_ASSERT_EQUAL(data.GetData(), data2.GetData());
349+
UNIT_ASSERT_EQUAL((ui64)data.GetData() & pageAlign, 0ull);
350+
char* res = data2.Detach();
351+
UNIT_ASSERT_UNEQUAL(data.GetData(), data2.GetData());
352+
UNIT_ASSERT_EQUAL((ui64)data.GetData() & pageAlign, 0ull);
353+
UNIT_ASSERT_EQUAL((ui64)data2.GetData() & pageAlign, 0ull);
354+
UNIT_ASSERT_EQUAL(res, data2.GetData());
355+
UNIT_ASSERT_EQUAL(::memcmp(res, "test", 4), 0);
356+
UNIT_ASSERT_EQUAL(::memcmp(data.GetData(), "test", 4), 0);
357+
}
358+
359+
TEST_P(WithAllPools, DetachAndDestroySrc) {
360+
auto memPool = GetParam();
361+
TRcBuf data = memPool->AllocRcBuf(4, 0).value();
362+
{
363+
::memcpy(data.UnsafeGetDataMut(), "test", 4);
364+
}
365+
TRcBuf data2 = data;
366+
data = {};
367+
UNIT_ASSERT_EQUAL(data2.GetSize(), 4u);
368+
char* res = data2.Detach();
369+
UNIT_ASSERT_EQUAL(data2.GetSize(), 4u);
370+
UNIT_ASSERT_EQUAL(res, data2.GetData());
371+
UNIT_ASSERT_EQUAL(::memcmp(res, "test", 4), 0);
372+
}
373+
307374
INSTANTIATE_TEST_SUITE_P(
308375
TAllocatorSuite,
309376
WithAllPools,

ydb/library/actors/util/rc_buf.h

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -282,29 +282,19 @@ struct IContiguousChunk : TThrRefBase {
282282
*/
283283
virtual TContiguousSpan GetData() const = 0;
284284

285-
/**
286-
* Should give mutable access to underlying data
287-
* If data is shared - data should be copied
288-
* E.g. for TString str.Detach() should be used
289-
* Possibly invalidates previous *GetData*() calls
290-
*/
291-
virtual TMutableContiguousSpan GetDataMut() = 0;
292-
293285
/**
294286
* Should give mutable access to undelying data as fast as possible
295287
* Even if data is shared this property should be ignored
296288
* E.g. in TString const_cast<char *>(str.data()) should be used
297289
* Possibly invalidates previous *GetData*() calls
298290
*/
299-
virtual TMutableContiguousSpan UnsafeGetDataMut() {
300-
return GetDataMut();
301-
}
291+
virtual TMutableContiguousSpan UnsafeGetDataMut() = 0;
302292

303293
/**
304-
* Should return true if GetDataMut() would not copy contents when called.
294+
* Must return false if the implementation shares data
305295
*/
306296
virtual bool IsPrivate() const {
307-
return true;
297+
return RefCount() == 1;
308298
}
309299

310300
virtual size_t GetOccupiedMemorySize() const = 0;
@@ -316,6 +306,12 @@ struct IContiguousChunk : TThrRefBase {
316306
virtual EInnerType GetInnerType() const noexcept {
317307
return OTHER;
318308
}
309+
310+
/**
311+
* Allocate new chunk and copy data into it
312+
* NOTE: The actual implementation of clonned chunk may be different
313+
*/
314+
virtual IContiguousChunk::TPtr Clone() = 0;
319315
};
320316

321317
class TRope;
@@ -458,7 +454,7 @@ class TRcBuf {
458454
} else if constexpr (std::is_same_v<T, TString>) {
459455
return value.IsDetached();
460456
} else if constexpr (std::is_same_v<T, IContiguousChunk::TPtr>) {
461-
return value.RefCount() == 1 && value->IsPrivate();
457+
return value->IsPrivate();
462458
} else {
463459
static_assert(TDependentFalse<T>);
464460
}
@@ -500,7 +496,10 @@ class TRcBuf {
500496
}
501497
return {value.mutable_data(), value.size()};
502498
} else if constexpr (std::is_same_v<T, IContiguousChunk::TPtr>) {
503-
return value->GetDataMut();
499+
if (!value->IsPrivate()) {
500+
value = value->Clone();
501+
}
502+
return value->UnsafeGetDataMut();
504503
} else {
505504
static_assert(TDependentFalse<T>, "unexpected type");
506505
}

ydb/library/actors/util/rc_buf_ut.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,25 @@ Y_UNIT_TEST_SUITE(TRcBuf) {
6363
Y_UNIT_TEST(Detach) {
6464
TRcBuf data = TRcBuf::Copy(TString("test"));
6565
TRcBuf data2 = data;
66+
UNIT_ASSERT_EQUAL(data.GetData(), data2.GetData());
6667
char* res = data2.Detach();
6768
UNIT_ASSERT_UNEQUAL(data.GetData(), data2.GetData());
6869
UNIT_ASSERT_EQUAL(res, data2.GetData());
6970
UNIT_ASSERT_EQUAL(::memcmp(res, "test", 4), 0);
7071
UNIT_ASSERT_EQUAL(::memcmp(data.GetData(), "test", 4), 0);
7172
}
7273

74+
Y_UNIT_TEST(DetachAndDestroySrc) {
75+
TRcBuf data = TRcBuf::Copy(TString("test"));
76+
TRcBuf data2 = data;
77+
data = {};
78+
UNIT_ASSERT_EQUAL(data2.GetSize(), 4);
79+
char* res = data2.Detach();
80+
UNIT_ASSERT_EQUAL(data2.GetSize(), 4);
81+
UNIT_ASSERT_EQUAL(res, data2.GetData());
82+
UNIT_ASSERT_EQUAL(::memcmp(res, "test", 4), 0);
83+
}
84+
7385
Y_UNIT_TEST(Resize) {
7486
TRcBuf data = TRcBuf::Uninitialized(10, 20, 30);
7587
UNIT_ASSERT_EQUAL(data.size(), 10);

ydb/library/actors/util/rope.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class TRopeAlignedBuffer : public IContiguousChunk {
2020
static constexpr size_t Alignment = 16;
2121
static constexpr size_t MallocAlignment = sizeof(size_t);
2222

23-
ui32 Size;
23+
const ui32 Size;
2424
const ui32 Capacity;
2525
const ui32 Offset;
2626
alignas(Alignment) char Data[];
@@ -38,6 +38,13 @@ class TRopeAlignedBuffer : public IContiguousChunk {
3838
return new(malloc(sizeof(TRopeAlignedBuffer) + size + Alignment - MallocAlignment)) TRopeAlignedBuffer(size);
3939
}
4040

41+
IContiguousChunk::TPtr Clone() override {
42+
TIntrusivePtr<TRopeAlignedBuffer> buf = Allocate(Size);
43+
TContiguousSpan src = GetData();
44+
::memcpy(buf->UnsafeGetDataMut().GetData(), src.Data(), src.GetSize());
45+
return buf;
46+
}
47+
4148
void *operator new(size_t) {
4249
Y_ABORT();
4350
}
@@ -59,7 +66,7 @@ class TRopeAlignedBuffer : public IContiguousChunk {
5966
return {Data + Offset, Size};
6067
}
6168

62-
TMutableContiguousSpan GetDataMut() override {
69+
TMutableContiguousSpan UnsafeGetDataMut() override {
6370
return {Data + Offset, Size};
6471
}
6572

ydb/library/actors/util/rope_ut.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,19 @@ class TRopeStringBackend : public IContiguousChunk {
1515
return {Buffer.data(), Buffer.size()};
1616
}
1717

18-
TMutableContiguousSpan GetDataMut() override {
19-
return {Buffer.Detach(), Buffer.size()};
20-
}
21-
2218
TMutableContiguousSpan UnsafeGetDataMut() override {
2319
return {const_cast<char*>(Buffer.data()), Buffer.size()};
2420
}
2521

2622
size_t GetOccupiedMemorySize() const override {
2723
return Buffer.capacity();
2824
}
25+
26+
IContiguousChunk::TPtr Clone() override {
27+
auto ptr = MakeIntrusive<TRopeStringBackend>(Buffer);
28+
ptr->Buffer.Detach();
29+
return ptr;
30+
}
2931
};
3032

3133
TRope CreateRope(TString s, size_t sliceSize) {

0 commit comments

Comments
 (0)