diff --git a/.gitmodules b/.gitmodules index d85acea564..7f8cb88da4 100644 --- a/.gitmodules +++ b/.gitmodules @@ -7,3 +7,6 @@ [submodule "cpp/third-party/googletest"] path = cpp/third-party/googletest url = git@github.com:google/googletest.git +[submodule "cpp/FlameGraph"] + path = cpp/FlameGraph + url = git@github.com:brendangregg/FlameGraph.git diff --git a/cpp/.gitignore b/cpp/.gitignore index 6021de86a8..eedd6d3627 100644 --- a/cpp/.gitignore +++ b/cpp/.gitignore @@ -5,3 +5,8 @@ plot **/cmake-build-debug **/CMakeCache.txt **/CMakeFiles +# remove perf svg +cpp/testcase/perf-*/*.svg/*.svg +*.csv +*.txt +*.svg diff --git a/cpp/FlameGraph b/cpp/FlameGraph new file mode 160000 index 0000000000..41fee1f99f --- /dev/null +++ b/cpp/FlameGraph @@ -0,0 +1 @@ +Subproject commit 41fee1f99f9276008b7cd112fca19dc3ea84ac32 diff --git a/cpp/Makefile b/cpp/Makefile index 418f175a2f..6fe36ee78b 100644 --- a/cpp/Makefile +++ b/cpp/Makefile @@ -57,4 +57,8 @@ debug: deps release: deps cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=Release ${BUILD_FLAGS} -S pixels-duckdb/duckdb -B build/release && \ - cmake --build build/release --config Release \ No newline at end of file + cmake --build build/release --config Release + +relWithDebInfo: deps + cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=RelWithDebInfo ${BUILD_FLAGS} -S pixels-duckdb/duckdb -B build/relWithDebInfo && \ + cmake --build build/relWithDebInfo --config RelWithDebInfo \ No newline at end of file diff --git a/cpp/benchmark.json b/cpp/benchmark.json new file mode 100644 index 0000000000..03dd9ec9a6 --- /dev/null +++ b/cpp/benchmark.json @@ -0,0 +1,11 @@ +{ + "tpch-pixels-e0":"", + "tpch-pixels-e1":"", + "tpch-pixels-e2":"", + "tpch-parquet-e0":"", + "tpch-parquet-e2":"", + "clickbench-parquet-e2":"", + "clickbench-parquet-e0":"CREATE VIEW hits AS SELECT * FROM parquet_scan([\n 
\"/data/9a3-01/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-02/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-03/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-04/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-05/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-06/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-07/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-08/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-09/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-10/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-11/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-12/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-13/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-14/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-15/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-16/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-17/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-18/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-19/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-20/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-21/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-22/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-23/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-24/clickbench/parquet-e0/hits/*\"\n ]\n);", + "clickbench-pixels-e0":"CREATE VIEW hits AS SELECT * FROM pixels_scan([\n \"/data/9a3-01/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-02/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-03/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-04/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-05/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-06/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-07/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-08/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-09/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-10/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-11/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-12/clickbench/pixels-e0/hits/v-0-ordered/*\",\n 
\"/data/9a3-13/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-14/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-15/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-16/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-17/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-18/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-19/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-20/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-21/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-22/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-23/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-24/clickbench/pixels-e0/hits/v-0-ordered/*\"\n ]\n);", + "clickbench-pixels-e1":"" +} \ No newline at end of file diff --git a/cpp/include/PixelsReadBindData.hpp b/cpp/include/PixelsReadBindData.hpp index 06de4b7969..14254c625f 100644 --- a/cpp/include/PixelsReadBindData.hpp +++ b/cpp/include/PixelsReadBindData.hpp @@ -42,7 +42,7 @@ namespace duckdb std::shared_ptr initialPixelsReader; std::shared_ptr fileSchema; vector files; - atomic curFileId; + atomic curFileId; }; } diff --git a/cpp/include/PixelsReadGlobalState.hpp b/cpp/include/PixelsReadGlobalState.hpp index 0d23e1f855..9747b2bddc 100644 --- a/cpp/include/PixelsReadGlobalState.hpp +++ b/cpp/include/PixelsReadGlobalState.hpp @@ -40,6 +40,9 @@ namespace duckdb { mutex lock; + atomic active_threads; // Number of active threads + atomic all_done; // Whether all threads have completed + //! 
The initial reader from the bind phase std::shared_ptr initialPixelsReader; diff --git a/cpp/include/PixelsScanFunction.hpp b/cpp/include/PixelsScanFunction.hpp index 297500a068..fb29fc6a1e 100644 --- a/cpp/include/PixelsScanFunction.hpp +++ b/cpp/include/PixelsScanFunction.hpp @@ -113,7 +113,7 @@ namespace duckdb PixelsScanInitLocal(ExecutionContext &context, TableFunctionInitInput &input, GlobalTableFunctionState *gstate_p); - static bool PixelsParallelStateNext(ClientContext &context, const PixelsReadBindData &bind_data, + static bool PixelsParallelStateNext(ClientContext &context, PixelsReadBindData &bind_data, PixelsReadLocalState &scan_data, PixelsReadGlobalState ¶llel_state, bool is_init_state = false); diff --git a/cpp/perf.sh b/cpp/perf.sh new file mode 100755 index 0000000000..ba1eea671f --- /dev/null +++ b/cpp/perf.sh @@ -0,0 +1,74 @@ +#!/bin/bash + +# Check number of arguments +if [ $# -ne 2 ]; then + echo "Usage: $0 " + echo "Example: $0 test_q01.sql query_1" + exit 1 +fi + +# Assign parameters +QUERY_FILE="$1" +OUTPUT_PREFIX="$2" + +# Record start time (nanoseconds) +START_TIME=$(date +%s%N) +PREVIOUS_TIME=$START_TIME + +# Function to display time duration +show_time() { + local start=$1 + local end=$2 + local stage=$3 + + # Calculate duration in milliseconds (integer arithmetic in bash) + local duration=$(( (end - start) / 1000000 )) + echo "Stage '$stage' duration: ${duration}ms" +} + +echo "Starting performance analysis..." + +# Run perf recording +echo "1. Running perf record..." +sudo -E perf record -F 1 --call-graph=dwarf -g ./build/release/duckdb < "$QUERY_FILE" +CURRENT_TIME=$(date +%s%N) +show_time $PREVIOUS_TIME $CURRENT_TIME "Running perf record" +PREVIOUS_TIME=$CURRENT_TIME + +# Generate perf script output +echo "2. Generating perf script output..." 
+sudo perf script -i perf.data > "${OUTPUT_PREFIX}.perf" +CURRENT_TIME=$(date +%s%N) +show_time $PREVIOUS_TIME $CURRENT_TIME "Generating perf script output" +PREVIOUS_TIME=$CURRENT_TIME + +# Collapse call stacks +echo "3. Collapsing call stacks..." +stackcollapse-perf.pl "${OUTPUT_PREFIX}.perf" > "${OUTPUT_PREFIX}.folded" +CURRENT_TIME=$(date +%s%N) +show_time $PREVIOUS_TIME $CURRENT_TIME "Collapsing call stacks" +PREVIOUS_TIME=$CURRENT_TIME + +# Generate flame graph +echo "4. Generating flame graph..." +flamegraph.pl "${OUTPUT_PREFIX}.folded" > "${OUTPUT_PREFIX}-cpu.svg" +CURRENT_TIME=$(date +%s%N) +show_time $PREVIOUS_TIME $CURRENT_TIME "Generating flame graph" +PREVIOUS_TIME=$CURRENT_TIME + +# Rename perf data file +echo "5. Renaming perf data file..." +mv perf.data "${OUTPUT_PREFIX}-perf.data" +CURRENT_TIME=$(date +%s%N) +show_time $PREVIOUS_TIME $CURRENT_TIME "Renaming files" +PREVIOUS_TIME=$CURRENT_TIME + +# Calculate total duration +TOTAL_DURATION=$(( (CURRENT_TIME - START_TIME) / 1000000 )) +echo "Total duration: ${TOTAL_DURATION}ms" + +echo "Operation completed. 
Generated files:" +echo "${OUTPUT_PREFIX}.perf" +echo "${OUTPUT_PREFIX}.folded" +echo "${OUTPUT_PREFIX}-cpu.svg" +echo "${OUTPUT_PREFIX}-perf.data" \ No newline at end of file diff --git a/cpp/pixels-common/CMakeLists.txt b/cpp/pixels-common/CMakeLists.txt index 1ed1f4f03d..3cba67bf36 100644 --- a/cpp/pixels-common/CMakeLists.txt +++ b/cpp/pixels-common/CMakeLists.txt @@ -4,50 +4,21 @@ set(CMAKE_CXX_STANDARD 17) include(ExternalProject) -set(pixels_common_cxx - lib/physical/storage/LocalFS.cpp - lib/physical/storage/LocalFSProvider.cpp - lib/physical/storage/PhysicalLocalWriter.cpp - lib/physical/PhysicalWriterOption.cpp - lib/physical/Status.cpp - lib/physical/Storage.cpp - lib/physical/FilePath.cpp - lib/physical/natives/PixelsRandomAccessFile.cpp - lib/physical/natives/DirectRandomAccessFile.cpp - lib/physical/natives/ByteBuffer.cpp - lib/physical/io/PhysicalLocalReader.cpp - lib/physical/StorageFactory.cpp - lib/physical/Request.cpp - lib/physical/RequestBatch.cpp - lib/physical/scheduler/NoopScheduler.cpp - lib/physical/SchedulerFactory.cpp - lib/exception/InvalidArgumentException.cpp - lib/utils/Constants.cpp - lib/utils/String.cpp - include/physical/natives/DirectIoLib.h - lib/physical/natives/DirectIoLib.cpp - include/utils/ConfigFactory.h - lib/utils/ConfigFactory.cpp - include/physical/MergedRequest.h - include/physical/scheduler/SortMergeScheduler.h - lib/physical/scheduler/SortMergeScheduler.cpp - lib/MergedRequest.cpp include/profiler/TimeProfiler.h - lib/profiler/TimeProfiler.cpp - include/profiler/CountProfiler.h - lib/profiler/CountProfiler.cpp - include/profiler/AbstractProfiler.h - include/physical/allocator/Allocator.h - include/physical/allocator/OrdinaryAllocator.h - lib/physical/allocator/OrdinaryAllocator.cpp - include/physical/allocator/BufferPoolAllocator.h - lib/physical/allocator/BufferPoolAllocator.cpp - include/physical/BufferPool.h - lib/physical/BufferPool.cpp - include/physical/natives/DirectUringRandomAccessFile.h - 
lib/physical/natives/DirectUringRandomAccessFile.cpp - include/utils/ColumnSizeCSVReader.h lib/utils/ColumnSizeCSVReader.cpp - include/physical/StorageArrayScheduler.h lib/physical/StorageArrayScheduler.cpp - include/physical/natives/ByteOrder.h + +file(GLOB_RECURSE pixels_common_cxx + "lib/physical/*.cpp" + "lib/physical/*.h" + "lib/exception/*.cpp" + "lib/exception/*.h" + "lib/utils/*.cpp" + "lib/utils/*.h" + "lib/profiler/*.cpp" + "lib/profiler/*.h" + "include/physical/*.h" + "include/profiler/*.h" + "include/utils/*.h" + "include/physical/BufferPool/*.h" + "lib/MergedRequest.cpp" ) include_directories(include) diff --git a/cpp/pixels-common/include/physical/BufferPool.h b/cpp/pixels-common/include/physical/BufferPool.h index ec810e29a9..9698f6a03c 100644 --- a/cpp/pixels-common/include/physical/BufferPool.h +++ b/cpp/pixels-common/include/physical/BufferPool.h @@ -25,49 +25,179 @@ #ifndef DUCKDB_BUFFERPOOL_H #define DUCKDB_BUFFERPOOL_H -#include -#include +#include "exception/InvalidArgumentException.h" +#include "physical/BufferPool/Bitmap.h" +#include "physical/BufferPool/BufferPoolEntry.h" #include "physical/natives/ByteBuffer.h" -#include #include "physical/natives/DirectIoLib.h" -#include "exception/InvalidArgumentException.h" #include "utils/ColumnSizeCSVReader.h" +#include +#include #include - -// when allocating buffer pool, we use the size of the first pxl file. Consider that -// the remaining pxl file has larger size than the first file, we allocate some extra -// size (10MB) to each column. +#include +#include +#include +#include +// when allocating buffer pool, we use the size of the first pxl file. Consider +// that the remaining pxl file has larger size than the first file, we allocate +// some extra size (10MB) to each column. // TODO: how to evaluate the maximal pool size -#define EXTRA_POOL_SIZE 3*1024*1024 +#define EXTRA_POOL_SIZE 10 * 1024 * 1024 class DirectUringRandomAccessFile; + // This class is global class. 
The variable is shared by each thread class BufferPool { public: - static void - Initialize(std::vector colIds, std::vector bytes, std::vector columnNames); - static std::shared_ptr GetBuffer(uint32_t colId); + class BufferPoolManagedEntry + { + public: + enum class State + { + InitizaledNotAllocated, + AllocatedAndInUse, + UselessButNotFree + }; + + private: + std::shared_ptr bufferPoolEntry; + int ring_index; + size_t current_size; + int offset; + State state; + + public: + BufferPoolManagedEntry(std::shared_ptr entry, int ringIdx, + size_t currSize, off_t off) + : bufferPoolEntry(std::move(entry)), ring_index(ringIdx), + current_size(currSize), offset(off), + state(State::InitizaledNotAllocated) + { + } + + std::shared_ptr getBufferPoolEntry() const + { + return bufferPoolEntry; + } + + int getRingIndex() const + { + return ring_index; + } + + void setRingIndex(int index) + { + ring_index = index; + } + + size_t getCurrentSize() const + { + return current_size; + } + + void setCurrentSize(size_t size) + { + current_size = size; + } + + int getOffset() const + { + return offset; + } - static int64_t GetBufferId(uint32_t index); + void setOffset(int off) + { + offset = off; + } + + State getStatus() const + { + return state; + } + + void setStatus(State newStatus) + { + state = newStatus; + } + }; + + static void Initialize(std::vector colIds, + std::vector bytes, + std::vector columnNames); + + static void InitializeBuffers(); + + static std::shared_ptr GetBuffer(uint32_t colId, uint64_t byte, + std::string columnName); + + static int64_t GetBufferId(); static void Switch(); static void Reset(); + static std::shared_ptr AddNewBuffer(size_t size); + + static int getRingIndex(uint32_t colId); + + static std::shared_ptr AllocateNewBuffer( + std::shared_ptr currentBufferManagedEntry, + uint32_t colId, uint64_t byte, std::string columnName); + + static std::shared_ptr ReusePreviousBuffer( + std::shared_ptr currentBufferManagedEntry, + uint32_t colId, uint64_t byte, 
std::string columnName); + + static void PrintStats() + { + // Get the ID of the current thread + std::thread::id tid = std::this_thread::get_id(); + + // Print global buffer usage: used size / free size + // Convert thread ID to integer for readability using hash + printf("Thread %zu -> Global buffer usage: %ld / %ld\n", + std::hash{}(tid), global_used_size, + global_free_size); + + // Print thread-local statistics for Buffer0 + printf("Thread %zu -> Buffer0 usage: %zu, Buffer count: %d\n", + std::hash{}(tid), thread_local_used_size[0], + thread_local_buffer_count[0]); + + // Print thread-local statistics for Buffer1 + printf("Thread %zu -> Buffer1 usage: %zu, Buffer count: %d\n", + std::hash{}(tid), thread_local_used_size[1], + thread_local_buffer_count[1]); + } private: BufferPool() = default; + // global + static std::mutex bufferPoolMutex; - static thread_local int colCount; - static thread_local std::map - nrBytes; + // thread local static thread_local bool isInitialized; - static thread_local std::map> - buffers[2]; - static std::shared_ptr directIoLib; + static thread_local std::vector> + registeredBuffers[2]; + static thread_local long global_used_size; + static thread_local long global_free_size; + static thread_local std::shared_ptr directIoLib; + static thread_local int nextRingIndex; + static thread_local std::shared_ptr + nextEmptyBufferPoolEntry[2]; + static thread_local int colCount; static thread_local int currBufferIdx; static thread_local int nextBufferIdx; + static thread_local std::map> + buffersAllocated[2]; friend class DirectUringRandomAccessFile; + + static thread_local std::unordered_map< + uint32_t, std::shared_ptr> + ringBufferMap[2]; + + static thread_local size_t thread_local_used_size[2]; + static thread_local int thread_local_buffer_count[2]; }; #endif // DUCKDB_BUFFERPOOL_H diff --git a/cpp/pixels-common/include/physical/BufferPool/Bitmap.h b/cpp/pixels-common/include/physical/BufferPool/Bitmap.h new file mode 100644 index 
0000000000..ac67c547bc --- /dev/null +++ b/cpp/pixels-common/include/physical/BufferPool/Bitmap.h @@ -0,0 +1,82 @@ +/* +* Copyright 2025 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . + */ + +/* + * @author whz + * @create 2025-07-30 + */ + +#ifndef BITMAP_H +#define BITMAP_H +#include "exception/InvalidArgumentException.h" +#include +#include +#include + +class Bitmap +{ +public: + explicit Bitmap(size_t size) : bits((size + 7) / 8, 0), num_bits(size) + { + } + + void set(size_t index) + { + if (index >= num_bits) + throw InvalidArgumentException("Bitmap::set: index out of range"); + bits[index / 8] |= (1 << (index % 8)); + } + + void clear(size_t index) + { + if (index >= num_bits) + throw InvalidArgumentException("Bitmap::clear: index out of range"); + bits[index / 8] &= ~(1 << (index % 8)); + } + + bool test(size_t index) const + { + if (index >= num_bits) + throw InvalidArgumentException("Bitmap::test: index out of range"); + return bits[index / 8] & (1 << (index % 8)); + } + + size_t size() const + { + return num_bits; + } + + void print() const + { + for (size_t i = 0; i < num_bits; ++i) + { + std::cout << (test(i) ? 
'1' : '0'); + if ((i + 1) % 8 == 0) + std::cout << ' '; + } + std::cout << '\n'; + } + +private: + std::vector bits; + size_t num_bits; +}; + +#endif // BITMAP_H diff --git a/cpp/pixels-common/include/physical/BufferPool/BufferPoolEntry.h b/cpp/pixels-common/include/physical/BufferPool/BufferPoolEntry.h new file mode 100644 index 0000000000..5fde7f2fe9 --- /dev/null +++ b/cpp/pixels-common/include/physical/BufferPool/BufferPoolEntry.h @@ -0,0 +1,71 @@ +/* +* Copyright 2025 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . 
+ */ + +/* + * @author whz + * @create 2025-07-30 + */ + +#ifndef BUFFERPOOLENTRY_H +#define BUFFERPOOLENTRY_H +#include "physical/BufferPool/Bitmap.h" +#include +#include +#include + +class BufferPoolEntry +{ +public: + explicit BufferPoolEntry(size_t size, int slice_size, + std::shared_ptr direct_lib, int offset, + int ring_index); + size_t getSize() const; + std::shared_ptr getBitmap() const; + std::shared_ptr getBuffer() const; + bool isFull() const; + int getNextFreeIndex() const; + int setNextFreeIndex(int index); + ~BufferPoolEntry(); + uint64_t checkCol(uint32_t) const; + void addCol(uint32_t colId, uint64_t bytes); + bool isInUse() const; + void setInUse(bool in_use); + int getOffsetInBuffers() const; + void setOffsetInBuffers(int offset); + bool getIsRegistered() const; + void setIsRegistered(bool registered); + int getRingIndex() const; + void setRingIndex(int ring_index); + void reset(); + +private: + size_t size_; + std::shared_ptr bitmap_; + std::shared_ptr buffer_; + bool is_full_; + int next_free_; + std::map nr_bytes_; + bool is_in_use_; + int offset_in_buffers_; + bool is_registered; + int ring_index; +}; + +#endif // BUFFERPOOLENTRY_H diff --git a/cpp/pixels-common/include/physical/Request.h b/cpp/pixels-common/include/physical/Request.h index eb8f909214..08bfaedb46 100644 --- a/cpp/pixels-common/include/physical/Request.h +++ b/cpp/pixels-common/include/physical/Request.h @@ -34,6 +34,7 @@ class Request uint64_t queryId; uint64_t start; uint64_t length; + int ring_index; Request(uint64_t queryId_, uint64_t start_, uint64_t length_, int64_t bufferId = -1); diff --git a/cpp/pixels-common/include/physical/RequestBatch.h b/cpp/pixels-common/include/physical/RequestBatch.h index 423fb74ff6..39d23b7e12 100644 --- a/cpp/pixels-common/include/physical/RequestBatch.h +++ b/cpp/pixels-common/include/physical/RequestBatch.h @@ -42,6 +42,8 @@ class RequestBatch void add(Request request); + Request& getRequest(int index); + int getSize(); std::vector 
getRequests(); diff --git a/cpp/pixels-common/include/physical/Scheduler.h b/cpp/pixels-common/include/physical/Scheduler.h index f07add0ed5..b6d3af6abe 100644 --- a/cpp/pixels-common/include/physical/Scheduler.h +++ b/cpp/pixels-common/include/physical/Scheduler.h @@ -28,6 +28,7 @@ #include "physical/PhysicalReader.h" #include "physical/RequestBatch.h" #include "profiler/TimeProfiler.h" +#include class Scheduler { diff --git a/cpp/pixels-common/include/physical/io/PhysicalLocalReader.h b/cpp/pixels-common/include/physical/io/PhysicalLocalReader.h index 9ebe51a622..23a3e42f9a 100644 --- a/cpp/pixels-common/include/physical/io/PhysicalLocalReader.h +++ b/cpp/pixels-common/include/physical/io/PhysicalLocalReader.h @@ -26,52 +26,63 @@ #define PIXELS_READER_PHYSICALLOCALREADER_H #include "physical/PhysicalReader.h" -#include "physical/storage/LocalFS.h" #include "physical/natives/DirectRandomAccessFile.h" #include "physical/natives/DirectUringRandomAccessFile.h" -#include +#include "physical/storage/LocalFS.h" #include +#include +#include - -class PhysicalLocalReader : public PhysicalReader -{ +class PhysicalLocalReader : public PhysicalReader { public: - PhysicalLocalReader(std::shared_ptr storage, std::string path); + PhysicalLocalReader(std::shared_ptr storage, std::string path); - std::shared_ptr readFully(int length) override; + std::shared_ptr readFully(int length) override; - std::shared_ptr readFully(int length, std::shared_ptr bb) override; + std::shared_ptr readFully(int length, std::shared_ptr bb) override; - std::shared_ptr readAsync(int length, std::shared_ptr bb, int index); + std::shared_ptr readAsync(int length, std::shared_ptr bb, int index,int ring_index,int start_offset); - void readAsyncSubmit(uint32_t size); + void readAsyncSubmit(std::unordered_map sizes,std::unordered_set ring_index); - void readAsyncComplete(uint32_t size); + void readAsyncComplete(std::unordered_map sizes,std::unordered_set ring_index); - void 
readAsyncSubmitAndComplete(uint32_t size); + void readAsyncSubmitAndComplete(uint32_t size,std::unordered_set ring_index); - void close() override; + void close() override; - long getFileLength() override; + long getFileLength() override; - void seek(long desired) override; + void seek(long desired) override; - long readLong() override; + long readLong() override; - int readInt() override; + int readInt() override; - char readChar() override; + char readChar() override; - std::string getName() override; + std::string getName() override; -private: - std::shared_ptr local; - std::string path; - long id; - std::atomic numRequests; - std::atomic asyncNumRequests; - std::shared_ptr raf; + void addRingIndex(int ring_index); + + std::unordered_set& getRingIndexes(); + + std::unordered_map getRingIndexCountMap(); + void setRingIndexCountMap(std::unordered_map ringIndexCount); + +private: + std::shared_ptr local; + std::string path; + long id; + std::atomic numRequests; + std::atomic asyncNumRequests; + std::shared_ptr raf; + // usr for bufferpool + bool isBufferValid; + std::unordered_set> ringIndexes; + std::unordered_set ring_index_vector; + std::unordered_map ringIndexCountMap; }; #endif //PIXELS_READER_PHYSICALLOCALREADER_H diff --git a/cpp/pixels-common/include/physical/natives/ByteBuffer.h b/cpp/pixels-common/include/physical/natives/ByteBuffer.h index ab64114641..1ebd430a9c 100644 --- a/cpp/pixels-common/include/physical/natives/ByteBuffer.h +++ b/cpp/pixels-common/include/physical/natives/ByteBuffer.h @@ -43,8 +43,12 @@ class ByteBuffer ByteBuffer(ByteBuffer &bb, uint32_t startId, uint32_t length); + ByteBuffer(ByteBuffer &bb,uint32_t startId,uint32_t length,bool from_slice); + ~ByteBuffer(); + std::shared_ptr slice(uint32_t offset, uint32_t length); + void filp();// reset the readPosition uint32_t bytesRemaining(); // Number of uint8_ts from the current read position till the end of the buffer void clear(); // Clear our the vector and reset read and write 
positions @@ -167,6 +171,7 @@ class ByteBuffer std::string name; uint32_t rmark; bool fromOtherBB; + bool fromSlice; // Sometimes the buffer is allocated by malloc/poxis_memalign, in this case, we // should use free() to deallocate the buf bool allocated_by_new; diff --git a/cpp/pixels-common/include/physical/natives/DirectIoLib.h b/cpp/pixels-common/include/physical/natives/DirectIoLib.h index 2fec848b13..1b1f4aeee6 100644 --- a/cpp/pixels-common/include/physical/natives/DirectIoLib.h +++ b/cpp/pixels-common/include/physical/natives/DirectIoLib.h @@ -27,30 +27,33 @@ /** * Mapping Linux I/O functions to native methods. - * Partially referenced the implementation of Jaydio (https://github.com/smacke/jaydio), - * which is implemented by Stephen Macke and licensed under Apache 2.0. - *

