|
45 | 45 | #define EXTRA_POOL_SIZE 10 * 1024 * 1024 |
46 | 46 |
|
47 | 47 | class DirectUringRandomAccessFile; |
| 48 | + |
48 | 49 | // This class is global class. The variable is shared by each thread |
49 | | -class BufferPool { |
| 50 | +class BufferPool |
| 51 | +{ |
50 | 52 | public: |
51 | | - class BufferPoolManagedEntry { |
52 | | - public: |
53 | | - enum class State { |
54 | | - InitizaledNotAllocated, |
55 | | - AllocatedAndInUse, |
56 | | - UselessButNotFree |
| 53 | + class BufferPoolManagedEntry |
| 54 | + { |
| 55 | + public: |
| 56 | + enum class State |
| 57 | + { |
| 58 | + InitizaledNotAllocated, |
| 59 | + AllocatedAndInUse, |
| 60 | + UselessButNotFree |
| 61 | + }; |
| 62 | + |
| 63 | + private: |
| 64 | + std::shared_ptr<BufferPoolEntry> bufferPoolEntry; |
| 65 | + int ring_index; |
| 66 | + size_t current_size; |
| 67 | + int offset; |
| 68 | + State state; |
| 69 | + |
| 70 | + public: |
| 71 | + BufferPoolManagedEntry(std::shared_ptr<BufferPoolEntry> entry, int ringIdx, |
| 72 | + size_t currSize, off_t off) |
| 73 | + : bufferPoolEntry(std::move(entry)), ring_index(ringIdx), |
| 74 | + current_size(currSize), offset(off), |
| 75 | + state(State::InitizaledNotAllocated) |
| 76 | + { |
| 77 | + } |
| 78 | + |
| 79 | + std::shared_ptr<BufferPoolEntry> getBufferPoolEntry() const |
| 80 | + { |
| 81 | + return bufferPoolEntry; |
| 82 | + } |
| 83 | + |
| 84 | + int getRingIndex() const |
| 85 | + { |
| 86 | + return ring_index; |
| 87 | + } |
| 88 | + |
| 89 | + void setRingIndex(int index) |
| 90 | + { |
| 91 | + ring_index = index; |
| 92 | + } |
| 93 | + |
| 94 | + size_t getCurrentSize() const |
| 95 | + { |
| 96 | + return current_size; |
| 97 | + } |
| 98 | + |
| 99 | + void setCurrentSize(size_t size) |
| 100 | + { |
| 101 | + current_size = size; |
| 102 | + } |
| 103 | + |
| 104 | + int getOffset() const |
| 105 | + { |
| 106 | + return offset; |
| 107 | + } |
| 108 | + |
| 109 | + void setOffset(int off) |
| 110 | + { |
| 111 | + offset = off; |
| 112 | + } |
| 113 | + |
| 114 | + State getStatus() const |
| 115 | + { |
| 116 | + return state; |
| 117 | + } |
| 118 | + |
| 119 | + void setStatus(State newStatus) |
| 120 | + { |
| 121 | + state = newStatus; |
| 122 | + } |
57 | 123 | }; |
58 | 124 |
|
59 | | - private: |
60 | | - std::shared_ptr<BufferPoolEntry> bufferPoolEntry; |
61 | | - int ring_index; |
62 | | - size_t current_size; |
63 | | - int offset; |
64 | | - State state; |
65 | | - |
66 | | - public: |
67 | | - BufferPoolManagedEntry(std::shared_ptr<BufferPoolEntry> entry, int ringIdx, |
68 | | - size_t currSize, off_t off) |
69 | | - : bufferPoolEntry(std::move(entry)), ring_index(ringIdx), |
70 | | - current_size(currSize), offset(off), |
71 | | - state(State::InitizaledNotAllocated) {} |
72 | | - |
73 | | - std::shared_ptr<BufferPoolEntry> getBufferPoolEntry() const { |
74 | | - return bufferPoolEntry; |
75 | | - } |
76 | | - |
77 | | - int getRingIndex() const { return ring_index; } |
78 | | - |
79 | | - void setRingIndex(int index) { ring_index = index; } |
80 | | - |
81 | | - size_t getCurrentSize() const { return current_size; } |
82 | | - |
83 | | - void setCurrentSize(size_t size) { current_size = size; } |
| 125 | + static void Initialize(std::vector<uint32_t> colIds, |
| 126 | + std::vector<uint64_t> bytes, |
| 127 | + std::vector<std::string> columnNames); |
84 | 128 |
|
85 | | - int getOffset() const { return offset; } |
| 129 | + static void InitializeBuffers(); |
86 | 130 |
|
87 | | - void setOffset(int off) { offset = off; } |
| 131 | + static std::shared_ptr<ByteBuffer> GetBuffer(uint32_t colId, uint64_t byte, |
| 132 | + std::string columnName); |
88 | 133 |
|
89 | | - State getStatus() const { return state; } |
| 134 | + static int64_t GetBufferId(); |
90 | 135 |
|
91 | | - void setStatus(State newStatus) { state = newStatus; } |
92 | | - }; |
| 136 | + static void Switch(); |
93 | 137 |
|
94 | | - static void Initialize(std::vector<uint32_t> colIds, |
95 | | - std::vector<uint64_t> bytes, |
96 | | - std::vector<std::string> columnNames); |
| 138 | + static void Reset(); |
97 | 139 |
|
98 | | - static void InitializeBuffers(); |
| 140 | + static std::shared_ptr<BufferPoolEntry> AddNewBuffer(size_t size); |
99 | 141 |
|
100 | | - static std::shared_ptr<ByteBuffer> GetBuffer(uint32_t colId, uint64_t byte, |
101 | | - std::string columnName); |
| 142 | + static int getRingIndex(uint32_t colId); |
102 | 143 |
|
103 | | - static int64_t GetBufferId(); |
| 144 | + static std::shared_ptr<ByteBuffer> AllocateNewBuffer( |
| 145 | + std::shared_ptr<BufferPoolManagedEntry> currentBufferManagedEntry, |
| 146 | + uint32_t colId, uint64_t byte, std::string columnName); |
104 | 147 |
|
105 | | - static void Switch(); |
| 148 | + static std::shared_ptr<ByteBuffer> ReusePreviousBuffer( |
| 149 | + std::shared_ptr<BufferPoolManagedEntry> currentBufferManagedEntry, |
| 150 | + uint32_t colId, uint64_t byte, std::string columnName); |
106 | 151 |
|
107 | | - static void Reset(); |
| 152 | + static void PrintStats() |
| 153 | + { |
| 154 | + // Get the ID of the current thread |
| 155 | + std::thread::id tid = std::this_thread::get_id(); |
108 | 156 |
|
109 | | - static std::shared_ptr<BufferPoolEntry> AddNewBuffer(size_t size); |
| 157 | + // Print global buffer usage: used size / free size |
| 158 | + // Convert thread ID to integer for readability using hash |
| 159 | + printf("Thread %zu -> Global buffer usage: %ld / %ld\n", |
| 160 | + std::hash<std::thread::id>{}(tid), global_used_size, |
| 161 | + global_free_size); |
110 | 162 |
|
111 | | - static int getRingIndex(uint32_t colId); |
| 163 | + // Print thread-local statistics for Buffer0 |
| 164 | + printf("Thread %zu -> Buffer0 usage: %zu, Buffer count: %d\n", |
| 165 | + std::hash<std::thread::id>{}(tid), thread_local_used_size[0], |
| 166 | + thread_local_buffer_count[0]); |
112 | 167 |
|
113 | | - static std::shared_ptr<ByteBuffer> AllocateNewBuffer( |
114 | | - std::shared_ptr<BufferPoolManagedEntry> currentBufferManagedEntry, |
115 | | - uint32_t colId, uint64_t byte, std::string columnName); |
116 | | - |
117 | | - static std::shared_ptr<ByteBuffer> ReusePreviousBuffer( |
118 | | - std::shared_ptr<BufferPoolManagedEntry> currentBufferManagedEntry, |
119 | | - uint32_t colId, uint64_t byte, std::string columnName); |
120 | | - |
121 | | - static void PrintStats() { |
122 | | - // Get the ID of the current thread |
123 | | - std::thread::id tid = std::this_thread::get_id(); |
124 | | - |
125 | | - // Print global buffer usage: used size / free size |
126 | | - // Convert thread ID to integer for readability using hash |
127 | | - printf("Thread %zu -> Global buffer usage: %ld / %ld\n", |
128 | | - std::hash<std::thread::id>{}(tid), global_used_size, |
129 | | - global_free_size); |
130 | | - |
131 | | - // Print thread-local statistics for Buffer0 |
132 | | - printf("Thread %zu -> Buffer0 usage: %zu, Buffer count: %d\n", |
133 | | - std::hash<std::thread::id>{}(tid), thread_local_used_size[0], |
134 | | - thread_local_buffer_count[0]); |
135 | | - |
136 | | - // Print thread-local statistics for Buffer1 |
137 | | - printf("Thread %zu -> Buffer1 usage: %zu, Buffer count: %d\n", |
138 | | - std::hash<std::thread::id>{}(tid), thread_local_used_size[1], |
139 | | - thread_local_buffer_count[1]); |
140 | | - } |
| 168 | + // Print thread-local statistics for Buffer1 |
| 169 | + printf("Thread %zu -> Buffer1 usage: %zu, Buffer count: %d\n", |
| 170 | + std::hash<std::thread::id>{}(tid), thread_local_used_size[1], |
| 171 | + thread_local_buffer_count[1]); |
| 172 | + } |
141 | 173 |
|
142 | 174 | private: |
143 | | - BufferPool() = default; |
144 | | - // global |
145 | | - static std::mutex bufferPoolMutex; |
146 | | - |
147 | | - // thread local |
148 | | - static thread_local bool isInitialized; |
149 | | - static thread_local std::vector<std::shared_ptr<BufferPoolEntry>> |
150 | | - registeredBuffers[2]; |
151 | | - static thread_local long global_used_size; |
152 | | - static thread_local long global_free_size; |
153 | | - static thread_local std::shared_ptr<DirectIoLib> directIoLib; |
154 | | - static thread_local int nextRingIndex; |
155 | | - static thread_local std::shared_ptr<BufferPoolEntry> |
156 | | - nextEmptyBufferPoolEntry[2]; |
157 | | - static thread_local int colCount; |
158 | | - |
159 | | - static thread_local int currBufferIdx; |
160 | | - static thread_local int nextBufferIdx; |
161 | | - static thread_local std::map<uint32_t, std::shared_ptr<ByteBuffer>> |
162 | | - buffersAllocated[2]; |
163 | | - friend class DirectUringRandomAccessFile; |
164 | | - |
165 | | - static thread_local std::unordered_map< |
166 | | - uint32_t, std::shared_ptr<BufferPoolManagedEntry>> |
167 | | - ringBufferMap[2]; |
168 | | - |
169 | | - static thread_local size_t thread_local_used_size[2]; |
170 | | - static thread_local int thread_local_buffer_count[2]; |
| 175 | + BufferPool() = default; |
| 176 | + // global |
| 177 | + static std::mutex bufferPoolMutex; |
| 178 | + |
| 179 | + // thread local |
| 180 | + static thread_local bool isInitialized; |
| 181 | + static thread_local std::vector<std::shared_ptr<BufferPoolEntry>> |
| 182 | + registeredBuffers[2]; |
| 183 | + static thread_local long global_used_size; |
| 184 | + static thread_local long global_free_size; |
| 185 | + static thread_local std::shared_ptr<DirectIoLib> directIoLib; |
| 186 | + static thread_local int nextRingIndex; |
| 187 | + static thread_local std::shared_ptr<BufferPoolEntry> |
| 188 | + nextEmptyBufferPoolEntry[2]; |
| 189 | + static thread_local int colCount; |
| 190 | + |
| 191 | + static thread_local int currBufferIdx; |
| 192 | + static thread_local int nextBufferIdx; |
| 193 | + static thread_local std::map<uint32_t, std::shared_ptr<ByteBuffer>> |
| 194 | + buffersAllocated[2]; |
| 195 | + friend class DirectUringRandomAccessFile; |
| 196 | + |
| 197 | + static thread_local std::unordered_map< |
| 198 | + uint32_t, std::shared_ptr<BufferPoolManagedEntry>> |
| 199 | + ringBufferMap[2]; |
| 200 | + |
| 201 | + static thread_local size_t thread_local_used_size[2]; |
| 202 | + static thread_local int thread_local_buffer_count[2]; |
171 | 203 | }; |
172 | 204 | #endif // DUCKDB_BUFFERPOOL_H |
0 commit comments