+ * Partially referenced the implementation of Jaydio(https://github.com/smacke/jaydio), + * which is implemented by Stephen Macke and + * licensed under Apache 2.0.

* Created at: 02/02/2023 * Author: Liangyong Yu */ -#include "utils/ConfigFactory.h" +#include "liburing.h" +#include "liburing/io_uring.h" #include "physical/natives/ByteBuffer.h" +#include "utils/ConfigFactory.h" +#include +#include #include +#include +#include +#include #include -#include +#include #include -#include "liburing.h" -#include "liburing/io_uring.h" - struct uringData { int idx; - ByteBuffer *bb; + ByteBuffer* bb; }; - class DirectIoLib { public: @@ -59,14 +62,24 @@ class DirectIoLib */ DirectIoLib(int fsBlockSize); - std::shared_ptr allocateDirectBuffer(long size); + std::shared_ptr allocateDirectBuffer(long size, + bool isSmallBuffer); + + int getToAllocate(int size); - std::shared_ptr read(int fd, long fileOffset, std::shared_ptr directBuffer, long length); + std::shared_ptr read(int fd, long fileOffset, + std::shared_ptr directBuffer, + long length); long blockStart(long value); long blockEnd(long value); + int getBlockSize() const + { + return fsBlockSize; + }; + private: int fsBlockSize; long fsBlockNotMask; diff --git a/cpp/pixels-common/include/physical/natives/DirectUringRandomAccessFile.h b/cpp/pixels-common/include/physical/natives/DirectUringRandomAccessFile.h index ab5293aa27..977ac2cd2a 100644 --- a/cpp/pixels-common/include/physical/natives/DirectUringRandomAccessFile.h +++ b/cpp/pixels-common/include/physical/natives/DirectUringRandomAccessFile.h @@ -31,33 +31,47 @@ #include "exception/InvalidArgumentException.h" #include "DirectIoLib.h" #include "physical/BufferPool.h" +#include "unordered_set" +#include +#include "utils/MutexTracker.h" class DirectUringRandomAccessFile : public DirectRandomAccessFile { public: - explicit DirectUringRandomAccessFile(const std::string &file); + explicit DirectUringRandomAccessFile(const std::string& file); - static void RegisterBuffer(std::vector > buffers); + static void RegisterBuffer(std::vector> buffers); - static void RegisterBufferFromPool(std::vector colIds); + static void 
RegisterBufferFromPool(std::vector colIds); static void Initialize(); static void Reset(); - std::shared_ptr readAsync(int length, std::shared_ptr buffer, int index); + static bool RegisterMoreBuffer(int index, std::vector> buffers); - void readAsyncSubmit(int size); + std::shared_ptr readAsync(int length, std::shared_ptr buffer, int index, int ring_index, + int start_offset); - void readAsyncComplete(int size); + void readAsyncSubmit(std::unordered_map sizes, std::unordered_set ring_indexs); + + void readAsyncComplete(std::unordered_map sizes, std::unordered_set ring_indexs); + + void seekByIndex(long offset, int index); + + static struct io_uring* getRing(int index); ~DirectUringRandomAccessFile(); private: - static thread_local struct io_uring *ring; + // thread_local + static std::mutex mutex_; static thread_local bool isRegistered; - static thread_local struct iovec *iovecs; - static thread_local uint32_t - iovecSize; + // static MutexTracker g_mutex_tracker; + // static TrackedMutex g_mutex; + static thread_local std::vector ring_vector; + static thread_local std::vector iovecs_vector; + static thread_local uint32_t iovecSize; + static thread_local std::vector offsets_vector; }; #endif // DUCKDB_DIRECTURINGRANDOMACCESSFILE_H diff --git a/cpp/pixels-common/include/utils/MutexTracker.h b/cpp/pixels-common/include/utils/MutexTracker.h new file mode 100644 index 0000000000..e25ceb3a7c --- /dev/null +++ b/cpp/pixels-common/include/utils/MutexTracker.h @@ -0,0 +1,149 @@ +/* +* Copyright 2025 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. 
+ * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . + */ + +/* + * @author whz + * @create 2025-08-23 + */ + +#ifndef DUCKDB_MUTEX_H +#define DUCKDB_MUTEX_H +#include +#include +#include +#include +#include +#include +class MutexTracker { +private: + // Stores the mapping between mutexes and their owning thread IDs + std::unordered_map mutex_owners; + // Internal mutex to protect the mapping table + std::mutex internal_mutex; + +public: + // Gets the thread ID of the mutex owner; returns default ID if not held + std::thread::id get_owner(const std::mutex* mutex) { + std::lock_guard lock(internal_mutex); + auto it = mutex_owners.find(mutex); + if (it != mutex_owners.end()) { + return it->second; + } + return std::thread::id(); // Not held by any thread + } + + // Records that a mutex has been acquired + void lock_acquired(const std::mutex* mutex) { + std::lock_guard lock(internal_mutex); + auto current_thread = std::this_thread::get_id(); + + // Check for duplicate acquisition (to prevent recursive lock issues) + auto it = mutex_owners.find(mutex); + if (it != mutex_owners.end()) { + if (it->second == current_thread) { + // Same thread acquiring a non-recursive mutex repeatedly, may cause deadlock + throw std::runtime_error("Same thread acquiring non-recursive mutex repeatedly"); + } + } + + mutex_owners[mutex] = current_thread; + } + + // Records that a mutex has been released + void lock_released(const std::mutex* mutex) { + std::lock_guard lock(internal_mutex); + mutex_owners.erase(mutex); + } + + // Prints information about the mutex owner + void print_owner(const std::mutex* mutex, const std::string& mutex_name) { + std::thread::id owner = 
get_owner(mutex); + if (owner == std::thread::id()) { + std::cout << mutex_name << " is not held by any thread" << std::endl; + } else { + std::cout << mutex_name << " is held by thread " << std::hash{}(owner) + << std::endl; + } + } +}; + + +// Mutex wrapper with tracking functionality +class TrackedMutex { +private: + std::mutex internal_mutex; + std::string name; + MutexTracker g_mutex_tracker; + +public: + TrackedMutex(const std::string& name, MutexTracker& g_mutex_tracker) : name(name) {} + + // Locks the mutex and records the owner + void lock() { + internal_mutex.lock(); + g_mutex_tracker.lock_acquired(&internal_mutex); + } + + // Unlocks the mutex and clears the owner record + void unlock() { + g_mutex_tracker.lock_released(&internal_mutex); + internal_mutex.unlock(); + } + + // Tries to lock the mutex (records owner if successful) + bool try_lock() { + if (internal_mutex.try_lock()) { + g_mutex_tracker.lock_acquired(&internal_mutex); + return true; + } + return false; + } + + // Gets the mutex name + const std::string& get_name() const { return name; } + + // Gets pointer to internal mutex (for tracking purposes) + const std::mutex* get_internal_mutex() const { + return &internal_mutex; + } +}; + +// RAII lock manager for use with TrackedMutex +template +class TrackedLockGuard { +private: + Mutex& mutex; + +public: + TrackedLockGuard(Mutex& m) : mutex(m) { + mutex.lock(); + } + + ~TrackedLockGuard() { + mutex.unlock(); + } + + // Disable copying + TrackedLockGuard(const TrackedLockGuard&) = delete; + TrackedLockGuard& operator=(const TrackedLockGuard&) = delete; +}; + + +#endif // DUCKDB_MUTEX_H diff --git a/cpp/pixels-common/lib/physical/BufferPool.cpp b/cpp/pixels-common/lib/physical/BufferPool.cpp index 4dcc7bdeab..3beec05e75 100644 --- a/cpp/pixels-common/lib/physical/BufferPool.cpp +++ b/cpp/pixels-common/lib/physical/BufferPool.cpp @@ -23,30 +23,71 @@ * @create 2023-05-25 */ #include "physical/BufferPool.h" +#include +#include +#include +#include 
+#include +using TimePoint = std::chrono::high_resolution_clock::time_point; +using Duration = std::chrono::duration; + +TimePoint getCurrentTime() +{ + return std::chrono::high_resolution_clock::now(); +} + +// global +std::string columnSizePath = + ConfigFactory::Instance().getProperty("pixel.column.size.path"); +std::shared_ptr csvReader; +int sliceSize = std::stoi( + ConfigFactory::Instance().getProperty("pixel.bufferpool.sliceSize")); +int bufferSize = std::stoi( + ConfigFactory::Instance().getProperty("pixel.bufferpool.bufferpoolSize")); +const size_t SLICE_SIZE = sliceSize * 1024; +int bufferNum = std::stoi( + ConfigFactory::Instance().getProperty("pixel.bufferpool.bufferNum")); +std::mutex BufferPool::bufferPoolMutex; + +// thread_local +thread_local bool BufferPool::isInitialized; +thread_local std::vector> +BufferPool::registeredBuffers[2]; +thread_local long BufferPool::global_free_size = 0; +thread_local long BufferPool::global_used_size = 0; +thread_local std::shared_ptr BufferPool::directIoLib; +thread_local int BufferPool::nextRingIndex = 1; +thread_local std::shared_ptr +BufferPool::nextEmptyBufferPoolEntry[2] = {nullptr, nullptr}; +thread_local std::vector globalBuffers; thread_local int BufferPool::colCount = 0; -thread_local std::map -BufferPool::nrBytes; -thread_local bool BufferPool::isInitialized = false; -thread_local std::map> -BufferPool::buffers[2]; // The currBufferIdx is set to 1. When executing the first file, this value is 0 // since we call switch function first. 
thread_local int BufferPool::currBufferIdx = 1; thread_local int BufferPool::nextBufferIdx = 0; -std::shared_ptr BufferPool::directIoLib; -void BufferPool::Initialize(std::vector colIds, std::vector bytes, - std::vector columnNames) +thread_local std::map> +BufferPool::buffersAllocated[2]; +thread_local std::unordered_map< + uint32_t, std::shared_ptr> +BufferPool::ringBufferMap[2]; +thread_local size_t BufferPool::thread_local_used_size[2] = {0, 0}; +thread_local int BufferPool::thread_local_buffer_count[2] = {0, 0}; + +void BufferPool::Initialize(std::vector colIds, + std::vector bytes, + std::vector columnNames) { assert(colIds.size() == bytes.size()); int fsBlockSize = std::stoi(ConfigFactory::Instance().getProperty("localfs.block.size")); - std::string columnSizePath = ConfigFactory::Instance().getProperty("pixel.column.size.path"); - std::shared_ptr csvReader; - if (!columnSizePath.empty()) - { - csvReader = std::make_shared(columnSizePath); - } + auto strToBool = [](const std::string& s) { + + return s == "true" || s == "1" || s == "yes"; + }; + std::string configValue = + ConfigFactory::Instance().getProperty("pixel.bufferpool.fixedsize"); + bool isFixedSize = strToBool(configValue); // give the maximal column size, which is stored in csv reader if (!BufferPool::isInitialized) @@ -54,25 +95,21 @@ void BufferPool::Initialize(std::vector colIds, std::vector (fsBlockSize); + BufferPool::InitializeBuffers(); for (int i = 0; i < colIds.size(); i++) { uint32_t colId = colIds.at(i); - std::string columnName = columnNames[colId]; - for (int idx = 0; idx < 2; idx++) + if (isFixedSize) { - std::shared_ptr buffer; - if (columnSizePath.empty()) - { - buffer = BufferPool::directIoLib->allocateDirectBuffer(bytes.at(i) + EXTRA_POOL_SIZE); - } - else + if (!columnSizePath.empty()) { - buffer = BufferPool::directIoLib->allocateDirectBuffer(csvReader->get(columnName)); + csvReader = std::make_shared(columnSizePath); } - - BufferPool::nrBytes[colId] = buffer->size(); - 
BufferPool::buffers[idx][colId] = buffer; } + auto byte = bytes.at(i); + BufferPool::ringBufferMap[currBufferIdx][colId] = + std::make_shared( + registeredBuffers[currBufferIdx][0], 0, byte, 0); } BufferPool::colCount = colIds.size(); BufferPool::isInitialized = true; @@ -80,43 +117,198 @@ void BufferPool::Initialize(std::vector colIds, std::vector ( + registeredBuffers[currBufferIdx][0], 0, byte, 0); } } } -int64_t BufferPool::GetBufferId(uint32_t index) +void BufferPool::InitializeBuffers() { - return index + currBufferIdx * colCount; + for (int idx = 0; idx < 2; idx++) + { + const int size_ = bufferSize + EXTRA_POOL_SIZE; + // Calculate the required space, allocate it in advance, and then register + // it. + std::shared_ptr buffer_pool_entry = + std::make_shared(size_, sliceSize, directIoLib, idx, + 0); + registeredBuffers[idx].emplace_back(buffer_pool_entry); + buffer_pool_entry->setInUse(true); + global_free_size += size_; + } +} + +int64_t BufferPool::GetBufferId() +{ + return currBufferIdx; +} + +std::shared_ptr BufferPool::AllocateNewBuffer( + std::shared_ptr currentBufferManagedEntry, + uint32_t colId, uint64_t byte, std::string columnName) +{ + auto strToBool = [](const std::string& s) + { + return s == "true" || s == "1" || s == "yes"; + }; + std::string configValue = + ConfigFactory::Instance().getProperty("pixel.bufferpool.fixedsize"); + bool isFixedSize = strToBool(configValue); + if (isFixedSize) + { + byte = csvReader->get(columnName); + } + + size_t sliceCount = (byte + SLICE_SIZE - 1) / SLICE_SIZE + 1; + size_t totalSizeRaw = (sliceCount + 1) * SLICE_SIZE; + size_t totalSize = directIoLib->getToAllocate(totalSizeRaw); + // need to reallocate + // get origin registered ByteBuffer + auto currentBuffer = currentBufferManagedEntry->getBufferPoolEntry(); + std::shared_ptr original = currentBuffer->getBuffer(); + + // get offset + size_t offset = currentBuffer->getNextFreeIndex(); + // fisrt find anthor empty buffer + if (offset + totalSize > 
original->size()) + { + if (nextEmptyBufferPoolEntry[currBufferIdx] != nullptr && + nextEmptyBufferPoolEntry[currBufferIdx]->getNextFreeIndex() + + totalSize < + nextEmptyBufferPoolEntry[currBufferIdx]->getSize()) + { + // there are anthor regisitered Buffers + // std::cout<<"curBufID:"<size()<setInUse(false); + currentBuffer = BufferPool::AddNewBuffer(currentBuffer->getSize()); + std::vector> buffers; + buffers.emplace_back(currentBuffer->getBuffer()); + if (!::DirectUringRandomAccessFile::RegisterMoreBuffer( + currentBuffer->getRingIndex(), buffers)) + { + throw std::runtime_error("Failed to register more buffers"); + } + currentBuffer->setInUse(true); + currentBuffer->setIsRegistered(true); + } + offset = currentBuffer->getNextFreeIndex(); + auto newBufferPoolManageEntry = std::make_shared( + currentBuffer, currentBuffer->getRingIndex(), totalSize, offset); + BufferPool::ringBufferMap[currBufferIdx][colId] = newBufferPoolManageEntry; + newBufferPoolManageEntry->setStatus( + BufferPoolManagedEntry::State::AllocatedAndInUse); + original = currentBuffer->getBuffer(); + } + + currentBuffer->setNextFreeIndex(offset + totalSize); + + std::shared_ptr sliced = original->slice(offset, totalSize); + currentBuffer->addCol(colId, sliced->size()); + BufferPool::buffersAllocated[currBufferIdx][colId] = sliced; + auto newBufferPoolManageEntry = + BufferPool::ringBufferMap[currBufferIdx][colId]; + newBufferPoolManageEntry->setStatus( + BufferPoolManagedEntry::State::AllocatedAndInUse); + newBufferPoolManageEntry->setCurrentSize(sliced->size()); + newBufferPoolManageEntry->setOffset(offset); + newBufferPoolManageEntry->setRingIndex(currentBuffer->getRingIndex()); + + global_used_size += totalSize; + thread_local_used_size[currBufferIdx] += totalSize; + thread_local_buffer_count[currBufferIdx]++; + return sliced; } -std::shared_ptr BufferPool::GetBuffer(uint32_t colId) +std::shared_ptr BufferPool::GetBuffer(uint32_t colId, uint64_t byte, + std::string columnName) { - return 
BufferPool::buffers[currBufferIdx][colId]; + // Retrieve the current buffer management entry and previously allocated + // buffer + auto currentBufferManagedEntry = ringBufferMap[currBufferIdx][colId]; + auto previousBuffer = buffersAllocated[currBufferIdx][colId]; + + // Allocation scenarios: + // 1. Buffer not yet allocated - check the entry's state + // 2. Buffer already allocated + + // Size scenarios: + // 1. Can reuse the previous buffer (reusePrevious()) + // 2. Can allocate required space within the same large buffer + // (allocateBuffer) + // 3. Insufficient remaining space in current large buffer - need a new large + // buffer + + if (currentBufferManagedEntry->getStatus() == + BufferPoolManagedEntry::State::InitizaledNotAllocated) + { + // Buffer not allocated yet + return AllocateNewBuffer(currentBufferManagedEntry, colId, byte, + columnName); + } + else + { + // Buffer already allocated + // Check if current buffer size is sufficient and leaves enough remaining + // space (after accounting for block size) + if (currentBufferManagedEntry->getCurrentSize() >= byte && + currentBufferManagedEntry->getCurrentSize() - + directIoLib->getBlockSize() >= + byte) + { + // Reuse the previous buffer + return previousBuffer; + } + else + { + // Need to allocate a new buffer + return AllocateNewBuffer(currentBufferManagedEntry, colId, byte, + columnName); + } + } } void BufferPool::Reset() { - BufferPool::isInitialized = false; - BufferPool::nrBytes.clear(); + // BufferPool::isInitialized = false; + // std::lock_guard lock(bufferPoolMutex); for (int idx = 0; idx < 2; idx++) { - BufferPool::buffers[idx].clear(); + // BufferPool::currentBuffers[idx]->clear();; + BufferPool::buffersAllocated[idx].clear(); + BufferPool::ringBufferMap[idx].clear(); + // BufferPool::registeredBuffers[idx].clear(); + // thread_local_used_size[idx]=0; + // thread_local_buffer_count[idx]=0; + // global_free_size=0; + global_used_size = 0; + for (auto bufferEntry : registeredBuffers[idx]) + { 
+ bufferEntry->reset(); + } } + BufferPool::colCount = 0; } @@ -126,4 +318,27 @@ void BufferPool::Switch() nextBufferIdx = 1 - nextBufferIdx; } +std::shared_ptr BufferPool::AddNewBuffer(size_t size) +{ + // std::cout<<"Adding new buffer"< buffer_pool_entry = + std::make_shared(size, sliceSize, directIoLib, + currBufferIdx, nextRingIndex++); + registeredBuffers[currBufferIdx].emplace_back(buffer_pool_entry); + buffer_pool_entry->setInUse(true); + global_free_size += size; + nextEmptyBufferPoolEntry[currBufferIdx] = buffer_pool_entry; + return buffer_pool_entry; +} +int BufferPool::getRingIndex(uint32_t colId) +{ + return ringBufferMap[currBufferIdx][colId] + ->getBufferPoolEntry() + ->getRingIndex(); +} diff --git a/cpp/pixels-common/lib/physical/BufferPool/Bitmap.cpp b/cpp/pixels-common/lib/physical/BufferPool/Bitmap.cpp new file mode 100644 index 0000000000..8b0f09ef58 --- /dev/null +++ b/cpp/pixels-common/lib/physical/BufferPool/Bitmap.cpp @@ -0,0 +1,27 @@ +/* +* Copyright 2025 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . 
+ */ + + +/* + * @author whz + * @create 2025-07-30 + */ +#include "physical/BufferPool/BufferPoolEntry.h" +#include "physical/BufferPool/Bitmap.h" diff --git a/cpp/pixels-common/lib/physical/BufferPool/BufferPoolEntry.cpp b/cpp/pixels-common/lib/physical/BufferPool/BufferPoolEntry.cpp new file mode 100644 index 0000000000..de5f7dbac7 --- /dev/null +++ b/cpp/pixels-common/lib/physical/BufferPool/BufferPoolEntry.cpp @@ -0,0 +1,157 @@ +/* +* Copyright 2025 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . 
+ */ + + +/* + * @author whz + * @create 2025-07-30 + */ +#include "physical/BufferPool/BufferPoolEntry.h" +#include "physical/BufferPool/Bitmap.h" + +BufferPoolEntry::BufferPoolEntry(size_t size, int slice_size, std::shared_ptr direct_lib, int offset, + int ring_index) + : size_(size), is_full_(false), next_free_(0), is_in_use_(false), offset_in_buffers_(offset), is_registered(false), + ring_index(ring_index) +{ + if (size == 0 || slice_size <= 0) + { + throw std::invalid_argument("Invalid buffer size or slice size"); + } + if (!direct_lib) + { + throw std::invalid_argument("DirectIoLib pointer cannot be null"); + } + + const int slice_count = static_cast(size + slice_size - 1 / slice_size); + // bitmap_ = std::make_shared(slice_count); + + buffer_ = direct_lib->allocateDirectBuffer(size_, false); + memset(buffer_->getPointer(), 0, buffer_->size()); + if (!buffer_) + { + throw std::runtime_error("Failed to allocate direct buffer"); + } +} + +size_t BufferPoolEntry::getSize() const +{ + return size_; +} + +// std::shared_ptr BufferPoolEntry::getBitmap() const { +// return bitmap_; +// } + + +std::shared_ptr BufferPoolEntry::getBuffer() const +{ + return buffer_; +} + + +bool BufferPoolEntry::isFull() const +{ + return is_full_; +} + + +int BufferPoolEntry::getNextFreeIndex() const +{ + return next_free_; +} + + +int BufferPoolEntry::setNextFreeIndex(int index) +{ + const int old_index = next_free_; + next_free_ = index; + is_full_ = (index > size_); + if (is_full_) + { + return -1; + // the buffer is full + } + return old_index; +} + +uint64_t BufferPoolEntry::checkCol(uint32_t col) const +{ + return nr_bytes_.find(col) == nr_bytes_.end() ? 
-1 : nr_bytes_.at(col); +} + +void BufferPoolEntry::addCol(uint32_t colId, uint64_t bytes) +{ + nr_bytes_[colId] = bytes; +} + +bool BufferPoolEntry::isInUse() const +{ + return is_in_use_; +} + +void BufferPoolEntry::setInUse(bool in_use) +{ + is_in_use_ = in_use; +} + +int BufferPoolEntry::getOffsetInBuffers() const +{ + return offset_in_buffers_; +} + +void BufferPoolEntry::setOffsetInBuffers(int offset) +{ + offset_in_buffers_ = offset; +} + +bool BufferPoolEntry::getIsRegistered() const +{ + return is_registered; +} + +void BufferPoolEntry::setIsRegistered(bool registered) +{ + is_registered = registered; +} + +int BufferPoolEntry::getRingIndex() const +{ + return ring_index; +} + +void BufferPoolEntry::setRingIndex(int ring_index) +{ + ring_index = ring_index; +} + +void BufferPoolEntry::reset() +{ + is_full_ = false; + next_free_ = 0; + // if (bitmap_) { + // bitmap_.reset(); + // } + nr_bytes_.clear(); + is_in_use_ = false; + // reset direct buffer? +} + +BufferPoolEntry::~BufferPoolEntry() = default; diff --git a/cpp/pixels-common/lib/physical/Request.cpp b/cpp/pixels-common/lib/physical/Request.cpp index 6ef2ee7d79..7ba5903db1 100644 --- a/cpp/pixels-common/lib/physical/Request.cpp +++ b/cpp/pixels-common/lib/physical/Request.cpp @@ -24,21 +24,15 @@ */ #include "physical/Request.h" - -Request::Request(uint64_t queryId_, uint64_t start_, uint64_t length_, int64_t bufferId) -{ - queryId = queryId_; - start = start_; - length = length_; - this->bufferId = bufferId; +Request::Request(uint64_t queryId_, uint64_t start_, uint64_t length_, + int64_t bufferId) { + queryId = queryId_; + start = start_; + length = length_; + this->bufferId = bufferId; + ring_index = 0; } -int Request::hashCode() -{ - return (int) ((start << 32) >> 32); -} +int Request::hashCode() { return (int)((start << 32) >> 32); } -int Request::comparedTo(Request o) -{ - return start == o.start; -} +int Request::comparedTo(Request o) { return start == o.start; } diff --git 
a/cpp/pixels-common/lib/physical/RequestBatch.cpp b/cpp/pixels-common/lib/physical/RequestBatch.cpp index 92db4c5efc..7ecd8de66d 100644 --- a/cpp/pixels-common/lib/physical/RequestBatch.cpp +++ b/cpp/pixels-common/lib/physical/RequestBatch.cpp @@ -66,3 +66,11 @@ void RequestBatch::add(Request request) requests.push_back(request); size++; } + + +Request& RequestBatch::getRequest(int index) { + if (index < 0 || index >= size) { + throw std::out_of_range("RequestBatch::getRequest: index out of range"); + } + return requests[index]; +} \ No newline at end of file diff --git a/cpp/pixels-common/lib/physical/SchedulerFactory.cpp b/cpp/pixels-common/lib/physical/SchedulerFactory.cpp index 77790890ba..2ee5f4025f 100644 --- a/cpp/pixels-common/lib/physical/SchedulerFactory.cpp +++ b/cpp/pixels-common/lib/physical/SchedulerFactory.cpp @@ -24,9 +24,9 @@ */ #include "physical/SchedulerFactory.h" -SchedulerFactory *SchedulerFactory::instance = nullptr; +SchedulerFactory* SchedulerFactory::instance = nullptr; -SchedulerFactory *SchedulerFactory::Instance() +SchedulerFactory* SchedulerFactory::Instance() { if (instance == nullptr) { @@ -35,7 +35,7 @@ SchedulerFactory *SchedulerFactory::Instance() return instance; } -Scheduler *SchedulerFactory::getScheduler() +Scheduler* SchedulerFactory::getScheduler() { return scheduler; } diff --git a/cpp/pixels-common/lib/physical/io/PhysicalLocalReader.cpp b/cpp/pixels-common/lib/physical/io/PhysicalLocalReader.cpp index 4b364ef859..86e60e7489 100644 --- a/cpp/pixels-common/lib/physical/io/PhysicalLocalReader.cpp +++ b/cpp/pixels-common/lib/physical/io/PhysicalLocalReader.cpp @@ -104,14 +104,30 @@ std::string PhysicalLocalReader::getName() } return path.substr(path.find_last_of('/') + 1); } +void PhysicalLocalReader::addRingIndex(int ring_index) { + ring_index_vector.insert(ring_index); +} + +std::unordered_set& PhysicalLocalReader::getRingIndexes() { + return ring_index_vector; +} -std::shared_ptr PhysicalLocalReader::readAsync(int length, 
std::shared_ptr buffer, int index) +std::unordered_map PhysicalLocalReader::getRingIndexCountMap() { + return ringIndexCountMap; +} + +void PhysicalLocalReader::setRingIndexCountMap(std::unordered_map ringIndexCount) { + //first clear + ringIndexCountMap.clear(); + ringIndexCountMap=ringIndexCount; +} +std::shared_ptr PhysicalLocalReader::readAsync(int length, std::shared_ptr buffer, int index,int ring_index,int start_offset) { numRequests++; if (ConfigFactory::Instance().getProperty("localfs.async.lib") == "iouring") { auto directRaf = std::static_pointer_cast(raf); - return directRaf->readAsync(length, std::move(buffer), index); + return directRaf->readAsync(length, std::move(buffer), index,ring_index,start_offset); } else if (ConfigFactory::Instance().getProperty("localfs.async.lib") == "aio") { @@ -124,13 +140,13 @@ std::shared_ptr PhysicalLocalReader::readAsync(int length, std::sha } -void PhysicalLocalReader::readAsyncSubmit(uint32_t size) +void PhysicalLocalReader::readAsyncSubmit(std::unordered_map sizes,std::unordered_set ring_index) { numRequests++; if (ConfigFactory::Instance().getProperty("localfs.async.lib") == "iouring") { auto directRaf = std::static_pointer_cast(raf); - directRaf->readAsyncSubmit(size); + directRaf->readAsyncSubmit(sizes,ring_index); } else if (ConfigFactory::Instance().getProperty("localfs.async.lib") == "aio") { @@ -142,13 +158,13 @@ void PhysicalLocalReader::readAsyncSubmit(uint32_t size) } } -void PhysicalLocalReader::readAsyncComplete(uint32_t size) +void PhysicalLocalReader::readAsyncComplete(std::unordered_map sizes,std::unordered_set ring_index) { numRequests++; if (ConfigFactory::Instance().getProperty("localfs.async.lib") == "iouring") { auto directRaf = std::static_pointer_cast(raf); - directRaf->readAsyncComplete(size); + directRaf->readAsyncComplete(sizes,ring_index); } else if (ConfigFactory::Instance().getProperty("localfs.async.lib") == "aio") { @@ -160,15 +176,19 @@ void 
PhysicalLocalReader::readAsyncComplete(uint32_t size) } } -void PhysicalLocalReader::readAsyncSubmitAndComplete(uint32_t size) + +// not use? +void PhysicalLocalReader::readAsyncSubmitAndComplete(uint32_t size,std::unordered_set ring_index) { numRequests++; if (ConfigFactory::Instance().getProperty("localfs.async.lib") == "iouring") { auto directRaf = std::static_pointer_cast(raf); - directRaf->readAsyncSubmit(size); + std::unordered_map sizes; + sizes[0]=0; + directRaf->readAsyncSubmit(sizes,ring_index); ::TimeProfiler::Instance().Start("async wait"); - directRaf->readAsyncComplete(size); + directRaf->readAsyncComplete(sizes,ring_index); ::TimeProfiler::Instance().End("async wait"); } else if (ConfigFactory::Instance().getProperty("localfs.async.lib") == "aio") diff --git a/cpp/pixels-common/lib/physical/natives/ByteBuffer.cpp b/cpp/pixels-common/lib/physical/natives/ByteBuffer.cpp index bb929e2d4f..0013f0b31b 100644 --- a/cpp/pixels-common/lib/physical/natives/ByteBuffer.cpp +++ b/cpp/pixels-common/lib/physical/natives/ByteBuffer.cpp @@ -66,6 +66,26 @@ ByteBuffer::ByteBuffer(ByteBuffer &bb, uint32_t startId, uint32_t length) name = ""; fromOtherBB = true; allocated_by_new = true; + fromSlice=false; +} +ByteBuffer::ByteBuffer(ByteBuffer &bb, uint32_t startId, uint32_t length,bool from_slice) +{ + assert(startId >= 0 && startId + length <= bb.size() && length > 0); + buf = bb.getPointer() + startId; + bufSize = length; + resetPosition(); + name = ""; + fromOtherBB = true; + allocated_by_new = true; + fromSlice=true; +} + +std::shared_ptr ByteBuffer::slice(uint32_t offset, uint32_t length) { + if (offset + length > this->bufSize) { + throw std::runtime_error("Slice range out of bounds"); + } + + return std::make_shared(*this, offset, length,true); } /** diff --git a/cpp/pixels-common/lib/physical/natives/DirectIoLib.cpp b/cpp/pixels-common/lib/physical/natives/DirectIoLib.cpp index 5fdcb2b0ba..c5ffd5e88b 100644 --- 
a/cpp/pixels-common/lib/physical/natives/DirectIoLib.cpp +++ b/cpp/pixels-common/lib/physical/natives/DirectIoLib.cpp @@ -23,6 +23,10 @@ * @create 2023-04-19 */ #include "physical/natives/DirectIoLib.h" +#include // mmap, munmap, madvise +#include +#include +#include DirectIoLib::DirectIoLib(int fsBlockSize) @@ -31,23 +35,118 @@ DirectIoLib::DirectIoLib(int fsBlockSize) this->fsBlockNotMask = ~((long) fsBlockSize - 1); } -std::shared_ptr DirectIoLib::allocateDirectBuffer(long size) +template struct huge_page_allocator { + constexpr static std::size_t huge_page_size = 1 << 21; // 2 MiB + using value_type = T; + + huge_page_allocator() = default; + template + constexpr huge_page_allocator(const huge_page_allocator &) noexcept {} + + size_t round_to_huge_page_size(size_t n) { + return (((n - 1) / huge_page_size) + 1) * huge_page_size; + } + + T *allocate(std::size_t n) { + if (n > std::numeric_limits::max() / sizeof(T)) { + throw std::bad_alloc(); + } + auto p = static_cast(mmap( + nullptr, round_to_huge_page_size(n * sizeof(T)), PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB, -1, 0)); + if (p == MAP_FAILED) { + throw std::bad_alloc(); + } + return p; + } + + void deallocate(T *p, std::size_t n) { + munmap(p, round_to_huge_page_size(n * sizeof(T))); + } +}; + +std::shared_ptr DirectIoLib::allocateDirectBuffer(long size, bool isSmallBuffer) { - int toAllocate = blockEnd(size) + (size == 1 ? 
0 : fsBlockSize); - uint8_t *directBufferPointer; - posix_memalign((void **) &directBufferPointer, fsBlockSize, toAllocate); - auto directBuffer = std::make_shared(directBufferPointer, toAllocate, false); - return directBuffer; + // Lambda function to convert string to boolean + auto strToBool = [](const std::string& s) { + return s == "true" || s == "1" || s == "yes"; + }; + + // Get configuration value for huge page buffer pool + std::string configValue = ConfigFactory::Instance().getProperty("pixel.bufferpool.hugepage"); + bool isHugePage = strToBool(configValue); + + if (isHugePage && !isSmallBuffer) { + // Allocate memory using huge page allocator + try { + // Calculate the size to allocate (considering block alignment) + int toAllocate = blockEnd(size) + (size == 1 ? 0 : fsBlockSize); + + // Create huge page allocator instance + huge_page_allocator allocator; + + // Allocate memory + uint8_t* directBufferPointer = allocator.allocate(toAllocate); + + // Create custom deleter to free memory using the allocator + auto deleter = [allocator, toAllocate](ByteBuffer* buf) mutable { + if (buf && buf->getBuffer()) { + // Deallocate memory using the allocator's deallocate method + allocator.deallocate(const_cast(buf->getBuffer()), toAllocate); + } + // delete buf; // Free the ByteBuffer object itself + }; + + // Create ByteBuffer (does not take over memory ownership, managed by shared_ptr's deleter) + ByteBuffer* buf = new ByteBuffer(directBufferPointer, toAllocate, false); + + // Return shared_ptr with custom deleter + return std::shared_ptr(buf, deleter); + } + catch (const std::bad_alloc& e) { + throw std::runtime_error("Huge page memory allocation failed: " + std::string(e.what()) + + ", error code: " + std::string(strerror(errno))); + } + } else { + // Normal memory allocation path + int toAllocate = blockEnd(size) + (size == 1 ? 
0 : fsBlockSize); + uint8_t* directBufferPointer; + + // Allocate aligned memory using posix_memalign + if (posix_memalign((void**)&directBufferPointer, fsBlockSize, toAllocate) != 0) { + throw std::runtime_error("Normal memory allocation failed: " + std::string(strerror(errno))); + } + + // Create custom deleter to free memory + auto deleter = [](ByteBuffer* buf) { + if (buf && buf->getBuffer()) { + free(const_cast(buf->getBuffer())); + } + // delete buf; + }; + + // Create ByteBuffer and return shared_ptr + ByteBuffer* buf = new ByteBuffer(directBufferPointer, toAllocate, false); + return std::shared_ptr(buf, deleter); + } } +int DirectIoLib::getToAllocate(int size) { + return blockEnd(size) + (size == 1 ? 0 : fsBlockSize); +} + + + + std::shared_ptr DirectIoLib::read(int fd, long fileOffset, std::shared_ptr directBuffer, long length) { // the file will be read from blockStart(fileOffset), and the first fileDelta bytes should be ignored. long fileOffsetAligned = blockStart(fileOffset); long toRead = blockEnd(fileOffset + length) - blockStart(fileOffset); - if (pread(fd, directBuffer->getPointer(), toRead, fileOffsetAligned) == -1) - { + ssize_t ret = pread(fd, directBuffer->getPointer(), toRead, fileOffsetAligned); + if (ret == -1) { + perror("pread failed"); throw InvalidArgumentException("DirectIoLib::read: pread fail. 
"); } auto bb = std::make_shared(*directBuffer, diff --git a/cpp/pixels-common/lib/physical/natives/DirectRandomAccessFile.cpp b/cpp/pixels-common/lib/physical/natives/DirectRandomAccessFile.cpp index 832b980019..786b47bdae 100644 --- a/cpp/pixels-common/lib/physical/natives/DirectRandomAccessFile.cpp +++ b/cpp/pixels-common/lib/physical/natives/DirectRandomAccessFile.cpp @@ -64,7 +64,7 @@ DirectRandomAccessFile::DirectRandomAccessFile(const std::string &file) directIoLib = std::make_shared(fsBlockSize); try { - smallDirectBuffer = directIoLib->allocateDirectBuffer(fsBlockSize); + smallDirectBuffer = directIoLib->allocateDirectBuffer(fsBlockSize,true); } catch (...) { @@ -91,7 +91,7 @@ std::shared_ptr DirectRandomAccessFile::readFully(int len) { if (enableDirect) { - auto directBuffer = directIoLib->allocateDirectBuffer(len); + auto directBuffer = directIoLib->allocateDirectBuffer(len,true); auto buffer = directIoLib->read(fd, offset, directBuffer, len); seek(offset + len); largeBuffers.emplace_back(directBuffer); diff --git a/cpp/pixels-common/lib/physical/natives/DirectUringRandomAccessFile.cpp b/cpp/pixels-common/lib/physical/natives/DirectUringRandomAccessFile.cpp index 70e22fbe37..8b53974fec 100644 --- a/cpp/pixels-common/lib/physical/natives/DirectUringRandomAccessFile.cpp +++ b/cpp/pixels-common/lib/physical/natives/DirectUringRandomAccessFile.cpp @@ -24,53 +24,71 @@ */ #include "physical/natives/DirectUringRandomAccessFile.h" -thread_local struct io_uring *DirectUringRandomAccessFile::ring = nullptr; +#include + +// global +std::mutex DirectUringRandomAccessFile::mutex_; +// thread local thread_local bool DirectUringRandomAccessFile::isRegistered = false; -thread_local struct iovec *DirectUringRandomAccessFile::iovecs = nullptr; -thread_local uint32_t -DirectUringRandomAccessFile::iovecSize = 0; +thread_local std::vector +DirectUringRandomAccessFile::ring_vector; +thread_local std::vector +DirectUringRandomAccessFile::iovecs_vector; +thread_local 
std::vector DirectUringRandomAccessFile::offsets_vector; +thread_local uint32_t DirectUringRandomAccessFile::iovecSize = 0; -DirectUringRandomAccessFile::DirectUringRandomAccessFile(const std::string &file) : DirectRandomAccessFile(file) +DirectUringRandomAccessFile::DirectUringRandomAccessFile( + const std::string& file) + : DirectRandomAccessFile(file) { - } -void DirectUringRandomAccessFile::RegisterBufferFromPool(std::vector colIds) +void DirectUringRandomAccessFile::RegisterBufferFromPool( + std::vector colIds) { - std::vector > tmpBuffers; if (!isRegistered) { - for (auto buffer: ::BufferPool::buffers) + std::vector> tmpBuffers; + auto ring = DirectUringRandomAccessFile::getRing(0); + struct iovec* iovecs = nullptr; + for (auto buffers : ::BufferPool::registeredBuffers) { - for (auto colId: colIds) + for (auto buffer : buffers) { - tmpBuffers.emplace_back(buffer[colId]); + buffer->setIsRegistered(true); + tmpBuffers.emplace_back(buffer->getBuffer()); } } - iovecs = (iovec *) calloc(tmpBuffers.size(), sizeof(struct iovec)); + iovecs = (iovec*)calloc(tmpBuffers.size(), sizeof(struct iovec)); iovecSize = tmpBuffers.size(); for (auto i = 0; i < tmpBuffers.size(); i++) { auto buffer = tmpBuffers.at(i); iovecs[i].iov_base = buffer->getPointer(); iovecs[i].iov_len = buffer->size(); - memset(iovecs[i].iov_base, 0, buffer->size()); } + iovecs_vector.emplace_back(iovecs); int ret = io_uring_register_buffers(ring, iovecs, iovecSize); if (ret != 0) { - throw InvalidArgumentException("DirectUringRandomAccessFile::RegisterBuffer: register buffer fails. "); + std::cerr << "io_uring_register_buffers failed: " << strerror(errno) + << ", ret=" << ret << std::strerror(ret) << std::endl; + throw InvalidArgumentException( + "DirectUringRandomAccessFile::RegisterBufferFromPool: register " + "buffer fails. 
"); } isRegistered = true; } } - -void DirectUringRandomAccessFile::RegisterBuffer(std::vector > buffers) +void DirectUringRandomAccessFile::RegisterBuffer( + std::vector> buffers) { if (!isRegistered) { - iovecs = (iovec *) calloc(buffers.size(), sizeof(struct iovec)); + auto ring = DirectUringRandomAccessFile::getRing(0); + struct iovec* iovecs = nullptr; + iovecs = (iovec*)calloc(buffers.size(), sizeof(struct iovec)); iovecSize = buffers.size(); for (auto i = 0; i < buffers.size(); i++) { @@ -79,10 +97,13 @@ void DirectUringRandomAccessFile::RegisterBuffer(std::vector size(); memset(iovecs[i].iov_base, 0, buffer->size()); } + ring_vector.emplace_back(ring); + iovecs_vector.emplace_back(iovecs); int ret = io_uring_register_buffers(ring, iovecs, iovecSize); if (ret != 0) { - throw InvalidArgumentException("DirectUringRandomAccessFile::RegisterBuffer: register buffer fails. "); + throw InvalidArgumentException("DirectUringRandomAccessFile::" + "RegisterBuffer: register buffer fails. "); } isRegistered = true; } @@ -90,101 +111,224 @@ void DirectUringRandomAccessFile::RegisterBuffer(std::vector lock(mutex_); + getRing(0); +} + +void DirectUringRandomAccessFile::Reset() +{ + // Important! Because sometimes ring is nullptr here. + // For example, two threads A and B share the same global state. If A finish + // all files while B just starts, B would execute Reset function from + // InitLocal. If we don't set this 'if' branch, ring would be double freed. + for (auto ring : ring_vector) { - ring = new io_uring(); - if (io_uring_queue_init(4096, ring, 0) < 0) + if (ring != nullptr) { - throw InvalidArgumentException("DirectRandomAccessFile: initialize io_uring fails."); + // We don't use this function anymore since it slows down the speed + // if(io_uring_unregister_buffers(ring) != 0) { + // throw + //InvalidArgumentException("DirectUringRandomAccessFile::UnregisterBuffer: + //unregister buffer fails. 
"); + // } + io_uring_queue_exit(ring); + delete (ring); + ring = nullptr; + isRegistered = false; } } + ring_vector.clear(); + for (auto iovecs : iovecs_vector) + { + if (iovecs != nullptr) + { + free(iovecs); + iovecs = nullptr; + } + } + iovecs_vector.clear(); } -void DirectUringRandomAccessFile::Reset() +bool DirectUringRandomAccessFile::RegisterMoreBuffer( + int index, std::vector> buffers) { - // Important! Because sometimes ring is nullptr here. - // For example, two threads A and B share the same global state. If A finish all files while B just starts, - // B would execute Reset function from InitLocal. If we don't set this 'if' branch, ring would be double freed. - if (ring != nullptr) + assert(isRegistered); + auto ring = DirectUringRandomAccessFile::getRing(index); + struct iovec* iovecs = nullptr; + iovecs = (iovec*)calloc(buffers.size(), sizeof(struct iovec)); + iovecSize = buffers.size(); + for (auto i = 0; i < buffers.size(); i++) { - // We don't use this function anymore since it slows down the speed - // if(io_uring_unregister_buffers(ring) != 0) { - // throw InvalidArgumentException("DirectUringRandomAccessFile::UnregisterBuffer: unregister buffer fails. "); - // } - io_uring_queue_exit(ring); - delete (ring); - ring = nullptr; - isRegistered = false; + auto buffer = buffers.at(i); + iovecs[i].iov_base = buffer->getPointer(); + iovecs[i].iov_len = buffer->size(); } - if (iovecs != nullptr) + iovecs_vector.emplace_back(iovecs); + int ret = io_uring_register_buffers(ring, iovecs, iovecSize); + if (ret != 0) { - free(iovecs); - iovecs = nullptr; + throw InvalidArgumentException( + "DirectUringRandomAccessFile::RegisterMoreBuffer: register buffer " + "fails. 
"); } + return true; } DirectUringRandomAccessFile::~DirectUringRandomAccessFile() { - } -std::shared_ptr -DirectUringRandomAccessFile::readAsync(int length, std::shared_ptr buffer, int index) +std::shared_ptr DirectUringRandomAccessFile::readAsync( + int length, std::shared_ptr buffer, int index, int ring_index, + int start_offset) { + auto ring = DirectUringRandomAccessFile::getRing(ring_index); + auto offset = start_offset; + if (enableDirect) { - struct io_uring_sqe *sqe = io_uring_get_sqe(ring); -// if(length > iovecs[index].iov_len) { -// throw InvalidArgumentException("DirectUringRandomAccessFile::readAsync: the length is larger than buffer length."); -// } - // the file will be read from blockStart(fileOffset), and the first fileDelta bytes should be ignored. + struct io_uring_sqe* sqe = io_uring_get_sqe(ring); + if (!sqe) + { + throw std::runtime_error("DirectUringRandomAccessFile::readAsync: failed " + "to get SQE, submission queue is full"); + } + // the file will be read from blockStart(fileOffset), and the first + // fileDelta bytes should be ignored. uint64_t fileOffsetAligned = directIoLib->blockStart(offset); - uint64_t toRead = directIoLib->blockEnd(offset + length) - directIoLib->blockStart(offset); + uint64_t toRead = directIoLib->blockEnd(offset + length) - + directIoLib->blockStart(offset); + size_t block_size = directIoLib->getBlockSize(); // 假设存在获取块大小的方法 + size_t required_buffer_size = (offset - fileOffsetAligned) + length; + if (buffer->size() < required_buffer_size) + { + std::stringstream ss; + std::cout << "DirectUringRandomAccessFile::readAsync: buffer size " + "insufficient. 
" + << "Required: " << required_buffer_size + << ", Actual: " << buffer->size() + << ", ring_index: " << ring_index << ", index: " << index + << "Required alignment: " << block_size << ", length:" << length + << std::endl;; + throw InvalidArgumentException(ss.str()); + } + io_uring_prep_read_fixed(sqe, fd, buffer->getPointer(), toRead, fileOffsetAligned, index); - auto bb = std::make_shared(*buffer, - offset - fileOffsetAligned, length); - seek(offset + length); + if (fd < 0) + { + throw std::runtime_error( + "DirectUringRandomAccessFile::readAsync: invalid file descriptor"); + } + + auto bb = std::make_shared(*buffer, offset - fileOffsetAligned, + length); + seekByIndex(offset + length, ring_index); return bb; } else { - struct io_uring_sqe *sqe = io_uring_get_sqe(ring); -// if(length > iovecs[index].iov_len) { -// throw InvalidArgumentException("DirectUringRandomAccessFile::readAsync: the length is larger than buffer length."); -// } - io_uring_prep_read_fixed(sqe, fd, buffer->getPointer(), length, offset, index); + struct io_uring_sqe* sqe = io_uring_get_sqe(ring); + io_uring_prep_read_fixed(sqe, fd, buffer->getPointer(), length, offset, + index); seek(offset + length); auto result = std::make_shared(*buffer, 0, length); return result; } - } - -void DirectUringRandomAccessFile::readAsyncSubmit(int size) +void DirectUringRandomAccessFile::seekByIndex(long off, int index) { - int ret = io_uring_submit(ring); - if (ret != size) + if (index < 0 || static_cast(index) >= offsets_vector.size()) + { + std::stringstream ss; + ss << "DirectUringRandomAccessFile::seekByIndex: invalid index. " + << "Index: " << index << ", Vector size: " << offsets_vector.size(); + throw InvalidArgumentException(ss.str()); + } + if (off < 0) { - throw InvalidArgumentException("DirectUringRandomAccessFile::readAsyncSubmit: submit fails"); + std::stringstream ss; + ss << "DirectUringRandomAccessFile::seekByIndex: invalid offset. 
" + << "Offset: " << off << ", Index: " << index; + throw InvalidArgumentException(ss.str()); } + offsets_vector.at(index) = off; } -void DirectUringRandomAccessFile::readAsyncComplete(int size) +void DirectUringRandomAccessFile::readAsyncSubmit( + std::unordered_map sizes, + std::unordered_set ring_index) { - // Important! We cannot write the code as io_uring_wait_cqe_nr(ring, &cqe, iovecSize). - // The reason is unclear, but some random bugs would happen. It takes me nearly a week to find this bug - struct io_uring_cqe *cqe; - for (int i = 0; i < size; i++) + for (auto i : ring_index) { - if (io_uring_wait_cqe_nr(ring, &cqe, 1) != 0) + auto ring = DirectUringRandomAccessFile::getRing(i); + int ret = io_uring_submit(ring); + if (ret != sizes[i]) { - throw InvalidArgumentException("DirectUringRandomAccessFile::readAsyncComplete: wait cqe fails"); + std::string error_msg = + "DirectUringRandomAccessFile::readAsyncSubmit: submit fails. " + "Index: " + + std::to_string(i) + + ", " + "Expected size: " + + std::to_string(sizes[i]) + + ", " + "Actual return value: " + + std::to_string(ret); + throw InvalidArgumentException(error_msg); } - io_uring_cqe_seen(ring, cqe); } } +void DirectUringRandomAccessFile::readAsyncComplete( + std::unordered_map sizes, + std::unordered_set ring_index) +{ + // Important! We cannot write the code as io_uring_wait_cqe_nr(ring, &cqe, + // iovecSize). The reason is unclear, but some random bugs would happen. 
It + // takes me nearly a week to find this bug + for (auto idx : ring_index) + { + // 重命名外层循环变量,避免与内层冲突 + struct io_uring_cqe* cqe; + auto ring = DirectUringRandomAccessFile::getRing(idx); + for (int j = 0; j < sizes[idx]; j++) // 注意:这里使用外层的idx作为sizes的键 + { + if (io_uring_wait_cqe_nr(ring, &cqe, 1) != 0) + { + throw InvalidArgumentException( + "DirectUringRandomAccessFile::readAsyncComplete: wait cqe fails"); + } + io_uring_cqe_seen(ring, cqe); + } + } +} +struct io_uring* DirectUringRandomAccessFile::getRing(int index) +{ + if (index >= ring_vector.size()) + { + // need to add more rings + // initialize io_uring ring + struct io_uring* ring = nullptr; + auto flag = std::stoi( + ConfigFactory::Instance().getProperty("pixels.io_uring.mode")); + // get io_uring flags for io mode + // 0 interrupt-dirven + // 1 IORING_SETUP_IOPOLL + // 2 IORING_SETUP_SQPOLL + if (ring == nullptr) + { + ring = new io_uring(); + if (io_uring_queue_init(4096, ring, flag) < 0) + { + throw InvalidArgumentException( + "DirectRandomAccessFile: initialize io_uring fails."); + } + } + ring_vector.emplace_back(ring); + offsets_vector.emplace_back(0); + } + return ring_vector[index]; +} diff --git a/cpp/pixels-common/lib/physical/scheduler/NoopScheduler.cpp b/cpp/pixels-common/lib/physical/scheduler/NoopScheduler.cpp index 4af3bfce9e..c49856d268 100644 --- a/cpp/pixels-common/lib/physical/scheduler/NoopScheduler.cpp +++ b/cpp/pixels-common/lib/physical/scheduler/NoopScheduler.cpp @@ -25,10 +25,11 @@ #include "physical/scheduler/NoopScheduler.h" #include "exception/InvalidArgumentException.h" #include "physical/io/PhysicalLocalReader.h" +#include -Scheduler *NoopScheduler::instance = nullptr; +Scheduler* NoopScheduler::instance = nullptr; -Scheduler *NoopScheduler::Instance() +Scheduler* NoopScheduler::Instance() { if (instance == nullptr) { @@ -37,31 +38,49 @@ Scheduler *NoopScheduler::Instance() return instance; } -std::vector > NoopScheduler::executeBatch(std::shared_ptr reader, - RequestBatch 
batch, long queryId) +std::vector> +NoopScheduler::executeBatch(std::shared_ptr reader, + RequestBatch batch, long queryId) { return executeBatch(reader, batch, {}, queryId); } - -std::vector > -NoopScheduler::executeBatch(std::shared_ptr reader, RequestBatch batch, - std::vector > reuseBuffers, long queryId) +std::vector> NoopScheduler::executeBatch( + std::shared_ptr reader, RequestBatch batch, + std::vector> reuseBuffers, long queryId) { auto requests = batch.getRequests(); - std::vector > results; + std::vector> results; results.resize(batch.getSize()); - if (ConfigFactory::Instance().boolCheckProperty("localfs.enable.async.io") && reuseBuffers.size() > 0) + if (ConfigFactory::Instance().boolCheckProperty("localfs.enable.async.io") && + reuseBuffers.size() > 0) { // async read auto localReader = std::static_pointer_cast(reader); + std::unordered_set ring_index_set = localReader->getRingIndexes(); + std::unordered_map ringIndexCountMap; + for (int i = 0; i < batch.getSize(); i++) { Request request = requests[i]; - localReader->seek(request.start); - results.at(i) = localReader->readAsync(request.length, reuseBuffers.at(i), request.bufferId); + if (request.length > reuseBuffers.at(i)->size()) + { + throw InvalidArgumentException( + "The error is not here; need to pay attention to the previous " + "critical section\n"); + } + results.at(i) = localReader->readAsync(request.length, reuseBuffers.at(i), + request.bufferId, + request.ring_index, request.start); + if (ring_index_set.find(request.ring_index) == ring_index_set.end()) + { + ring_index_set.insert(request.ring_index); + localReader->addRingIndex(request.ring_index); + } + ringIndexCountMap[request.ring_index]++; } - localReader->readAsyncSubmit(batch.getSize()); + localReader->readAsyncSubmit(ringIndexCountMap, ring_index_set); + localReader->setRingIndexCountMap(ringIndexCountMap); } else { @@ -78,14 +97,11 @@ NoopScheduler::executeBatch(std::shared_ptr reader, RequestBatc { results.at(i) = 
reader->readFully(request.length); } - } } return results; - } - NoopScheduler::~NoopScheduler() { delete instance; diff --git a/cpp/pixels-common/lib/utils/MutexTracker.cpp b/cpp/pixels-common/lib/utils/MutexTracker.cpp new file mode 100644 index 0000000000..d4b006d9f6 --- /dev/null +++ b/cpp/pixels-common/lib/utils/MutexTracker.cpp @@ -0,0 +1,25 @@ +/* +* Copyright 2025 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . 
+ */ + +/* + * @author whz + * @create 2025-08-23 + */ +#include "utils/MutexTracker.h" \ No newline at end of file diff --git a/cpp/pixels-core/include/reader/PixelsReaderOption.h b/cpp/pixels-core/include/reader/PixelsReaderOption.h index a43200694b..9a051fd5ec 100644 --- a/cpp/pixels-core/include/reader/PixelsReaderOption.h +++ b/cpp/pixels-core/include/reader/PixelsReaderOption.h @@ -57,6 +57,8 @@ class PixelsReaderOption void setFilter(duckdb::TableFilterSet *filter); + void setRingIndex(int ringIndex); + duckdb::TableFilterSet *getFilter(); int getRGStart(); @@ -65,6 +67,8 @@ class PixelsReaderOption int getBatchSize() const; + int getRingIndex() const; + void setTolerantSchemaEvolution(bool t); bool isTolerantSchemaEvolution(); @@ -85,5 +89,6 @@ class PixelsReaderOption int batchSize; int rgStart; int rgLen; + int ring_index; }; #endif //PIXELS_PIXELSREADEROPTION_H diff --git a/cpp/pixels-core/include/reader/PixelsRecordReaderImpl.h b/cpp/pixels-core/include/reader/PixelsRecordReaderImpl.h index 190663f574..c098482a58 100644 --- a/cpp/pixels-core/include/reader/PixelsRecordReaderImpl.h +++ b/cpp/pixels-core/include/reader/PixelsRecordReaderImpl.h @@ -98,6 +98,7 @@ class PixelsRecordReaderImpl : public PixelsRecordReader void UpdateRowGroupInfo(); + static std::mutex mutex_; std::shared_ptr physicalReader; pixels::proto::Footer footer; pixels::proto::PostScript postScript; @@ -105,6 +106,7 @@ class PixelsRecordReaderImpl : public PixelsRecordReader PixelsReaderOption option; duckdb::TableFilterSet *filter; long queryId; + int ring_index; int RGStart; int RGLen; bool everRead; diff --git a/cpp/pixels-core/include/vector/ColumnVector.h b/cpp/pixels-core/include/vector/ColumnVector.h index d2747311fc..f4f322ada4 100644 --- a/cpp/pixels-core/include/vector/ColumnVector.h +++ b/cpp/pixels-core/include/vector/ColumnVector.h @@ -85,6 +85,7 @@ class ColumnVector uint64_t *isValid; explicit ColumnVector(uint64_t len, bool encoding); + ~ColumnVector(); idx_t 
getCapacity() const; diff --git a/cpp/pixels-core/lib/PixelsFilter.cpp b/cpp/pixels-core/lib/PixelsFilter.cpp index 90248a3937..643715eedc 100644 --- a/cpp/pixels-core/lib/PixelsFilter.cpp +++ b/cpp/pixels-core/lib/PixelsFilter.cpp @@ -298,6 +298,9 @@ void PixelsFilter::ApplyFilter(std::shared_ptr vector, duckdb::Ta case duckdb::TableFilterType::IS_NULL: // TODO: support is null break; + case duckdb::TableFilterType::OPTIONAL_FILTER: + // nothing to do + return; default: D_ASSERT(0); break; diff --git a/cpp/pixels-core/lib/reader/PixelsReaderOption.cpp b/cpp/pixels-core/lib/reader/PixelsReaderOption.cpp index dabcd702a9..66b9b01da9 100644 --- a/cpp/pixels-core/lib/reader/PixelsReaderOption.cpp +++ b/cpp/pixels-core/lib/reader/PixelsReaderOption.cpp @@ -67,6 +67,14 @@ long PixelsReaderOption::getQueryId() return queryId; } +int PixelsReaderOption::getRingIndex() const { + return ring_index; +} + +void PixelsReaderOption::setRingIndex(int i) { + ring_index = i; +} + void PixelsReaderOption::setRGRange(int start, int len) { rgStart = start; diff --git a/cpp/pixels-core/lib/reader/PixelsRecordReaderImpl.cpp b/cpp/pixels-core/lib/reader/PixelsRecordReaderImpl.cpp index 02f6fc1cf1..5c6f646938 100644 --- a/cpp/pixels-core/lib/reader/PixelsRecordReaderImpl.cpp +++ b/cpp/pixels-core/lib/reader/PixelsRecordReaderImpl.cpp @@ -25,7 +25,7 @@ #include "reader/PixelsRecordReaderImpl.h" #include "physical/io/PhysicalLocalReader.h" #include "profiler/CountProfiler.h" - +std::mutex PixelsRecordReaderImpl::mutex_; PixelsRecordReaderImpl::PixelsRecordReaderImpl(std::shared_ptr reader, const pixels::proto::PostScript &pixelsPostScript, const pixels::proto::Footer &pixelsFooter, @@ -37,7 +37,7 @@ PixelsRecordReaderImpl::PixelsRecordReaderImpl(std::shared_ptr postScript = pixelsPostScript; footerCache = pixelsFooterCache; option = opt; - // TODO: intialize all kinds of variables + // TODO: intialize all kinds of variable queryId = option.getQueryId(); RGStart = option.getRGStart(); RGLen 
= option.getRGLen(); @@ -361,7 +361,7 @@ void PixelsRecordReaderImpl::prepareRead() uint64_t footerOffset = rowGroupInformation.footeroffset(); uint64_t footerLength = rowGroupInformation.footerlength(); fis.push_back(i); - requestBatch.add(queryId, (int) footerOffset, (int) footerLength); + requestBatch.add(queryId, (int) footerOffset, (int) footerLength,ring_index); rowGroupFooterCacheHit.at(i) = false; } } @@ -401,7 +401,8 @@ void PixelsRecordReaderImpl::asyncReadComplete(int requestSize) if (ConfigFactory::Instance().getProperty("localfs.async.lib") == "iouring") { auto localReader = std::static_pointer_cast(physicalReader); - localReader->readAsyncComplete(requestSize); + auto ringIndexCountMap=localReader->getRingIndexCountMap(); + localReader->readAsyncComplete(ringIndexCountMap,localReader->getRingIndexes()); asyncReadRequestNum -= requestSize; } else if (ConfigFactory::Instance().getProperty("localfs.async.lib") == "aio") @@ -456,6 +457,7 @@ bool PixelsRecordReaderImpl::read() if (!diskChunks.empty()) { + // std::lock_guard lock(mutex_); RequestBatch requestBatch((int) diskChunks.size()); Scheduler *scheduler = SchedulerFactory::Instance()->getScheduler(); std::vector colIds; @@ -463,20 +465,41 @@ bool PixelsRecordReaderImpl::read() for (int i = 0; i < diskChunks.size(); i++) { ChunkId chunk = diskChunks.at(i); - requestBatch.add(queryId, chunk.offset, (int) chunk.length, ::BufferPool::GetBufferId(i)); + requestBatch.add(queryId, chunk.offset, (int) chunk.length, ::BufferPool::GetBufferId()); colIds.emplace_back(chunk.columnId); bytes.emplace_back(chunk.length); } - ::BufferPool::Initialize(colIds, bytes, fileSchema->getFieldNames()); + + std::thread::id thread_id=std::this_thread::get_id(); + auto columnNames=fileSchema->getFieldNames(); + ::BufferPool::Initialize(colIds, bytes, columnNames); ::DirectUringRandomAccessFile::RegisterBufferFromPool(colIds); std::vector > originalByteBuffers; + std::vector ring_col; for (int i = 0; i < colIds.size(); i++) { 
auto colId = colIds.at(i); - originalByteBuffers.emplace_back(::BufferPool::GetBuffer(colId)); + auto byte=bytes.at(i); + auto currentBufferEntry=::BufferPool::GetBuffer(colId,byte,columnNames[colId]); + originalByteBuffers.emplace_back(currentBufferEntry); + requestBatch.getRequest(i).ring_index=::BufferPool::getRingIndex(colId); + if (currentBufferEntry->size()-requestBatch.getRequest(i).length<=4096) { + std::cout<<"i:"<size()<< + " requestBatch.length"<executeBatch( + // ::BufferPool::PrintStats(); + + auto byteBuffers = scheduler->executeBatch( physicalReader, requestBatch, originalByteBuffers, queryId); if(ConfigFactory::Instance().boolCheckProperty("localfs.enable.async.io") @@ -490,6 +513,7 @@ bool PixelsRecordReaderImpl::read() ChunkId chunk = diskChunks.at(index); std::shared_ptr bb = byteBuffers.at(index); uint32_t colId = chunk.columnId; + if (bb != nullptr) { chunkBuffers.at(colId) = bb; diff --git a/cpp/pixels-core/lib/vector/ColumnVector.cpp b/cpp/pixels-core/lib/vector/ColumnVector.cpp index 48a757e48a..c0a9b80dda 100644 --- a/cpp/pixels-core/lib/vector/ColumnVector.cpp +++ b/cpp/pixels-core/lib/vector/ColumnVector.cpp @@ -38,6 +38,10 @@ ColumnVector::ColumnVector(uint64_t len, bool encoding) posix_memalign(reinterpret_cast(&isValid), 64, ceil(1.0 * len / 64) * sizeof(uint64_t)); } +ColumnVector::~ColumnVector() +{ +} + void ColumnVector::close() { if (!closed) @@ -50,6 +54,10 @@ void ColumnVector::close() free(isValid); isValid = nullptr; } + if (isNull != nullptr) + { + isNull = nullptr; + } } } diff --git a/cpp/pixels-cpp.properties b/cpp/pixels-cpp.properties index 0a65c450ac..ca0bda4a1d 100644 --- a/cpp/pixels-cpp.properties +++ b/cpp/pixels-cpp.properties @@ -1,5 +1,4 @@ -# pixels c++ configurations - +# pixels c++ reader configurations # valid values: noop, sortmerge, ratelimited read.request.scheduler=noop @@ -12,14 +11,13 @@ localfs.enable.async.io=true # the lib of async is iouring or aio localfs.async.lib=iouring # pixel.stride must be 
the same as the stride size in pxl data -# pixel.stride=10000 -pixel.stride=2 +pixel.stride=10000 # the work thread to run pixels. -1 means using all CPU cores pixel.threads=-1 # column size path. It is optional. If no column size path is designated, the # size of first pixels data is used. For example: -# pixel.column.size.path=/scratch/liyu/opt/pixels/cpp/pixels-duckdb/duckdb/benchmark/clickbench/clickbench-size.csv -pixel.column.size.path= +pixel.column.size.path=/home/whz/pixels/clickbench-size-e0.csv +# pixel.column.size.path= # the work thread to run parquet. -1 means using all CPU cores parquet.threads=-1 @@ -32,8 +30,8 @@ parquet.threads=-1 storage.directory.depth=1 # the row group size in bytes for pixels writer, should not exceed 2GB -# row.group.size=268435456 -row.group.size=100 +row.group.size=268435456 + # the block size for block-wise storage systems such as HDFS block.size=2147483648 # the number of replications of each block for block-wise storage systems such as HDFS @@ -44,3 +42,25 @@ column.chunk.alignment=32 # for DuckDB, it is only effective when column.chunk.alignment also meets the alignment of the isNull bitmap isnull.bitmap.alignment=8 + + +# for change BufferPool ExtraSize +pixel.bufferpool.extraSize=3145728 + + +# for change slice size in BufferPool +pixel.bufferpool.sliceSize=64 +# smallbuffer 10085160 largebuffer 356596096 + +pixel.bufferpool.bufferpoolSize=356596096 + + +pixel.bufferpool.fixedsize=false +pixel.bufferpool.bufferNum=20 +pixel.bufferpool.hugepage=true + +# 0 interrupt-dirven +# 1 IORING_SETUP_IOPOLL +# 2 IORING_SETUP_SQPOLL +pixels.io_uring.mode=0 + diff --git a/cpp/pixels-duckdb/PixelsScanFunction.cpp b/cpp/pixels-duckdb/PixelsScanFunction.cpp index c8aec8b7ef..d68f6ac787 100644 --- a/cpp/pixels-duckdb/PixelsScanFunction.cpp +++ b/cpp/pixels-duckdb/PixelsScanFunction.cpp @@ -29,7 +29,7 @@ namespace duckdb { -bool PixelsScanFunction::enable_filter_pushdown = false; +bool PixelsScanFunction::enable_filter_pushdown = true; 
static idx_t PixelsScanGetBatchIndex(ClientContext &context, const FunctionData *bind_data_p, LocalTableFunctionState *local_state, @@ -63,8 +63,8 @@ TableFunctionSet PixelsScanFunction::GetFunctionSet() TableFunction table_function("pixels_scan", {LogicalType::VARCHAR}, PixelsScanImplementation, PixelsScanBind, PixelsScanInitGlobal, PixelsScanInitLocal); table_function.projection_pushdown = true; -// table_function.filter_pushdown = true; - //table_function.filter_prune = true; + table_function.filter_pushdown = true; + // table_function.filter_prune = true; enable_filter_pushdown = table_function.filter_pushdown; MultiFileReader::AddParameters(table_function); table_function.cardinality = PixelsCardinality; @@ -235,6 +235,10 @@ unique_ptr PixelsScanFunction::PixelsScanInitGlobal( result->max_threads = max_threads; + result->active_threads=max_threads; + + result->all_done=false; + result->batch_index = 0; result->filters = input.filters.get(); @@ -447,7 +451,7 @@ void PixelsScanFunction::TransformDuckdbChunk(PixelsReadLocalState &data, vectorizedRowBatch->increment(thisOutputChunkRows); } -bool PixelsScanFunction::PixelsParallelStateNext(ClientContext &context, const PixelsReadBindData &bind_data, +bool PixelsScanFunction::PixelsParallelStateNext(ClientContext &context, PixelsReadBindData &bind_data, PixelsReadLocalState &scan_data, PixelsReadGlobalState ¶llel_state, bool is_init_state) @@ -469,19 +473,23 @@ bool PixelsScanFunction::PixelsParallelStateNext(ClientContext &context, const P parallel_state.file_index.at(scan_data.deviceID) >= StorageInstance->getFileSum(scan_data.deviceID)) || scan_data.next_file_index >= StorageInstance->getFileSum(scan_data.deviceID)) { - ::BufferPool::Reset(); - // if async io is enabled, we need to unregister uring buffer - if (ConfigFactory::Instance().boolCheckProperty("localfs.enable.async.io")) + int remaining_threads = --parallel_state.active_threads; + if (remaining_threads==0&&!parallel_state.all_done) { + 
::BufferPool::Reset(); + // if async io is enabled, we need to unregister uring buffer + if (ConfigFactory::Instance().boolCheckProperty("localfs.enable.async.io")) { - if (ConfigFactory::Instance().getProperty("localfs.async.lib") == "iouring") + if (ConfigFactory::Instance().getProperty("localfs.async.lib") == "iouring") { - ::DirectUringRandomAccessFile::Reset(); + ::DirectUringRandomAccessFile::Reset(); } else if (ConfigFactory::Instance().getProperty("localfs.async.lib") == "aio") { - throw InvalidArgumentException( - "PhysicalLocalReader::readAsync: We don't support aio for our async read yet."); + throw InvalidArgumentException( + "PhysicalLocalReader::readAsync: We don't support aio for our async read yet."); } } + parallel_state.all_done = true; + } parallel_lock.unlock(); return false; } @@ -491,6 +499,7 @@ bool PixelsScanFunction::PixelsParallelStateNext(ClientContext &context, const P scan_data.next_file_index = parallel_state.file_index.at(scan_data.deviceID); scan_data.next_batch_index = StorageInstance->getBatchID(scan_data.deviceID, scan_data.next_file_index); scan_data.curr_file_name = scan_data.next_file_name; + bind_data.curFileId.fetch_add(1); parallel_state.file_index.at(scan_data.deviceID)++; parallel_lock.unlock(); // The below code uses global state but no race happens, so we don't need the lock anymore @@ -501,7 +510,12 @@ bool PixelsScanFunction::PixelsParallelStateNext(ClientContext &context, const P scan_data.currReader->close(); } - ::BufferPool::Switch(); + if (ConfigFactory::Instance().getProperty("pixels.doublebuffer")=="true") + { + ::BufferPool::Switch(); + } + // double/single buffer + scan_data.currReader = scan_data.nextReader; scan_data.currPixelsRecordReader = scan_data.nextPixelsRecordReader; // asyncReadComplete is not invoked in the first run (is_init_state = true) @@ -509,24 +523,36 @@ bool PixelsScanFunction::PixelsParallelStateNext(ClientContext &context, const P { auto currPixelsRecordReader = std::static_pointer_cast( 
scan_data.currPixelsRecordReader); + if (ConfigFactory::Instance().getProperty("pixels.doublebuffer")=="false") + { + //single buffer + currPixelsRecordReader->read(); + } + currPixelsRecordReader->asyncReadComplete((int) scan_data.column_names.size()); } if (scan_data.next_file_index < StorageInstance->getFileSum(scan_data.deviceID)) { - auto footerCache = std::make_shared(); - auto builder = std::make_shared(); - std::shared_ptr<::Storage> storage = StorageFactory::getInstance()->getStorage(::Storage::file); - scan_data.next_file_name = StorageInstance->getFileName(scan_data.deviceID, scan_data.next_file_index); - scan_data.nextReader = builder->setPath(scan_data.next_file_name) - ->setStorage(storage) - ->setPixelsFooterCache(footerCache) - ->build(); + auto footerCache = std::make_shared(); + auto builder = std::make_shared(); + std::shared_ptr<::Storage> storage = StorageFactory::getInstance()->getStorage(::Storage::file); + scan_data.next_file_name = StorageInstance->getFileName(scan_data.deviceID, scan_data.next_file_index); + scan_data.nextReader = builder->setPath(scan_data.next_file_name) + ->setStorage(storage) + ->setPixelsFooterCache(footerCache) + ->build(); PixelsReaderOption option = GetPixelsReaderOption(scan_data, parallel_state); scan_data.nextPixelsRecordReader = scan_data.nextReader->read(option); auto nextPixelsRecordReader = std::static_pointer_cast( scan_data.nextPixelsRecordReader); - nextPixelsRecordReader->read(); + + if (ConfigFactory::Instance().getProperty("pixels.doublebuffer")=="true") + { + //double buffer + nextPixelsRecordReader->read(); + } + } else { scan_data.nextReader = nullptr; diff --git a/cpp/pixels-duckdb/duckdb b/cpp/pixels-duckdb/duckdb index c3dc6d34c9..e0af5da3aa 160000 --- a/cpp/pixels-duckdb/duckdb +++ b/cpp/pixels-duckdb/duckdb @@ -1 +1 @@ -Subproject commit c3dc6d34c905bc44f311bf670b1bbddef1c0c776 +Subproject commit e0af5da3aaddf16c7b54bc1acaba94d5c2bcdd73 diff --git a/cpp/pixels-duckdb/pixels_extension.cpp 
b/cpp/pixels-duckdb/pixels_extension.cpp index 2ab79edc5e..4b2c61f1cc 100644 --- a/cpp/pixels-duckdb/pixels_extension.cpp +++ b/cpp/pixels-duckdb/pixels_extension.cpp @@ -25,74 +25,66 @@ #define DUCKDB_EXTENSION_MAIN #include "pixels_extension.hpp" -#include "PixelsScanFunction.hpp" #include "PixelsReadBindData.hpp" +#include "PixelsScanFunction.hpp" #include "duckdb.hpp" #include "duckdb/common/exception.hpp" +#include "duckdb/common/optional_ptr.hpp" #include "duckdb/common/string_util.hpp" #include "duckdb/function/scalar_function.hpp" -#include "duckdb/common/optional_ptr.hpp" #include -namespace duckdb -{ - - // Pixels Scan Replacement for duckdb 1.1 - unique_ptr PixelsScanReplacement(ClientContext &context, ReplacementScanInput &input, - optional_ptr data) - { - auto table_name = ReplacementScan::GetFullPath(input); -// if(!ReplacementScan::CanReplace(table_name,{"pixels"})){ -// return nullptr; -// } - auto lower_name = StringUtil::Lower(table_name); - if (!StringUtil::EndsWith(lower_name, ".pxl") && !StringUtil::Contains(lower_name, ".pxl?")) - { - return nullptr; - } - auto table_function = make_uniq(); - vector > children; - children.push_back(make_uniq(Value(table_name))); - table_function->function = make_uniq("pixels_scan", std::move(children)); - if (!FileSystem::HasGlob(table_name)) - { - auto &fs = FileSystem::GetFileSystem(context); - table_function->alias = fs.ExtractBaseName(table_name); - } - return std::move(table_function); - } - - - void PixelsExtension::Load(ExtensionLoader &loader) { - auto &db_instance=loader.GetDatabaseInstance(); - Connection con(db_instance); - con.BeginTransaction(); - - auto &context=*con.context; - auto &catalog=Catalog::GetSystemCatalog(*con.context); - - auto scan_fun=PixelsScanFunction::GetFunctionSet(); - - CreateTableFunctionInfo cinfo(scan_fun); - cinfo.name = "pixels_scan"; - - catalog.CreateTableFunction(context, &cinfo); - con.Commit(); - - auto &config = DBConfig::GetConfig(db_instance); - 
config.replacement_scans.emplace_back(PixelsScanReplacement); - - } - - std::string PixelsExtension::Name() - { - return "pixels"; - } - +namespace duckdb { + +// Pixels Scan Replacement for duckdb 1.1 +unique_ptr +PixelsScanReplacement(ClientContext &context, ReplacementScanInput &input, + optional_ptr data) { + auto table_name = ReplacementScan::GetFullPath(input); + // if(!ReplacementScan::CanReplace(table_name,{"pixels"})){ + // return nullptr; + // } + auto lower_name = StringUtil::Lower(table_name); + if (!StringUtil::EndsWith(lower_name, ".pxl") && + !StringUtil::Contains(lower_name, ".pxl?")) { + return nullptr; + } + auto table_function = make_uniq(); + vector> children; + children.push_back(make_uniq(Value(table_name))); + table_function->function = + make_uniq("pixels_scan", std::move(children)); + if (!FileSystem::HasGlob(table_name)) { + auto &fs = FileSystem::GetFileSystem(context); + table_function->alias = fs.ExtractBaseName(table_name); + } + return std::move(table_function); +} + +void PixelsExtension::Load(ExtensionLoader &loader) { + auto &db_instance = loader.GetDatabaseInstance(); + Connection con(db_instance); + con.BeginTransaction(); + + auto &context = *con.context; + auto &catalog = Catalog::GetSystemCatalog(*con.context); + + auto scan_fun = PixelsScanFunction::GetFunctionSet(); + + CreateTableFunctionInfo cinfo(scan_fun); + cinfo.name = "pixels_scan"; + + catalog.CreateTableFunction(context, &cinfo); + con.Commit(); + + auto &config = DBConfig::GetConfig(db_instance); + config.replacement_scans.emplace_back(PixelsScanReplacement); +} + +std::string PixelsExtension::Name() { return "pixels"; } } // namespace duckdb #ifndef DUCKDB_EXTENSION_MAIN #error DUCKDB_EXTENSION_MAIN not defined #endif - diff --git a/cpp/process_sqls.py b/cpp/process_sqls.py new file mode 100755 index 0000000000..8a54da33fc --- /dev/null +++ b/cpp/process_sqls.py @@ -0,0 +1,274 @@ +import os +import re +import subprocess +import csv +import time +import psutil 
+import json # Added: For parsing benchmark configuration files +from typing import List +import argparse + +# -------------------------- 1. Basic Configuration (Added default benchmark JSON path) -------------------------- +# Default path to benchmark configuration file (can be overridden via CLI parameter) +DEFAULT_BENCHMARK_JSON = "/home/whz/test/pixels/cpp/benchmark.json" + + +# -------------------------- 2. CLI Argument Parsing (Added benchmark-related parameters) -------------------------- +def parse_args(): + parser = argparse.ArgumentParser(description="DuckDB ClickBench Batch Test Script (Multi-column CSV, ensures resource release)") + parser.add_argument( + "--runs", + type=int, + default=3, + help="Number of runs per SQL file (default: 3)" + ) + parser.add_argument( + "--duckdb-bin", + type=str, + default="/home/whz/test/pixels/cpp/build/release/duckdb", + help="Path to duckdb executable" + ) + parser.add_argument( + "--sql-dir", + type=str, + default="/home/whz/test/pixels/cpp/pixels-duckdb/duckdb/benchmark/clickbench/queries", + help="Directory containing SQL files (only processes .sql files starting with 'q')" + ) + parser.add_argument( + "--output-csv", + type=str, + default="/home/whz/test/pixels/cpp/duckdb_benchmark_result.csv", + help="Path to output result CSV" + ) + parser.add_argument( + "--wait-after-run", + type=float, + default=2.0, + help="Seconds to wait after each run (ensures resource release, default: 2s)" + ) + # Added: Specify the name of the benchmark to use (e.g., clickbench-pixels-e0) + parser.add_argument( + "--benchmark", + type=str, + required=True, + help="Name of benchmark to use (must exist in benchmark JSON, e.g. 
clickbench-pixels-e0)" + ) + # Added: Specify path to benchmark configuration file (uses DEFAULT_BENCHMARK_JSON by default) + parser.add_argument( + "--benchmark-json", + type=str, + default=DEFAULT_BENCHMARK_JSON, + help=f"Path to benchmark configuration JSON file (default: {DEFAULT_BENCHMARK_JSON})" + ) + return parser.parse_args() + + +# -------------------------- 3. Core Utility Functions (Added JSON parsing and view SQL loading) -------------------------- +def get_sql_files(sql_dir: str) -> List[str]: + """Get sorted list of SQL files starting with 'q' in target directory""" + sql_files = [] + for filename in os.listdir(sql_dir): + if filename.endswith(".sql") and filename.startswith("q"): + sql_files.append(os.path.join(sql_dir, filename)) + sql_files.sort() + if not sql_files: + raise ValueError(f"No .sql files starting with 'q' found in {sql_dir}!") + return sql_files + + +def extract_real_time(duckdb_output: str) -> float: + """Extract real execution time (in seconds) from duckdb output""" + pattern = r"Run Time \(s\): real (\d+\.\d+)" + match = re.search(pattern, duckdb_output, re.MULTILINE) + if not match: + raise ValueError(f"Failed to extract real time! 
Partial output:\n{duckdb_output[:500]}...") + return round(float(match.group(1)), 3) + + +def kill_remaining_duckdb(duckdb_bin: str): + """Check and kill residual duckdb processes to prevent resource leaks""" + duckdb_name = os.path.basename(duckdb_bin) + for proc in psutil.process_iter(['name', 'cmdline']): + try: + # Match processes by name or command line containing duckdb path + if (proc.info['name'] == duckdb_name) or (duckdb_bin in ' '.join(proc.info['cmdline'] or [])): + print(f"⚠️ Found residual {duckdb_name} process (PID: {proc.pid}), killing...") + proc.terminate() + # Force kill if not terminated within 1 second + try: + proc.wait(timeout=1) + except psutil.TimeoutExpired: + proc.kill() + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + + +def load_benchmark_create_view(benchmark_json_path: str, benchmark_name: str) -> str: + """Load CREATE VIEW statement for specified benchmark from JSON file""" + # Check if JSON file exists + if not os.path.exists(benchmark_json_path): + raise FileNotFoundError(f"Benchmark JSON file not found: {benchmark_json_path}") + + # Read and parse JSON content + with open(benchmark_json_path, "r", encoding="utf-8") as f: + try: + benchmark_config = json.load(f) + except json.JSONDecodeError as e: + raise ValueError(f"Failed to parse benchmark JSON: {str(e)}") + + # Check if specified benchmark exists in JSON + if benchmark_name not in benchmark_config: + available_benchmarks = ", ".join(benchmark_config.keys()) + raise KeyError(f"Benchmark '{benchmark_name}' not found. 
Available benchmarks: {available_benchmarks}") + + # Check if CREATE VIEW statement is not empty + create_view_sql = benchmark_config[benchmark_name].strip() + if not create_view_sql: + raise ValueError(f"CREATE VIEW SQL for benchmark '{benchmark_name}' is empty in JSON") + + return create_view_sql + + +def run_single_sql(duckdb_bin: str, create_view_sql: str, sql_content: str, wait_after_run: float) -> float: + """Run SQL once (using dynamically loaded CREATE VIEW statement) and ensure resource release""" + # Assemble complete DuckDB commands (dynamically replace CREATE VIEW statement) + duckdb_commands = f"{create_view_sql}\nset threads=48;\n\n.timer on\n{sql_content.strip()}\n.exit" + process = None # Initialize process variable for exception handling + + try: + process = subprocess.Popen( + [duckdb_bin], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT + ) + + # Handle encoding differences between Python 2 and 3 + input_data = duckdb_commands.encode("utf-8") if isinstance(duckdb_commands, str) else duckdb_commands + # Pass input to process and wait for completion (timeout after 1 hour) + stdout, _ = process.communicate(input=input_data, timeout=3600) + + # Decode output (Python 3 returns bytes, Python 2 returns string) + output = stdout.decode("utf-8", errors="ignore") if isinstance(stdout, bytes) else stdout + + # Check if process exited successfully + if process.returncode != 0: + raise RuntimeError( + f"duckdb execution failed (code {process.returncode}):\n{output[:1000]}..."
+ ) + + # Extract execution time and ensure resource release + real_time = extract_real_time(output) + time.sleep(wait_after_run) + kill_remaining_duckdb(duckdb_bin) + + return real_time + + except subprocess.TimeoutExpired: + if process: + process.kill() + raise RuntimeError("duckdb execution timed out (exceeded 1 hour)") from None + finally: + # Ensure process is terminated if still running + if process and process.poll() is None: + process.kill() + print("⚠️ Forcibly terminated non-exiting duckdb process") + + +def init_csv(output_csv: str, runs: int): + """Initialize CSV file with result headers""" + headers = ["SQL File Name"] + [f"Run {idx} Time (s)" for idx in range(1, runs + 1)] + with open(output_csv, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=headers) + writer.writeheader() + print(f"✅ Initialized multi-column CSV with headers: {','.join(headers)}") + + +def write_single_row(output_csv: str, sql_filename: str, run_times: List[float], runs: int): + """Write test results of a single SQL file to CSV""" + row_data = {"SQL File Name": sql_filename} + for idx in range(1, runs + 1): + # Fill with empty string if no result for current run + time_val = run_times[idx - 1] if (idx - 1) < len(run_times) else "" + row_data[f"Run {idx} Time (s)"] = time_val + with open(output_csv, "a", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=row_data.keys()) + writer.writerow(row_data) + + +# -------------------------- 4. 
Main Logic (Integrated Benchmark Configuration Loading) -------------------------- +def main(): + args = parse_args() + print("=" * 70) + print("DuckDB ClickBench Batch Test Script (Resource Release Ensured)") + print(f"Config: {args.runs} runs per SQL, {args.wait_after_run}s wait after each run") + print(f"Benchmark: {args.benchmark} (from {args.benchmark_json})") + print(f"DuckDB path: {args.duckdb_bin}") + print(f"SQL directory: {args.sql_dir}") + print(f"Output CSV: {args.output_csv}") + print("=" * 70) + + # Step 1: Initialization - Clean residual processes + Load benchmark config + kill_remaining_duckdb(args.duckdb_bin) + try: + create_view_sql = load_benchmark_create_view(args.benchmark_json, args.benchmark) + print(f"✅ Loaded CREATE VIEW SQL for benchmark '{args.benchmark}'") + except (FileNotFoundError, KeyError, ValueError) as e: + print(f"\n❌ Benchmark initialization failed: {str(e)}") + return + + # Step 2: Initialize CSV + Get list of SQL files + init_csv(args.output_csv, args.runs) + try: + sql_files = get_sql_files(args.sql_dir) + print(f"\n✅ Found {len(sql_files)} eligible SQL files:") + for i, f in enumerate(sql_files, 1): + print(f" {i:2d}. 
{os.path.basename(f)}") + except ValueError as e: + print(f"\n❌ Error: {e}") + return + + # Step 3: Process each SQL file (using dynamically loaded CREATE VIEW statement) + for sql_file in sql_files: + sql_filename = os.path.basename(sql_file).replace(".sql", "") + print(f"\n{'=' * 60}") + print(f"Processing: {sql_filename}.sql") + print(f"{'=' * 60}") + + # Read content of current SQL file + try: + with open(sql_file, "r", encoding="utf-8") as f: + sql_content = f.read() + print(f"✅ Successfully read SQL file (content length: {len(sql_content)} chars)") + except Exception as e: + print(f"❌ Failed to read SQL file: {e}") + write_single_row(args.output_csv, sql_filename, [], args.runs) + continue + + # Run multiple times and record results + run_times = [] + for run_idx in range(1, args.runs + 1): + print(f"\n--- Run {run_idx:2d}/{args.runs} ---") + try: + # Pass dynamically loaded create_view_sql to execution function + real_time = run_single_sql(args.duckdb_bin, create_view_sql, sql_content, args.wait_after_run) + run_times.append(real_time) + print(f"✅ Run successful, time: {real_time}s") + except (RuntimeError, ValueError) as e: + print(f"❌ Run failed: {e}") + continue + + # Write results to CSV + write_single_row(args.output_csv, sql_filename, run_times, args.runs) + print(f"\n✅ Written to CSV: {sql_filename}.sql → Valid runs: {len(run_times)}/{args.runs}") + + # Final cleanup of residual processes after all tests + kill_remaining_duckdb(args.duckdb_bin) + print(f"\n{'=' * 70}") + print("All SQL files processed!") + print(f"Multi-column CSV: {args.output_csv}") + print("=" * 70) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/cpp/testcase/README-zh.md b/cpp/testcase/README-zh.md new file mode 100644 index 0000000000..0d5942b20c --- /dev/null +++ b/cpp/testcase/README-zh.md @@ -0,0 +1,39 @@ +# 测试 +本目录存放了所有测试 + +## 运行脚本 +`process_sqls.py` 运行查询,需要传入benchmark参数指定要运行的benchmark,也需要指定要运行的查询 +```bash +usage: process_sqls.py [-h] 
[--runs RUNS] [--duckdb-bin DUCKDB_BIN] [--sql-dir SQL_DIR] + [--output-csv OUTPUT_CSV] [--wait-after-run WAIT_AFTER_RUN] + [--threads THREADS] [--benchmark BENCHMARK] [--benchmark-json BENCHMARK_JSON] + +DuckDB ClickBench Batch Test Script (Multi-column CSV, ensures resource release) + +options: + -h, --help show this help message and exit + --runs RUNS Number of runs per SQL file (default: 3) + --duckdb-bin DUCKDB_BIN + Path to duckdb executable + --sql-dir SQL_DIR Directory containing SQL files (only processes .sql files starting with 'q') + --output-csv OUTPUT_CSV + Path to output result CSV + --wait-after-run WAIT_AFTER_RUN + Seconds to wait after each run (ensures resource release, default: 2s) + --threads THREADS Number of threads to use in DuckDB (default: 96) + --benchmark BENCHMARK + Name of benchmark to use (must exist in benchmark JSON, e.g. clickbench- + pixels-e0) + --benchmark-json BENCHMARK_JSON + Path to benchmark configuration JSON file (default: ./benchmark.json) + +``` + +## I/O粒度测试 +`blk_stat.py`在执行`process_sqls.py`的同时,调用blktrace和blkprase读取底层块设备的I/O粒度,同时也需要注意运行的查询由`process_sql.py`内置 + +## 单/双buffer性能测试 +`single_doublebuffer_async_sync_test.py` 设置运行参数,执行单双buffer测试 + +## perf实验 + diff --git a/cpp/testcase/benchmark.json b/cpp/testcase/benchmark.json new file mode 100644 index 0000000000..092f183201 --- /dev/null +++ b/cpp/testcase/benchmark.json @@ -0,0 +1,15 @@ +{ + "tpch-pixels-e0":"", + "tpch-pixels-e1":"", + "tpch-pixels-e2":"", + "tpch-parquet-e0":"", + "tpch-parquet-e2":"", + "clickbench-parquet-e2":"", + "clickbench-parquet-e0":"CREATE VIEW hits AS SELECT * FROM parquet_scan([\n \"/data/9a3-01/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-02/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-03/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-04/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-05/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-06/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-07/clickbench/parquet-e0/hits/*\",\n 
\"/data/9a3-08/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-09/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-10/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-11/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-12/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-13/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-14/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-15/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-16/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-17/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-18/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-19/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-20/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-21/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-22/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-23/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-24/clickbench/parquet-e0/hits/*\"\n ]\n);", + "clickbench-pixels-e2":"", + "clickbench-pixels-e0-24ssd":"CREATE VIEW hits AS SELECT * FROM pixels_scan([\n \"/data/9a3-01/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-02/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-03/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-04/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-05/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-06/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-07/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-08/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-09/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-10/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-11/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-12/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-13/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-14/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-15/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-16/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-17/clickbench/pixels-e0/hits/v-0-ordered/*\",\n 
\"/data/9a3-18/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-19/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-20/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-21/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-22/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-23/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-24/clickbench/pixels-e0/hits/v-0-ordered/*\"\n ]\n);", + "clickbench-pixels-e1":"", + "clickbench-pixels-e0-1ssd": "CREATE VIEW hits AS SELECT * FROM pixels_scan([\"/data/9a3-01/clickbench/pixels-e0/hits/v-0-ordered/*\"]);\n", + "clickbench-pixels-e0-6ssd": "CREATE VIEW hits AS SELECT * FROM pixels_scan([\n \"/data/9a3-01/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-02/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-03/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-04/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-05/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-06/clickbench/pixels-e0/hits/v-0-ordered/*\"\n ]\n);", + "clickbench-pixels-e0-12ssd": "CREATE VIEW hits AS SELECT * FROM pixels_scan([\n \"/data/9a3-01/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-02/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-03/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-04/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-05/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-06/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-07/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-08/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-09/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-10/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-11/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-12/clickbench/pixels-e0/hits/v-0-ordered/*\"\n ]\n);" +} \ No newline at end of file diff --git a/cpp/testcase/blk_stat.py b/cpp/testcase/blk_stat.py new file mode 100644 index 
0000000000..cc53621d17 --- /dev/null +++ b/cpp/testcase/blk_stat.py @@ -0,0 +1,96 @@ +import subprocess +import time +import re +import csv +import argparse +from collections import Counter +import os # <-- 导入 os 模块 + + +def clear_page_cache(): + """Clear Linux page cache to ensure fair benchmarking""" + try: + print("🧹 Clearing Linux page cache...") + # Synchronize filesystem caches + subprocess.run(["sync"], check=True) + # Drop caches (3 clears pagecache, dentries, and inodes) + subprocess.run(["sudo", "bash", "-c", "echo 3 > /proc/sys/vm/drop_caches"], check=True) + print("✅ Page cache cleared successfully") + except subprocess.CalledProcessError as e: + print(f"⚠️ Failed to clear page cache: {e}") + + +# -------------------- 1️⃣ Parse Command Line Arguments -------------------- +parser = argparse.ArgumentParser(description="Monitor I/O granularity using blktrace and blkparse") +parser.add_argument("--benchmark", required=True, help="Benchmark name, used as output file prefix") +args = parser.parse_args() +benchmark_name = args.benchmark + +# -------------------- 2️⃣ Define Regex Pattern -------------------- +# Pattern for capturing I/O size (in sectors) and the process name +# The current pattern targets 'G' (Get request) operations. 
+pattern = re.compile(r"\sG\s+RA?\s+\d+\s+\+\s+(\d+)\s+\[(duckdb|iou-sqp-\d+)\]") + +# -------------------- 3️⃣ Start blktrace and blkparse Pipeline -------------------- +# blktrace monitors block device I/O on nvme0n1 and outputs raw data to stdout +blktrace_cmd = ["sudo", "blktrace", "-d", "/dev/nvme0n1","-o", "-"] +# blkparse reads raw data from stdin ('-') +blkparse_cmd = ["blkparse", "-i", "-"] + +p1 = subprocess.Popen(blktrace_cmd, stdout=subprocess.PIPE) +p2 = subprocess.Popen(blkparse_cmd, stdin=p1.stdout, stdout=subprocess.PIPE, text=True) + +# -------------------- 4️⃣ Clear Page Cache -------------------- +clear_page_cache() + +# -------------------- 5️⃣ Start Benchmark Script (process_sqls.py) -------------------- +proc = subprocess.Popen(["python3", "process_sqls.py", "--runs", "1", "--benchmark", benchmark_name]) + +# -------------------- 6️⃣ Real-time I/O Granularity Collection -------------------- +counter = Counter() +print(f"📊 Collecting I/O traces while benchmark '{benchmark_name}' is running...") + +try: + # Read blkparse output line by line + for line in p2.stdout: + # Search for I/O size and process name using the defined pattern + match = pattern.search(line) + + if match: + # Group 1 is the I/O size in sectors + size = int(match.group(1)) + counter[size] += 1 + + # Check if the benchmark process (process_sqls) has finished + if proc.poll() is not None: + break +except KeyboardInterrupt: + print("⏹️ Stopped manually") + +# -------------------- 7️⃣ Terminate blktrace/blkparse -------------------- +p1.terminate() +p2.terminate() + +# -------------------- 8️⃣ Create Output Directory and Save Results -------------------- +output_dir = "io_results" +output_filename = os.path.join(output_dir, f"io_granularity_stats-{benchmark_name}.csv") # 使用 os.path.join 组合路径 + +# --- 检查并创建目录 --- +if not os.path.exists(output_dir): + print(f"📁 Output directory '{output_dir}' not found. 
Creating it...") + # recursively create directories if they don't exist + os.makedirs(output_dir) +# ---------------------- + +with open(output_filename, "w", newline="") as f: + writer = csv.writer(f) + # Write header: IO size in sectors, count of requests, and IO size converted to bytes (512 bytes/sector) + writer.writerow(["IO_Size_Sectors", "Count", "IO_Size_Bytes"]) + # Write sorted results + for s, c in sorted(counter.items()): + writer.writerow([s, c, s * 512]) + +print(f"✅ Results saved to {output_filename}") + + + diff --git a/cpp/testcase/generate_flamegraphs.sh b/cpp/testcase/generate_flamegraphs.sh new file mode 100755 index 0000000000..4676d81e4f --- /dev/null +++ b/cpp/testcase/generate_flamegraphs.sh @@ -0,0 +1,135 @@ +#!/bin/bash + +# ==================================================== +# Script Function: Batch generation of CPU/I/O/Scheduling related Flame Graphs (Perf + FlameGraph) +# Analysis Events: 1. cpu-clock, 2. page-faults, 3. branch-misses, 4. sched_switch/sched_stat_wait +# Distinction: Each event uses a different color palette (hot, mem, perf, chain) +# Usage: $0 +# Example: $0 test_q01.sql ../../build/release/duckdb ./results +# ==================================================== + +# --- Configure FlameGraph Path --- +FLAMEGRAPH_DIR="$HOME/FlameGraph" +STACKCOLLAPSE="${FLAMEGRAPH_DIR}/stackcollapse-perf.pl" +FLAMEGRAPH_PL="${FLAMEGRAPH_DIR}/flamegraph.pl" + +# ---------------------------------------------------- +# 1. Check Environment and Arguments +# ---------------------------------------------------- + +# Check FlameGraph dependencies +if [ ! -x "$STACKCOLLAPSE" ] || [ ! -x "$FLAMEGRAPH_PL" ]; then + echo "Error: Cannot find or execute FlameGraph tools (stackcollapse-perf.pl or flamegraph.pl)." + echo "Please ensure the FlameGraph repository is cloned to $HOME/FlameGraph." 
+ exit 1 +fi + +# Check argument count +if [ "$#" -ne 3 ]; then + echo "Usage: $0 <sql_file> <duckdb_binary> <output_dir>" + echo "Example: $0 test_q01.sql ../build/release/duckdb ./results" + exit 1 +fi + +# Receive arguments +SQL_FILE=$1 +DUCKDB_BINARY=$2 +OUTPUT_DIR=$3 + +# Check if DuckDB executable exists +if [ ! -x "$DUCKDB_BINARY" ]; then + echo "Error: Cannot find or execute ${DUCKDB_BINARY}" + exit 1 +fi + +# Create output directory (if it doesn't exist) +mkdir -p "$OUTPUT_DIR" +if [ ! -d "$OUTPUT_DIR" ]; then + echo "Error: Cannot create output directory ${OUTPUT_DIR}" + exit 1 +fi + +# Extract filename as prefix (e.g., test_q01) +FILENAME_PREFIX=$(basename "$SQL_FILE" .sql) + +echo "--- Starting Query Analysis: ${FILENAME_PREFIX} ---" +echo "Using executable: ${DUCKDB_BINARY}" +echo "Output directory: ${OUTPUT_DIR}" + +# ---------------------------------------------------- +# 2. Core Function Definition +# ---------------------------------------------------- + +# Function: Generate Flame Graph for specified event +# $1: Event Name (perf event name, e.g., cpu-clock, page-faults) +# $2: Friendly Event Name (e.g., CPU Time) +# $3: Output File Suffix (e.g., cpu_time, page_faults) +# $4: Color Palette Name (e.g., hot, mem, perf, chain) +function generate_flamegraph { + local EVENT_NAME="$1" + local FRIENDLY_NAME="$2" + local SUFFIX="$3" + local COLOR_PALETTE="$4" + + echo "" + echo "----------------------------------------------------" + echo "Start recording event: ${FRIENDLY_NAME} (${EVENT_NAME}) - Color: ${COLOR_PALETTE}" + echo "----------------------------------------------------" + + local DATA_FILE="${OUTPUT_DIR}/${FILENAME_PREFIX}_${SUFFIX}.data" + local PERF_TXT="${OUTPUT_DIR}/${FILENAME_PREFIX}_${SUFFIX}.perf.txt" + local FOLDED_FILE="${OUTPUT_DIR}/${FILENAME_PREFIX}_${SUFFIX}.folded" + local SVG_FILE="${OUTPUT_DIR}/${FILENAME_PREFIX}_${SUFFIX}.svg" + + # --- Record Data --- + # -e specifies event, --call-graph=dwarf captures call stack, -g enables call graph + sudo -E perf record
--call-graph=dwarf -e "$EVENT_NAME" -g -o "$DATA_FILE" -F 10\ + -- "$DUCKDB_BINARY" < "$SQL_FILE" + + if [ $? -ne 0 ]; then + echo "[ERROR] perf record failed. Check permissions or perf installation." + return 1 + fi + + # --- Convert Data and Generate Flame Graph --- + echo "Converting ${FRIENDLY_NAME} data and generating Flame Graph..." + sudo perf script -i "$DATA_FILE" > "$PERF_TXT" + "$STACKCOLLAPSE" "$PERF_TXT" > "$FOLDED_FILE" + + # Add --color argument to specify color palette + "$FLAMEGRAPH_PL" --title="${FILENAME_PREFIX} ${FRIENDLY_NAME} Hotspots" --countname="$FRIENDLY_NAME" \ + --color="$COLOR_PALETTE" \ + "$FOLDED_FILE" > "$SVG_FILE" + echo "✅ ${FRIENDLY_NAME} Flame Graph generated: ${SVG_FILE}" + + # --- Cleanup Intermediate Files --- + echo "Cleaning up ${FRIENDLY_NAME} intermediate files..." + rm -f "$PERF_TXT" "$FOLDED_FILE" + sudo rm -f "$DATA_FILE" +} + +# ---------------------------------------------------- +# 3. Run Analysis (4 Flame Graphs total) +# ---------------------------------------------------- + +# 1. CPU Time Analysis (Standard CPU bottleneck analysis) +generate_flamegraph "cpu-clock" "CPU Time" "cpu_time" "hot" + +# 2. I/O Bottleneck Analysis (Related to memory access) +generate_flamegraph "page-faults" "Page Faults" "page_faults" "mem" + +# 3. Computational Efficiency Analysis (Related to pipeline) +generate_flamegraph "branch-misses" "Branch Misses" "branch_misses" "hot" + +# 4. Scheduling/Wait Bottleneck Analysis (Related to lock contention, context switching) +# Note: This event requires two perf event names +generate_flamegraph "sched:sched_switch,sched:sched_stat_wait" "Thread Scheduling" "sched" "chain" + + +# ---------------------------------------------------- +# 4. 
Task Summary +# ---------------------------------------------------- +echo "" +echo "--- Task Complete ---" +echo "Final result files (SVG/HTML) are in the ${OUTPUT_DIR} directory:" +find "$OUTPUT_DIR" -name "${FILENAME_PREFIX}_*.svg" \ No newline at end of file diff --git a/cpp/testcase/process_sqls.py b/cpp/testcase/process_sqls.py new file mode 100755 index 0000000000..38ce63b9e6 --- /dev/null +++ b/cpp/testcase/process_sqls.py @@ -0,0 +1,261 @@ +import os +import re +import subprocess +import csv +import time +import psutil +import json # Added: For parsing benchmark configuration files +from typing import List +import argparse + +# -------------------------- 1. Basic Configuration (Added default benchmark JSON path) -------------------------- +# Default path to benchmark configuration file (can be overridden via CLI parameter) +DEFAULT_BENCHMARK_JSON = "./benchmark.json" + + +def clear_page_cache(): + """Clear Linux page cache to ensure fair benchmarking""" + try: + print("🧹 Clearing Linux page cache...") + subprocess.run(["sync"], check=True) + subprocess.run(["sudo", "bash", "-c", "echo 3 > /proc/sys/vm/drop_caches"], check=True) + print("✅ Page cache cleared successfully") + except subprocess.CalledProcessError as e: + print(f"⚠️ Failed to clear page cache: {e}") + + +# -------------------------- 2. 
CLI Argument Parsing (Added benchmark-related parameters) -------------------------- +def parse_args(): + parser = argparse.ArgumentParser(description="DuckDB ClickBench Batch Test Script (Multi-column CSV, ensures resource release)") + parser.add_argument( + "--runs", + type=int, + default=3, + help="Number of runs per SQL file (default: 3)" + ) + parser.add_argument( + "--duckdb-bin", + type=str, + default="/home/whz/test/pixels/cpp/build/release/duckdb", + help="Path to duckdb executable" + ) + parser.add_argument( + "--sql-dir", + type=str, + default="/home/whz/test/pixels/cpp/pixels-duckdb/duckdb/benchmark/clickbench/queries-test", + help="Directory containing SQL files (only processes .sql files starting with 'q')" + ) + parser.add_argument( + "--output-csv", + type=str, + default="/home/whz/test/pixels/cpp/duckdb_benchmark_result.csv", + help="Path to output result CSV" + ) + parser.add_argument( + "--wait-after-run", + type=float, + default=2.0, + help="Seconds to wait after each run (ensures resource release, default: 2s)" + ) + parser.add_argument( + "--threads", + type=int, + default=96, + help="Number of threads to use in DuckDB (default: 96)" + ) + parser.add_argument( + "--benchmark", + type=str, + default="clickbench-pixels-e0-1ssd", + help="Name of benchmark to use (must exist in benchmark JSON, e.g. clickbench-pixels-e0)" + ) + parser.add_argument( + "--benchmark-json", + type=str, + default=DEFAULT_BENCHMARK_JSON, + help=f"Path to benchmark configuration JSON file (default: {DEFAULT_BENCHMARK_JSON})" + ) + return parser.parse_args() + + +# -------------------------- 3. 
Core Utility Functions -------------------------- +def get_sql_files(sql_dir: str) -> List[str]: + sql_files = [] + for filename in os.listdir(sql_dir): + if filename.endswith(".sql") and filename.startswith("q"): + sql_files.append(os.path.join(sql_dir, filename)) + sql_files.sort() + if not sql_files: + raise ValueError(f"No .sql files starting with 'q' found in {sql_dir}!") + return sql_files + + +def extract_real_time(duckdb_output: str) -> float: + pattern = r"Run Time \(s\): real (\d+\.\d+)" + match = re.search(pattern, duckdb_output, re.MULTILINE) + if not match: + raise ValueError(f"Failed to extract real time! Partial output:\n{duckdb_output[:500]}...") + return round(float(match.group(1)), 3) + + +def kill_remaining_duckdb(duckdb_bin: str): + duckdb_name = os.path.basename(duckdb_bin) + for proc in psutil.process_iter(['name', 'cmdline']): + try: + if (proc.info['name'] == duckdb_name) or (duckdb_bin in ' '.join(proc.info['cmdline'] or [])): + print(f"⚠️ Found residual {duckdb_name} process (PID: {proc.pid}), killing...") + proc.terminate() + try: + proc.wait(timeout=1) + except psutil.TimeoutExpired: + proc.kill() + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + + +def load_benchmark_create_view(benchmark_json_path: str, benchmark_name: str) -> str: + if not os.path.exists(benchmark_json_path): + raise FileNotFoundError(f"Benchmark JSON file not found: {benchmark_json_path}") + + with open(benchmark_json_path, "r", encoding="utf-8") as f: + try: + benchmark_config = json.load(f) + except json.JSONDecodeError as e: + raise ValueError(f"Failed to parse benchmark JSON: {str(e)}") + + if benchmark_name not in benchmark_config: + available_benchmarks = ", ".join(benchmark_config.keys()) + raise KeyError(f"Benchmark '{benchmark_name}' not found. 
Available benchmarks: {available_benchmarks}") + + create_view_sql = benchmark_config[benchmark_name].strip() + if not create_view_sql: + raise ValueError(f"CREATE VIEW SQL for benchmark '{benchmark_name}' is empty in JSON") + + return create_view_sql + + +def run_single_sql(duckdb_bin: str, create_view_sql: str, sql_content: str, wait_after_run: float, threads: int) -> float: + duckdb_commands = f"{create_view_sql}\nset threads={threads};\n\n.timer on\n{sql_content.strip()}\n.exit" + process = None + + try: + process = subprocess.Popen( + [duckdb_bin], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT + ) + + input_data = duckdb_commands.encode("utf-8") + stdout, _ = process.communicate(input=input_data, timeout=3600) + + output = stdout.decode("utf-8", errors="ignore") + + if process.returncode != 0: + raise RuntimeError(f"duckdb execution failed (code {process.returncode}):\n{output[:1000]}...") + + real_time = extract_real_time(output) + time.sleep(wait_after_run) + kill_remaining_duckdb(duckdb_bin) + return real_time + + except subprocess.TimeoutExpired: + if process: + process.kill() + raise RuntimeError("duckdb execution timed out (exceeded 1 hour)") from None + finally: + if process and process.poll() is None: + process.kill() + print("⚠️ Forcibly terminated non-exiting duckdb process") + + +def init_csv(output_csv: str, runs: int): + headers = ["SQL File Name"] + [f"Run {idx} Time (s)" for idx in range(1, runs + 1)] + with open(output_csv, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=headers) + writer.writeheader() + print(f"✅ Initialized multi-column CSV with headers: {','.join(headers)}") + + +def write_single_row(output_csv: str, sql_filename: str, run_times: List[float], runs: int): + row_data = {"SQL File Name": sql_filename} + for idx in range(1, runs + 1): + time_val = run_times[idx - 1] if (idx - 1) < len(run_times) else "" + row_data[f"Run {idx} Time (s)"] = time_val + with 
open(output_csv, "a", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=row_data.keys()) + writer.writerow(row_data) + + +# -------------------------- 4. Main Logic -------------------------- +def main(): + args = parse_args() + print("=" * 70) + print("DuckDB ClickBench Batch Test Script (Resource Release Ensured)") + print(f"Config: {args.runs} runs per SQL, {args.wait_after_run}s wait after each run") + print(f"Benchmark: {args.benchmark} (from {args.benchmark_json})") + print(f"DuckDB path: {args.duckdb_bin}") + print(f"Threads: {args.threads}") + print(f"SQL directory: {args.sql_dir}") + print(f"Output CSV: {args.output_csv}") + print("=" * 70) + + # clear_page_cache() + + kill_remaining_duckdb(args.duckdb_bin) + try: + create_view_sql = load_benchmark_create_view(args.benchmark_json, args.benchmark) + print(f"✅ Loaded CREATE VIEW SQL for benchmark '{args.benchmark}'") + except (FileNotFoundError, KeyError, ValueError) as e: + print(f"\n❌ Benchmark initialization failed: {str(e)}") + return + + init_csv(args.output_csv, args.runs) + try: + sql_files = get_sql_files(args.sql_dir) + print(f"\n✅ Found {len(sql_files)} eligible SQL files:") + for i, f in enumerate(sql_files, 1): + print(f" {i:2d}. 
{os.path.basename(f)}") + except ValueError as e: + print(f"\n❌ Error: {e}") + return + + for sql_file in sql_files: + sql_filename = os.path.basename(sql_file).replace(".sql", "") + print(f"\n{'=' * 60}") + print(f"Processing: {sql_filename}.sql") + print(f"{'=' * 60}") + + try: + with open(sql_file, "r", encoding="utf-8") as f: + sql_content = f.read() + print(f"✅ Successfully read SQL file (content length: {len(sql_content)} chars)") + except Exception as e: + print(f"❌ Failed to read SQL file: {e}") + write_single_row(args.output_csv, sql_filename, [], args.runs) + continue + + run_times = [] + for run_idx in range(1, args.runs + 1): + print(f"\n--- Run {run_idx:2d}/{args.runs} ---") + clear_page_cache() + try: + real_time = run_single_sql(args.duckdb_bin, create_view_sql, sql_content, args.wait_after_run, args.threads) + run_times.append(real_time) + print(f"✅ Run successful, time: {real_time}s") + except (RuntimeError, ValueError) as e: + print(f"❌ Run failed: {e}") + continue + + write_single_row(args.output_csv, sql_filename, run_times, args.runs) + print(f"\n✅ Written to CSV: {sql_filename}.sql → Valid runs: {len(run_times)}/{args.runs}") + + kill_remaining_duckdb(args.duckdb_bin) + print(f"\n{'=' * 70}") + print("All SQL files processed!") + print(f"Multi-column CSV: {args.output_csv}") + print("=" * 70) + + +if __name__ == "__main__": + main() diff --git a/cpp/testcase/run_perf.py b/cpp/testcase/run_perf.py new file mode 100644 index 0000000000..38503691e1 --- /dev/null +++ b/cpp/testcase/run_perf.py @@ -0,0 +1,192 @@ +import subprocess +import os +import re +import argparse # Import argparse for command-line arguments + +# --- Configuration (Defaults & Constants) --- +# Default lists (can be overridden by command-line arguments) +# THREADS = [1, 2, 4, 8, 16, 24, 32, 48, 64, 96] +# THREADS=[24] +# QUERIES = ["q01", "q24", "q33"] +# SSD_MODES = ["1ssd", "24ssd"] +# SSD_MODES=["1ssd"] +# THREADS = [32, 48] +# # THREADS = [24] +# # QUERIES = ["q01"] +# 
QUERIES = ["q01"] +# SSD_MODES = ["1ssd"] +# THREADS = [ 96] +# THREADS=[24] +# QUERIES = ["q24"] +# SSD_MODES = ["24ssd"] +THREADS = [24] +QUERIES = ["q01", "q24", "q33"] +SSD_MODES = ["1ssd"] + +# Base 'perf stat' command: focusing on CPU and scheduling metrics +PERF_CMD_BASE = [ + "sudo", "-E", "perf", "stat", + "-e", "cycles,instructions,cache-references,cache-misses,branches,branch-misses", + "-e", "page-faults,minor-faults,major-faults", + "-e", "task-clock,context-switches" +] + +# Path to the DuckDB binary +DUCKDB_BINARY = "../build/release/duckdb" + +# FlameGraph Bash script path (updated for scheduling events) +FLAMEGRAPH_SCRIPT = "./generate_flamegraphs.sh" + + +def ensure_result_dir(result_dir): + """Create the results output directory""" + if not os.path.exists(result_dir): + os.makedirs(result_dir) + print(f"Created directory: {result_dir}") + + +def update_sql_thread(sql_dir, sql_filename, thread_value, result_dir): + """ + Replaces 'set threads=x;' in the SQL file with the specified value + and writes the content to a temporary file in the result directory. 
+ """ + # Construct the full path to the base SQL file + sql_path = os.path.join(sql_dir, sql_filename) + + with open(sql_path, "r") as f: + content = f.read() + + # Replace or add set threads=x; + # Try to replace existing one, if not found, assume it goes at the start + if re.search(r"set\s+threads\s*=\s*\d+;", content): + new_content = re.sub(r"set\s+threads\s*=\s*\d+;", f"set threads={thread_value};", content) + else: + # If no 'set threads' line is found, add it to the beginning + new_content = f"set threads={thread_value};\n{content}" + + + # Temporary file name uses the result directory + tmp_path = os.path.join(result_dir, f"{os.path.basename(sql_filename)}.tmp_threads_{thread_value}.sql") + with open(tmp_path, "w") as f: + f.write(new_content) + + return tmp_path + + +def run_perf_stat_switches(query_file, query_name, ssd_mode, thread_value, result_dir): + """ + Executes perf stat, collects context switch metrics, and outputs the result + to a file in the results directory. + """ + output_name = f"{query_name}-{ssd_mode}-threads{thread_value}-context-stat.txt" + output_path = os.path.join(result_dir, output_name) + + # PERF_CMD_BASE is defined at the top of the script + cmd = PERF_CMD_BASE + ["-o", output_path, DUCKDB_BINARY] + + print(f"\n--- 1. Running perf stat for context switches: {output_name} ---") + + try: + with open(query_file, "r") as sql_f: + subprocess.run(cmd, stdin=sql_f, check=True) # Use check=True to ensure command execution success + print(f"==> perf stat output saved to: {output_path}") + except subprocess.CalledProcessError as e: + print(f"[ERROR] perf stat failed for {output_name}: {e}") + return False + except FileNotFoundError: + print(f"[ERROR] DuckDB binary not found at {DUCKDB_BINARY}") + return False + + return True + +def run_sched_flamegraph(tmp_sql_path, query_name, ssd_mode, thread_value, result_dir): + """ + Calls an external Bash script to generate a flame graph focused on scheduling events. 
    """
    if not os.path.exists(FLAMEGRAPH_SCRIPT):
        print(f"[WARN] Schedule FlameGraph script not found: {FLAMEGRAPH_SCRIPT}. Skipping analysis.")
        return

    print(f"\n--- 2. Running Schedule FlameGraph analysis ---")

    # Ensure the Bash script has execute permissions
    if not os.access(FLAMEGRAPH_SCRIPT, os.X_OK):
        print(f"[WARN] Adding execute permission to {FLAMEGRAPH_SCRIPT}")
        os.chmod(FLAMEGRAPH_SCRIPT, 0o755)

    # NOTE: despite the '_html_' names, the output is an SVG flame graph.
    output_html_name = f"{query_name}-{ssd_mode}-threads{thread_value}-sched.svg"
    output_html_path = os.path.join(result_dir, output_html_name)

    # Call the Bash script, passing the temporary SQL file, DuckDB binary path, and output HTML path
    flamegraph_cmd = [FLAMEGRAPH_SCRIPT, tmp_sql_path, DUCKDB_BINARY, output_html_path]

    try:
        subprocess.run(flamegraph_cmd, check=True)
        print(f"==> Schedule FlameGraph saved to: {output_html_path}")
    except subprocess.CalledProcessError as e:
        print(f"[ERROR] Schedule FlameGraph script failed for {tmp_sql_path}: {e}")
    except FileNotFoundError:
        print(f"[ERROR] FlameGraph script file not found at {FLAMEGRAPH_SCRIPT}")


def main():
    """Parse command-line options and run the perf-stat + flame-graph sweep
    over every (query, ssd-mode, thread-count) combination."""
    parser = argparse.ArgumentParser(description="Run DuckDB benchmarks with perf stat and FlameGraph analysis.")

    # --- New required arguments for directories ---
    parser.add_argument("--sql-dir", required=False, default="perf-pixels", help="Directory containing the base SQL query files (e.g., test-q01-1ssd.sql).")
    parser.add_argument("--result-dir", required=False, default="perf-pixels", help="Directory where temporary files and final results (perf stats, SVG) will be saved.")
    # NOTE(review): these locals shadow the identically-valued module-level
    # defaults defined at the top of the file; consider removing them.
    THREADS = [24]
    QUERIES = ["q01", "q24", "q33"]
    SSD_MODES = ["1ssd"]
    # --- Optional arguments with defaults ---
    parser.add_argument("--threads", type=int, nargs='+', default=THREADS, help=f"List of thread counts to test (default: {THREADS}).")
    parser.add_argument("--queries", nargs='+', default=QUERIES, help=f"List of query IDs to test (default: {QUERIES}).")
    parser.add_argument("--ssd-modes", nargs='+', default=SSD_MODES, help=f"List of SSD configurations (default: {SSD_MODES}).")

    args = parser.parse_args()

    # Assign parsed arguments to variables
    RESULT_DIR = args.result_dir
    SQL_DIR = args.sql_dir
    THREADS = args.threads
    QUERIES = args.queries
    SSD_MODES = args.ssd_modes

    # Ensure the result directory exists
    ensure_result_dir(RESULT_DIR)

    for q in QUERIES:
        for mode in SSD_MODES:

            sql_file_base = f"test-{q}-{mode}.sql"
            # Check for the file in the specified SQL_DIR
            sql_file_path_check = os.path.join(SQL_DIR, sql_file_base)

            if not os.path.exists(sql_file_path_check):
                print(f"[WARN] Base SQL file not found: {sql_file_path_check}, skipping")
                continue

            for t in THREADS:
                print("=" * 50)
                print(f"Starting analysis for Q={q}, Mode={mode}, Threads={t}")

                # 1. Update threads and create temporary SQL file in RESULT_DIR
                tmp_sql = update_sql_thread(
                    SQL_DIR, sql_file_base, t, RESULT_DIR
                )

                # 2. Run perf stat to collect context switch metrics
                success = run_perf_stat_switches(tmp_sql, q, mode, t, RESULT_DIR)

                # 3. If perf stat succeeded, run scheduling FlameGraph analysis
                if success:
                    run_sched_flamegraph(tmp_sql, q, mode, t, RESULT_DIR)

                # 4. Clean up temporary SQL file
                os.remove(tmp_sql)
                print(f"Cleaned up temporary SQL file: {tmp_sql}")
                print("=" * 50)


if __name__ == "__main__":
    main()
diff --git a/cpp/testcase/single_doublebuffer_async_sync_test.py b/cpp/testcase/single_doublebuffer_async_sync_test.py new file mode 100644 index 0000000000..bbf1e575f6 --- /dev/null +++ b/cpp/testcase/single_doublebuffer_async_sync_test.py @@ -0,0 +1,139 @@
import subprocess
import os
import re
import shutil

# -------------------------------------
# 1.
# Configuration Parameters
# -------------------------------------
# threads_list = [1, 2, 4, 8, 16, 24, 48, 64, 96]
threads_list = [16, 24]
benchmarks = [
    "clickbench-pixels-e0-1ssd",
    # "clickbench-pixels-e0-6ssd",
    # "clickbench-pixels-e0-12ssd",
    # "clickbench-pixels-e0-24ssd"
]

runs = 1
# All buffer modes to be exercised by the sweep below.
buffer_modes = ["doublebuffer", "singlebuffer"]
properties_path = os.path.expanduser("~/opt/pixels/etc/pixels-cpp.properties")
process_script = "process_sqls.py"

# Root directory for saving results
output_root = "single_double_buffer_results"

# -------------------------------------
# 2. Core Modification: Buffer Mode Switching Function
# -------------------------------------
def set_buffer_mode(mode):
    """Rewrite the 'pixels.doublebuffer' entry in pixels-cpp.properties."""
    assert mode in ("doublebuffer", "singlebuffer")

    if not os.path.exists(properties_path):
        raise FileNotFoundError(f"Configuration file not found: {properties_path}")

    # Value the property should take for the requested mode.
    flag = "true" if mode == "doublebuffer" else "false"
    replacement = f"pixels.doublebuffer={flag}\n"

    with open(properties_path, "r") as src:
        original = src.readlines()

    # Substitute every existing 'pixels.doublebuffer' line in place.
    rewritten = [
        replacement if ln.strip().startswith("pixels.doublebuffer") else ln
        for ln in original
    ]

    # Property not present at all -> append it at the end of the file.
    if not any(ln.strip().startswith("pixels.doublebuffer") for ln in original):
        rewritten.append(replacement)

    with open(properties_path, "w") as dst:
        dst.writelines(rewritten)

    print(f"🔄 Buffer mode switched to: {mode.upper()}")


# -------------------------------------
# 3.
# IO Mode Switching Function (Unchanged logic)
# -------------------------------------
def set_io_mode(mode):
    """Modify the 'localfs.enable.async.io' parameter in pixels-cpp.properties.

    mode: "async" -> true, "sync" -> false.
    Raises FileNotFoundError when the properties file is missing.
    """
    assert mode in ("async", "sync")

    if not os.path.exists(properties_path):
        raise FileNotFoundError(f"Configuration file not found: {properties_path}")

    with open(properties_path, "r") as f:
        lines = f.readlines()

    # Compute the target value once instead of recomputing inside the loop.
    new_value = "true" if mode == "async" else "false"
    replacement = f"localfs.enable.async.io={new_value}\n"

    new_lines = []
    changed = False
    for line in lines:
        # FIX: strip leading whitespace before matching, consistent with
        # set_buffer_mode(); previously an indented property line was left
        # untouched and a duplicate key was appended at the end of the file.
        if line.strip().startswith("localfs.enable.async.io"):
            new_lines.append(replacement)
            changed = True
        else:
            new_lines.append(line)

    # Property not present at all -> append it.
    if not changed:
        new_lines.append(replacement)

    with open(properties_path, "w") as f:
        f.writelines(new_lines)

    print(f"🔧 IO mode switched to: {mode.upper()}")

# -------------------------------------
# 4. Nested Loop for Test Execution
# -------------------------------------
for buffer_mode in buffer_modes:
    print(f"\n===========================================")
    print(f"🚀 Starting Test for Buffer Mode: {buffer_mode.upper()}")
    print(f"===========================================")
    set_buffer_mode(buffer_mode)  # <-- Set the current Buffer Mode

    for io_mode in ["sync", "async"]:
        print(f"\n======= Switching to {io_mode.upper()} mode =======")
        set_io_mode(io_mode)

        for benchmark in benchmarks:
            print(f"\n===== Benchmark: {benchmark} ({buffer_mode}/{io_mode}) =====\n")

            # Create an isolated directory: output_root/benchmark/buffer_mode/io_mode/
            benchmark_dir = os.path.join(output_root, benchmark, buffer_mode, io_mode)
            os.makedirs(benchmark_dir, exist_ok=True)
            print(f"📁 Directory created: {benchmark_dir}")

            for t in threads_list:
                output_csv = os.path.join(
                    benchmark_dir,
                    f"duckdb_benchmark_result-{buffer_mode}-{io_mode}-{t}threads.csv"
                )

                # Delegate one benchmark configuration to the process script.
                cmd = [
                    "python",
                    process_script,
                    "--benchmark", benchmark,
                    "--runs", str(runs),
                    "--output-csv", output_csv,
                    "--threads", str(t),
                ]

                print(f"\n▶ Executing: {benchmark}, Buffer={buffer_mode}, IO={io_mode}, {t} threads")
                print("Command:", " ".join(cmd))

                # check=True aborts the whole sweep as soon as one run fails.
                subprocess.run(cmd, check=True)

                print(f"✔ Completed: {output_csv}\n")

print("\n🎉 All tasks (doublebuffer/singlebuffer, sync/async) completed successfully!")