diff --git a/.github/workflows/integration_tests.yaml b/.github/workflows/integration_tests.yaml
new file mode 100644
index 0000000..efa933c
--- /dev/null
+++ b/.github/workflows/integration_tests.yaml
@@ -0,0 +1,51 @@
+name: Integration tests
+
+on:
+  workflow_dispatch:
+  pull_request:
+  push:
+    branches: [main]
+
+jobs:
+  build_integration_container_and_run_tests:
+    runs-on: ubuntu-24.04
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v5
+
+      - name: Create build environment
+        uses: mamba-org/setup-micromamba@v2
+        with:
+          environment-file: ./environment-dev.yml
+          environment-name: build_env
+          cache-environment: true
+
+      - name: Configure using CMake
+        run: |
+          cmake -G Ninja \
+            -Bbuild \
+            -DCMAKE_BUILD_TYPE:STRING=RELEASE \
+            -DSPARROW_IPC_BUILD_INTEGRATION_TESTS=ON \
+            -DFETCH_DEPENDENCIES_WITH_CMAKE=MISSING \
+            -DSPARROW_IPC_BUILD_SHARED=ON
+
+      - name: Build file_to_stream target
+        working-directory: build
+        run: cmake --build . --config Release --target file_to_stream
+
+      - name: Build stream_to_file target
+        working-directory: build
+        run: cmake --build . --config Release --target stream_to_file
+
+      - name: Build Docker image
+        run: docker build -t sparrow/integration-tests -f ci/docker/integration.dockerfile .
+
+      - name: Run Integration tests
+        run: |
+          docker run --rm \
+            -e ARCHERY_INTEGRATION_WITH_EXTERNAL_LIBRARY=/workspace/build/bin/RELEASE/ \
+            -e ARCHERY_INTEGRATION_EXTERNAL_LIBRARY_IPC_PRODUCER=true \
+            -e ARCHERY_INTEGRATION_EXTERNAL_LIBRARY_IPC_CONSUMER=true \
+            -v ${{ github.workspace }}:/workspace \
+            -w /arrow-integration sparrow/integration-tests \
+            "/arrow-integration/ci/scripts/integration_arrow.sh /arrow-integration /build"
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 47fab25..c4ddbd2 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -11,8 +11,6 @@ include(CMakeDependentOption)
 list(PREPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake")
 message(DEBUG "CMake module path: ${CMAKE_MODULE_PATH}")
 
-include(external_dependencies)
-
 set(SPARROW_IPC_COMPILE_DEFINITIONS "" CACHE STRING "List of public compile definitions of the sparrow-ipc target")
 
 set(SPARROW_IPC_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/include)
@@ -85,11 +83,16 @@ MESSAGE(STATUS "🔧 Build docs: ${SPARROW_IPC_BUILD_DOCS}")
 OPTION(SPARROW_IPC_BUILD_EXAMPLES "Build sparrow-ipc examples" OFF)
 MESSAGE(STATUS "🔧 Build examples: ${SPARROW_IPC_BUILD_EXAMPLES}")
 
+OPTION(SPARROW_IPC_BUILD_INTEGRATION_TESTS "Build sparrow-ipc integration tests" OFF)
+MESSAGE(STATUS "🔧 Build integration tests: ${SPARROW_IPC_BUILD_INTEGRATION_TESTS}")
+
 # Code coverage
 # =============
 OPTION(SPARROW_IPC_ENABLE_COVERAGE "Enable sparrow-ipc test coverage" OFF)
 MESSAGE(STATUS "🔧 Enable coverage: ${SPARROW_IPC_ENABLE_COVERAGE}")
 
+include(external_dependencies)
+
 if(SPARROW_IPC_ENABLE_COVERAGE)
     include(code_coverage)
 endif()
@@ -284,6 +287,13 @@ if(SPARROW_IPC_BUILD_EXAMPLES)
     add_subdirectory(examples)
 endif()
 
+# Integration tests
+# =================
+if(SPARROW_IPC_BUILD_INTEGRATION_TESTS)
+    message(STATUS "🔨 Create integration tests targets")
+    add_subdirectory(integration_tests)
+endif()
+
 # Installation
 # ============
 include(GNUInstallDirs)
diff --git a/ci/docker/integration.dockerfile b/ci/docker/integration.dockerfile
new file mode 100644
index 0000000..67e9061
--- /dev/null
+++ b/ci/docker/integration.dockerfile
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.
+# See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+FROM apache/arrow-dev:amd64-conda-integration
+
+ENV ARROW_USE_CCACHE=OFF \
+    ARROW_CPP_EXE_PATH=/build/cpp/debug \
+    BUILD_DOCS_CPP=OFF \
+    ARROW_INTEGRATION_CPP=ON \
+    ARROW_INTEGRATION_CSHARP=OFF \
+    ARROW_INTEGRATION_GO=OFF \
+    ARROW_INTEGRATION_JAVA=OFF \
+    ARROW_INTEGRATION_JS=OFF \
+    ARCHERY_INTEGRATION_WITH_NANOARROW="0" \
+    ARCHERY_INTEGRATION_WITH_RUST="0"
+
+RUN apt-get update && apt-get install -y build-essential git
+
+# Clone the arrow monorepo (TODO: change to the official repo)
+RUN git clone --depth 1 --branch archery_supports_external_libraries https://github.com/Alex-PLACET/arrow.git /arrow-integration --recurse-submodules
+
+# Build all the integrations
+RUN conda run --no-capture-output \
+    /arrow-integration/ci/scripts/integration_arrow_build.sh \
+    /arrow-integration \
+    /build
diff --git a/cmake/external_dependencies.cmake b/cmake/external_dependencies.cmake
index 897b48f..56a4054 100644
--- a/cmake/external_dependencies.cmake
+++ b/cmake/external_dependencies.cmake
@@ -68,7 +68,7 @@ function(find_package_or_fetch)
 endfunction()
 
 set(SPARROW_BUILD_SHARED ${SPARROW_IPC_BUILD_SHARED})
-if(${SPARROW_IPC_BUILD_TESTS})
+if(${SPARROW_IPC_BUILD_TESTS} OR ${SPARROW_IPC_BUILD_INTEGRATION_TESTS})
     set(CREATE_JSON_READER_TARGET ON)
 endif()
 find_package_or_fetch(
@@ -81,7 +81,7 @@ unset(CREATE_JSON_READER_TARGET)
 if(NOT TARGET sparrow::sparrow)
     add_library(sparrow::sparrow ALIAS sparrow)
 endif()
-if(${SPARROW_IPC_BUILD_TESTS})
+if(${SPARROW_IPC_BUILD_TESTS} OR ${SPARROW_IPC_BUILD_INTEGRATION_TESTS})
     find_package_or_fetch(
         PACKAGE_NAME sparrow-json-reader
     )
@@ -123,7 +123,7 @@ if(NOT TARGET lz4::lz4)
     add_library(lz4::lz4 ALIAS lz4)
 endif()
 
-if(SPARROW_IPC_BUILD_TESTS)
+if(${SPARROW_IPC_BUILD_TESTS} OR ${SPARROW_IPC_BUILD_INTEGRATION_TESTS})
     find_package_or_fetch(
         PACKAGE_NAME doctest
         GIT_REPOSITORY https://github.com/doctest/doctest.git
diff --git a/integration_tests/CMakeLists.txt b/integration_tests/CMakeLists.txt
new file mode 100644
index 0000000..fb08d24
--- /dev/null
+++ b/integration_tests/CMakeLists.txt
@@ -0,0 +1,130 @@
+cmake_minimum_required(VERSION 3.28)
+
+# Create executable for file_to_stream integration test
+add_executable(file_to_stream file_to_stream.cpp)
+
+target_link_libraries(file_to_stream
+    PRIVATE
+        sparrow-ipc
+        sparrow::sparrow
+        sparrow::json_reader
+)
+
+set_target_properties(file_to_stream
+    PROPERTIES
+        CXX_STANDARD 20
+        CXX_STANDARD_REQUIRED ON
+        CXX_EXTENSIONS OFF
+)
+
+target_include_directories(file_to_stream
+    PRIVATE
+        ${CMAKE_SOURCE_DIR}/include
+        ${CMAKE_BINARY_DIR}/generated
+)
+
+add_dependencies(file_to_stream generate_flatbuffers_headers)
+
+# Create executable for stream_to_file integration test
+add_executable(stream_to_file stream_to_file.cpp)
+
+target_link_libraries(stream_to_file
+    PRIVATE
+        sparrow-ipc
+        sparrow::sparrow
+)
+
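+# Note: stream_to_file only consumes and re-emits IPC streams, so unlike
+# file_to_stream it does not need the JSON reader.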
+set_target_properties(stream_to_file
+    PROPERTIES
+        CXX_STANDARD 20
+        CXX_STANDARD_REQUIRED ON
+        CXX_EXTENSIONS OFF
+)
+
+target_include_directories(stream_to_file
+    PRIVATE
+        ${CMAKE_SOURCE_DIR}/include
+        ${CMAKE_BINARY_DIR}/generated
+)
+
+add_dependencies(stream_to_file generate_flatbuffers_headers)
+
+# Create test executable for integration tools
+add_executable(test_integration_tools main.cpp test_integration_tools.cpp)
+
+target_link_libraries(test_integration_tools
+    PRIVATE
+        sparrow-ipc
+        sparrow::sparrow
+        sparrow::json_reader
+        doctest::doctest
+        arrow-testing-data
+)
+
+target_compile_definitions(test_integration_tools
+    PRIVATE
+        INTEGRATION_TOOLS_DIR="${CMAKE_CURRENT_BINARY_DIR}"
+)
+
+set_target_properties(test_integration_tools
+    PROPERTIES
+        CXX_STANDARD 20
+        CXX_STANDARD_REQUIRED ON
+        CXX_EXTENSIONS OFF
+)
+
+target_include_directories(test_integration_tools
+    PRIVATE
+        ${CMAKE_SOURCE_DIR}/include
+        ${CMAKE_BINARY_DIR}/generated
+)
+
+add_dependencies(test_integration_tools generate_flatbuffers_headers file_to_stream stream_to_file)
+
+# Register with CTest
+enable_testing()
+add_test(NAME integration_tools_test COMMAND test_integration_tools)
+
+# On Windows, copy required DLLs next to each executable
+if(WIN32)
+    add_custom_command(
+        TARGET file_to_stream POST_BUILD
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+            "$<TARGET_FILE:sparrow-ipc>"
+            "$<TARGET_FILE_DIR:file_to_stream>"
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+            "$<TARGET_FILE:sparrow::sparrow>"
+            "$<TARGET_FILE_DIR:file_to_stream>"
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+            "$<TARGET_FILE:sparrow::json_reader>"
+            "$<TARGET_FILE_DIR:file_to_stream>"
+        COMMENT "Copying DLLs to file_to_stream executable directory"
+    )
+
+    add_custom_command(
+        TARGET stream_to_file POST_BUILD
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+            "$<TARGET_FILE:sparrow-ipc>"
+            "$<TARGET_FILE_DIR:stream_to_file>"
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+            "$<TARGET_FILE:sparrow::sparrow>"
+            "$<TARGET_FILE_DIR:stream_to_file>"
+        COMMENT "Copying DLLs to stream_to_file executable directory"
+    )
+
+    add_custom_command(
+        TARGET test_integration_tools POST_BUILD
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+            "$<TARGET_FILE:sparrow-ipc>"
+            "$<TARGET_FILE_DIR:test_integration_tools>"
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+            "$<TARGET_FILE:sparrow::sparrow>"
+            "$<TARGET_FILE_DIR:test_integration_tools>"
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+            "$<TARGET_FILE:sparrow::json_reader>"
+            "$<TARGET_FILE_DIR:test_integration_tools>"
+        COMMENT "Copying DLLs to test_integration_tools executable directory"
+    )
+endif()
+
+set_target_properties(file_to_stream stream_to_file test_integration_tools PROPERTIES FOLDER "Integration Tests")
diff --git a/integration_tests/file_to_stream.cpp b/integration_tests/file_to_stream.cpp
new file mode 100644
index 0000000..5a4579a
--- /dev/null
+++ b/integration_tests/file_to_stream.cpp
@@ -0,0 +1,116 @@
+#include <cstdint>
+#include <cstdlib>
+#include <filesystem>
+#include <fstream>
+#include <iostream>
+#include <vector>
+
+#include <nlohmann/json.hpp>
+#include <sparrow/record_batch.hpp>
+
+#include "sparrow/json_reader/json_parser.hpp"
+
+#include <sparrow_ipc/memory_output_stream.hpp>
+#include <sparrow_ipc/serializer.hpp>
+
+/**
+ * @brief Reads a JSON file containing record batches and outputs the serialized Arrow IPC stream to stdout.
+ *
+ * This program takes a JSON file path as a command-line argument, parses the record batches
+ * from the JSON data, serializes them into the Arrow IPC stream format, and writes the binary
+ * stream to stdout. The output can be redirected to a file or piped to another program.
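+ *
+ * Example (hypothetical file names):
+ *   ./file_to_stream generated_primitive.json > generated_primitive.stream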
+ *
+ * Usage: file_to_stream <json_file>
+ *
+ * @param argc Number of command-line arguments
+ * @param argv Array of command-line arguments
+ * @return EXIT_SUCCESS on success, EXIT_FAILURE on error
+ */
+int main(int argc, char* argv[])
+{
+    // Check command-line arguments
+    if (argc != 2)
+    {
+        std::cerr << "Usage: " << argv[0] << " <json_file>\n";
+        std::cerr << "Reads a JSON file and outputs the serialized Arrow IPC stream to stdout.\n";
+        return EXIT_FAILURE;
+    }
+
+    const std::filesystem::path json_path(argv[1]);
+
+    try
+    {
+        // Check if the JSON file exists
+        if (!std::filesystem::exists(json_path))
+        {
+            std::cerr << "Error: File not found: " << json_path << "\n";
+            return EXIT_FAILURE;
+        }
+
+        // Open and parse the JSON file
+        std::ifstream json_file(json_path);
+        if (!json_file.is_open())
+        {
+            std::cerr << "Error: Could not open file: " << json_path << "\n";
+            return EXIT_FAILURE;
+        }
+
+        nlohmann::json json_data;
+        try
+        {
+            json_data = nlohmann::json::parse(json_file);
+        }
+        catch (const nlohmann::json::parse_error& e)
+        {
+            std::cerr << "Error: Failed to parse JSON file: " << e.what() << "\n";
+            return EXIT_FAILURE;
+        }
+        json_file.close();
+
+        // Get the number of batches
+        if (!json_data.contains("batches") || !json_data["batches"].is_array())
+        {
+            std::cerr << "Error: JSON file does not contain a 'batches' array.\n";
+            return EXIT_FAILURE;
+        }
+
+        const size_t num_batches = json_data["batches"].size();
+
+        // Parse all record batches from JSON
+        std::vector<sparrow::record_batch> record_batches;
+        record_batches.reserve(num_batches);
+
+        for (size_t batch_idx = 0; batch_idx < num_batches; ++batch_idx)
+        {
+            try
+            {
+                record_batches.emplace_back(
+                    sparrow::json_reader::build_record_batch_from_json(json_data, batch_idx)
+                );
+            }
+            catch (const std::exception& e)
+            {
+                std::cerr << "Error: Failed to build record batch " << batch_idx << ": " << e.what()
+                          << "\n";
+                return EXIT_FAILURE;
+            }
+        }
+
+        // Serialize record batches to the Arrow IPC stream format
+        std::vector<uint8_t> stream_data;
+        sparrow_ipc::memory_output_stream stream(stream_data);
+        sparrow_ipc::serializer serializer(stream);
+
+        serializer << record_batches << sparrow_ipc::end_stream;
+
+        // Write the binary stream to stdout
+        std::cout.write(reinterpret_cast<const char*>(stream_data.data()), static_cast<std::streamsize>(stream_data.size()));
+        std::cout.flush();
+
+        return EXIT_SUCCESS;
+    }
+    catch (const std::exception& e)
+    {
+        std::cerr << "Error: " << e.what() << "\n";
+        return EXIT_FAILURE;
+    }
+}
diff --git a/integration_tests/main.cpp b/integration_tests/main.cpp
new file mode 100644
index 0000000..9522fa7
--- /dev/null
+++ b/integration_tests/main.cpp
@@ -0,0 +1,2 @@
+#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
+#include "doctest/doctest.h"
diff --git a/integration_tests/stream_to_file.cpp b/integration_tests/stream_to_file.cpp
new file mode 100644
index 0000000..fd84e56
--- /dev/null
+++ b/integration_tests/stream_to_file.cpp
@@ -0,0 +1,110 @@
+#include <cstdint>
+#include <cstdlib>
+#include <filesystem>
+#include <fstream>
+#include <iostream>
+#include <span>
+#include <vector>
+
+#include <sparrow_ipc/deserialize.hpp>
+#include <sparrow_ipc/memory_output_stream.hpp>
+#include <sparrow_ipc/serializer.hpp>
+
+/**
+ * @brief Reads an Arrow IPC stream from a file and writes it to another file.
+ *
+ * This program reads a binary Arrow IPC stream from an input file, deserializes it
+ * to verify its validity, then re-serializes it and writes the result to the specified
+ * output file. This ensures the output file contains a valid Arrow IPC stream.
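+ *
+ * Example (hypothetical file names):
+ *   ./stream_to_file generated_primitive.stream roundtrip.stream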
+ *
+ * Usage: stream_to_file <input_file> <output_file>
+ *
+ * @param argc Number of command-line arguments
+ * @param argv Array of command-line arguments
+ * @return EXIT_SUCCESS on success, EXIT_FAILURE on error
+ */
+int main(int argc, char* argv[])
+{
+    // Check command-line arguments
+    if (argc != 3)
+    {
+        std::cerr << "Usage: " << argv[0] << " <input_file> <output_file>\n";
+        std::cerr << "Reads an Arrow IPC stream from a file and writes it to another file.\n";
+        return EXIT_FAILURE;
+    }
+
+    const std::filesystem::path input_path(argv[1]);
+    const std::filesystem::path output_path(argv[2]);
+
+    try
+    {
+        // Check if the input file exists
+        if (!std::filesystem::exists(input_path))
+        {
+            std::cerr << "Error: Input file not found: " << input_path << "\n";
+            return EXIT_FAILURE;
+        }
+
+        // Read the entire stream from the input file
+        std::ifstream input_file(input_path, std::ios::in | std::ios::binary);
+        if (!input_file.is_open())
+        {
+            std::cerr << "Error: Could not open input file: " << input_path << "\n";
+            return EXIT_FAILURE;
+        }
+
+        std::vector<uint8_t> input_stream_data(
+            (std::istreambuf_iterator<char>(input_file)),
+            std::istreambuf_iterator<char>()
+        );
+        input_file.close();
+
+        if (input_stream_data.empty())
+        {
+            std::cerr << "Error: Input file is empty: " << input_path << "\n";
+            return EXIT_FAILURE;
+        }
+
+        // Deserialize the stream to validate it and extract record batches
+        std::vector<sparrow::record_batch> record_batches;
+        try
+        {
+            record_batches = sparrow_ipc::deserialize_stream(std::span(input_stream_data));
+        }
+        catch (const std::exception& e)
+        {
+            std::cerr << "Error: Failed to deserialize stream: " << e.what() << "\n";
+            return EXIT_FAILURE;
+        }
+
+        // Re-serialize the record batches to ensure a valid output stream
+        std::vector<uint8_t> output_stream_data;
+        sparrow_ipc::memory_output_stream stream(output_stream_data);
+        sparrow_ipc::serializer serializer(stream);
+
+        serializer << record_batches << sparrow_ipc::end_stream;
+
+        // Write the stream to the output file
+        std::ofstream output_file(output_path, std::ios::out | std::ios::binary);
+        if (!output_file.is_open())
+        {
+            std::cerr << "Error: Could not open output file: " << output_path << "\n";
+            return EXIT_FAILURE;
+        }
+
+        output_file.write(reinterpret_cast<const char*>(output_stream_data.data()), static_cast<std::streamsize>(output_stream_data.size()));
+        output_file.close();
+
+        if (!output_file.good())
+        {
+            std::cerr << "Error: Failed to write to output file: " << output_path << "\n";
+            return EXIT_FAILURE;
+        }
+
+        return EXIT_SUCCESS;
+    }
+    catch (const std::exception& e)
+    {
+        std::cerr << "Error: " << e.what() << "\n";
+        return EXIT_FAILURE;
+    }
+}
diff --git a/integration_tests/test_integration_tools.cpp b/integration_tests/test_integration_tools.cpp
new file mode 100644
index 0000000..4056d95
--- /dev/null
+++ b/integration_tests/test_integration_tools.cpp
@@ -0,0 +1,455 @@
+#include <cstdint>
+#include <cstdio>
+#include <cstdlib>
+#include <ctime>
+#include <filesystem>
+#include <fstream>
+#include <span>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include <nlohmann/json.hpp>
+
+#include <sparrow/record_batch.hpp>
+
+#include "sparrow/json_reader/json_parser.hpp"
+
+#include "doctest/doctest.h"
+#include "sparrow_ipc/deserialize.hpp"
+
+// Result of executing a command: exit code plus captured stdout/stderr
+struct CommandResult
+{
+    int exit_code;
+    std::string stdout_data;
+    std::string stderr_data;
+};
+
+#ifdef _WIN32
+#include <windows.h>
+
+CommandResult execute_command(const std::string& command)
+{
+    CommandResult result;
+
+    // Create temporary files for stdout and stderr
+    const std::string stdout_file = std::tmpnam(nullptr);
+    const std::string stderr_file = std::tmpnam(nullptr);
+
+    const std::string full_command = command + " > " + stdout_file + " 2> " + stderr_file;
+
+    result.exit_code = std::system(full_command.c_str());
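+    // std::system's return value is implementation-defined; the tests below only
+    // rely on zero meaning success and non-zero meaning failure.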
+
+    // Read stdout
+    std::ifstream stdout_stream(stdout_file, std::ios::binary);
+    if (stdout_stream)
+    {
+        std::ostringstream ss;
+        ss << stdout_stream.rdbuf();
+        result.stdout_data = ss.str();
+    }
+
+    // Read stderr
+    std::ifstream stderr_stream(stderr_file, std::ios::binary);
+    if (stderr_stream)
+    {
+        std::ostringstream ss;
+        ss << stderr_stream.rdbuf();
+        result.stderr_data = ss.str();
+    }
+
+    // Clean up
+    std::filesystem::remove(stdout_file);
+    std::filesystem::remove(stderr_file);
+
+    return result;
+}
+
+#else
+#include <sys/wait.h>
+#include <unistd.h>
+
+CommandResult execute_command(const std::string& command)
+{
+    CommandResult result;
+
+    // Check if the command already contains output redirection
+    const bool has_redirection = (command.find('>') != std::string::npos);
+
+    if (has_redirection)
+    {
+        // The command already redirects stdout; execute it as-is but still
+        // capture stderr for error checking.
+        const std::filesystem::path stderr_file = std::filesystem::temp_directory_path() / ("stderr_" + std::to_string(std::time(nullptr)));
+        const std::string full_command = command + " 2> " + stderr_file.string();
+        result.exit_code = std::system(full_command.c_str());
+
+        // Read stderr
+        std::ifstream stderr_stream(stderr_file, std::ios::binary);
+        if (stderr_stream)
+        {
+            std::ostringstream ss;
+            ss << stderr_stream.rdbuf();
+            result.stderr_data = ss.str();
+        }
+
+        // Clean up
+        std::filesystem::remove(stderr_file);
+    }
+    else
+    {
+        // Create temporary files for stdout and stderr
+        const std::filesystem::path stdout_file = std::filesystem::temp_directory_path() / ("stdout_" + std::to_string(std::time(nullptr)));
+        const std::filesystem::path stderr_file = std::filesystem::temp_directory_path() / ("stderr_" + std::to_string(std::time(nullptr)));
+
+        // The command string is already properly formed (executable path + args);
+        // redirect stdout and stderr to the temporary files.
+        const std::string full_command = command + " > " + stdout_file.string() + " 2> " + stderr_file.string();
+
+        result.exit_code = std::system(full_command.c_str());
+
+        // Read stdout
+        std::ifstream stdout_stream(stdout_file, std::ios::binary);
+        if (stdout_stream)
+        {
+            std::ostringstream ss;
+            ss << stdout_stream.rdbuf();
+            result.stdout_data = ss.str();
+        }
+
+        // Read stderr
+        std::ifstream stderr_stream(stderr_file, std::ios::binary);
+        if (stderr_stream)
+        {
+            std::ostringstream ss;
+            ss << stderr_stream.rdbuf();
+            result.stderr_data = ss.str();
+        }
+
+        // Clean up
+        std::filesystem::remove(stdout_file);
+        std::filesystem::remove(stderr_file);
+    }
+
+    return result;
+}
+#endif
+
+// Helper to compare record batches
+void compare_record_batches(
+    const std::vector<sparrow::record_batch>& record_batches_1,
+    const std::vector<sparrow::record_batch>& record_batches_2
+)
+{
+    REQUIRE_EQ(record_batches_1.size(), record_batches_2.size());
+    for (size_t i = 0; i < record_batches_1.size(); ++i)
+    {
+        REQUIRE_EQ(record_batches_1[i].nb_columns(), record_batches_2[i].nb_columns());
+        for (size_t y = 0; y < record_batches_1[i].nb_columns(); y++)
+        {
+            const auto& column_1 = record_batches_1[i].get_column(y);
+            const auto& column_2 = record_batches_2[i].get_column(y);
+            REQUIRE_EQ(column_1.size(), column_2.size());
+            CHECK_EQ(record_batches_1[i].names()[y], record_batches_2[i].names()[y]);
+            for (size_t z = 0; z < column_1.size(); z++)
+            {
+                const auto col_name = column_1.name().value_or("NA");
+                INFO("Comparing batch " << i << ", column " << y << " named: " << col_name << ", row " << z);
+                REQUIRE_EQ(column_1.data_type(), column_2.data_type());
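+                // Cell-by-cell comparison; the INFO statement above pinpoints
+                // the failing batch/column/row in doctest's output.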
+                CHECK_EQ(column_1[z], column_2[z]);
+            }
+        }
+    }
+}
+
+TEST_SUITE("Integration Tools Tests")
+{
+    // Get paths to test data
+    const std::filesystem::path arrow_testing_data_dir = ARROW_TESTING_DATA_DIR;
+    const std::filesystem::path tests_resources_files_path =
+        arrow_testing_data_dir / "data" / "arrow-ipc-stream" / "integration" / "cpp-21.0.0";
+
+    // Paths to the executables - defined at compile time
+    const std::filesystem::path exe_dir = INTEGRATION_TOOLS_DIR;
+    const std::filesystem::path file_to_stream_exe = exe_dir / "file_to_stream";
+    const std::filesystem::path stream_to_file_exe = exe_dir / "stream_to_file";
+
+    // Helper to build a command line with a properly quoted executable path
+    auto make_command = [](const std::filesystem::path& exe, const std::string& args = "") {
+        std::string cmd = "\"" + exe.string() + "\"";
+        if (!args.empty()) {
+            cmd += " " + args;
+        }
+        return cmd;
+    };
+
+    TEST_CASE("file_to_stream - No arguments")
+    {
+        auto result = execute_command(make_command(file_to_stream_exe));
+        CHECK_NE(result.exit_code, 0);
+        CHECK(result.stderr_data.find("Usage:") != std::string::npos);
+    }
+
+    TEST_CASE("file_to_stream - Non-existent file")
+    {
+        const std::string non_existent = "non_existent_file_12345.json";
+        auto result = execute_command(make_command(file_to_stream_exe, non_existent));
+        CHECK_NE(result.exit_code, 0);
+        CHECK(result.stderr_data.find("not found") != std::string::npos);
+    }
+
+    TEST_CASE("stream_to_file - No arguments")
+    {
+        auto result = execute_command(make_command(stream_to_file_exe));
+        CHECK_NE(result.exit_code, 0);
+        CHECK(result.stderr_data.find("Usage:") != std::string::npos);
+    }
+
+    TEST_CASE("stream_to_file - Only one argument")
+    {
+        auto result = execute_command(make_command(stream_to_file_exe, "output.stream"));
+        CHECK_NE(result.exit_code, 0);
+        CHECK(result.stderr_data.find("Usage:") != std::string::npos);
+    }
+
+    TEST_CASE("stream_to_file - Non-existent input file")
+    {
+        const std::string non_existent = "non_existent_file_12345.stream";
+        const std::string output_file = "output.stream";
+        auto result = execute_command(make_command(stream_to_file_exe, non_existent + " " + output_file));
+        CHECK_NE(result.exit_code, 0);
+        CHECK(result.stderr_data.find("not found") != std::string::npos);
+    }
+
+    TEST_CASE("file_to_stream - Convert JSON to stream")
+    {
+        // Test with a known good JSON file
+        const std::filesystem::path json_file = tests_resources_files_path / "generated_primitive.json";
+
+        if (!std::filesystem::exists(json_file))
+        {
+            MESSAGE("Skipping test: test file not found at " << json_file);
+            return;
+        }
+
+        const std::filesystem::path output_stream = std::filesystem::temp_directory_path() / "test_output.stream";
+
+        // Execute file_to_stream
+        const std::string command = "\"" + file_to_stream_exe.string() + "\" \"" + json_file.string() + "\" > \"" + output_stream.string() + "\"";
+        auto result = execute_command(command);
+
+        CHECK_EQ(result.exit_code, 0);
+        CHECK(std::filesystem::exists(output_stream));
+        CHECK_GT(std::filesystem::file_size(output_stream), 0);
+
+        // Verify the output is a valid stream by deserializing it
+        std::ifstream stream_file(output_stream, std::ios::binary);
+        REQUIRE(stream_file.is_open());
+
+        std::vector<uint8_t> stream_data(
+            (std::istreambuf_iterator<char>(stream_file)),
+            std::istreambuf_iterator<char>()
+        );
+        stream_file.close();
+
+        // Should be able to deserialize without errors
+        CHECK_NOTHROW(sparrow_ipc::deserialize_stream(std::span(stream_data)));
+
+        // Clean up
+        std::filesystem::remove(output_stream);
+    }
+
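+    // The reverse direction: feed a pre-generated reference stream from
+    // arrow-testing through stream_to_file and validate the written copy.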
TEST_CASE("stream_to_file - Process stream file") + { + const std::filesystem::path input_stream = tests_resources_files_path / "generated_primitive.stream"; + + if (!std::filesystem::exists(input_stream)) + { + MESSAGE("Skipping test: test file not found at " << input_stream); + return; + } + + const std::filesystem::path output_stream = std::filesystem::temp_directory_path() / "test_stream_output.stream"; + + // Execute stream_to_file + const std::string command = "\"" + stream_to_file_exe.string() + "\" \"" + input_stream.string() + "\" \"" + output_stream.string() + "\""; + auto result = execute_command(command); + + CHECK_EQ(result.exit_code, 0); + CHECK(std::filesystem::exists(output_stream)); + CHECK_GT(std::filesystem::file_size(output_stream), 0); + + // Verify the output is a valid stream + std::ifstream output_file(output_stream, std::ios::binary); + REQUIRE(output_file.is_open()); + + std::vector output_data( + (std::istreambuf_iterator(output_file)), + std::istreambuf_iterator() + ); + output_file.close(); + + CHECK_NOTHROW(sparrow_ipc::deserialize_stream(std::span(output_data))); + + // Clean up + std::filesystem::remove(output_stream); + } + + TEST_CASE("Round-trip: JSON -> stream -> file -> deserialize") + { + const std::filesystem::path json_file = tests_resources_files_path / "generated_primitive.json"; + + if (!std::filesystem::exists(json_file)) + { + MESSAGE("Skipping test: test file not found at " << json_file); + return; + } + + const std::filesystem::path intermediate_stream = std::filesystem::temp_directory_path() / "intermediate.stream"; + const std::filesystem::path final_stream = std::filesystem::temp_directory_path() / "final.stream"; + + // Step 1: JSON -> stream + { + const std::string command = "\"" + file_to_stream_exe.string() + "\" \"" + json_file.string() + "\" > \"" + intermediate_stream.string() + "\""; + auto result = execute_command(command); + REQUIRE_EQ(result.exit_code, 0); + REQUIRE(std::filesystem::exists(intermediate_stream)); + } + + // Step 2: stream -> file + { + const std::string command = "\"" + stream_to_file_exe.string() + "\" \"" + intermediate_stream.string() + "\" \"" + final_stream.string() + "\""; + auto result = execute_command(command); + REQUIRE_EQ(result.exit_code, 0); + REQUIRE(std::filesystem::exists(final_stream)); + } + + // Step 3: Compare the results + // Load original JSON data + std::ifstream json_input(json_file); + REQUIRE(json_input.is_open()); + nlohmann::json json_data = nlohmann::json::parse(json_input); + json_input.close(); + + const size_t num_batches = json_data["batches"].size(); + std::vector original_batches; + for (size_t i = 0; i < num_batches; ++i) + { + original_batches.emplace_back( + sparrow::json_reader::build_record_batch_from_json(json_data, i) + ); + } + + // Load final stream + std::ifstream final_file(final_stream, std::ios::binary); + REQUIRE(final_file.is_open()); + std::vector final_data( + (std::istreambuf_iterator(final_file)), + std::istreambuf_iterator() + ); + final_file.close(); + + auto final_batches = sparrow_ipc::deserialize_stream(std::span(final_data)); + + // Compare + compare_record_batches(original_batches, final_batches); + + // Clean up + std::filesystem::remove(intermediate_stream); + std::filesystem::remove(final_stream); + } + + TEST_CASE("Paths with spaces") + { + const std::filesystem::path json_file = tests_resources_files_path / "generated_primitive.json"; + + if (!std::filesystem::exists(json_file)) + { + MESSAGE("Skipping test: test file not found at " << json_file); + 
+            return;
+        }
+
+        // Create a temporary directory with spaces in the name
+        const std::filesystem::path temp_dir = std::filesystem::temp_directory_path() / "test dir with spaces";
+        std::filesystem::create_directories(temp_dir);
+
+        const std::filesystem::path output_stream = temp_dir / "output file.stream";
+        const std::filesystem::path final_stream = temp_dir / "final output.stream";
+
+        // Step 1: JSON -> stream with spaces in the output path
+        {
+            const std::string command = "\"" + file_to_stream_exe.string() + "\" \"" + json_file.string() + "\" > \"" + output_stream.string() + "\"";
+            auto result = execute_command(command);
+            CHECK_EQ(result.exit_code, 0);
+            CHECK(std::filesystem::exists(output_stream));
+        }
+
+        // Step 2: stream -> file with spaces in both paths
+        {
+            const std::string command = "\"" + stream_to_file_exe.string() + "\" \"" + output_stream.string() + "\" \"" + final_stream.string() + "\"";
+            auto result = execute_command(command);
+            CHECK_EQ(result.exit_code, 0);
+            CHECK(std::filesystem::exists(final_stream));
+        }
+
+        // Verify the final output is valid
+        std::ifstream final_file(final_stream, std::ios::binary);
+        REQUIRE(final_file.is_open());
+        std::vector<uint8_t> final_data(
+            (std::istreambuf_iterator<char>(final_file)),
+            std::istreambuf_iterator<char>()
+        );
+        final_file.close();
+
+        CHECK_NOTHROW(sparrow_ipc::deserialize_stream(std::span(final_data)));
+
+        // Clean up
+        std::filesystem::remove_all(temp_dir);
+    }
+
+    TEST_CASE("Multiple test files")
+    {
+        const std::vector<std::string> test_files = {
+            "generated_primitive",
+            "generated_binary",
+            "generated_primitive_zerolength",
+            "generated_binary_zerolength"
+        };
+
+        for (const auto& test_file : test_files)
+        {
+            const std::filesystem::path json_file = tests_resources_files_path / (test_file + ".json");
+
+            if (!std::filesystem::exists(json_file))
+            {
+                MESSAGE("Skipping test file: " << json_file);
+                continue;
+            }
+
+            SUBCASE(test_file.c_str())
+            {
+                const std::filesystem::path output_stream = std::filesystem::temp_directory_path() / (test_file + "_output.stream");
+
+                // Convert JSON to stream
+                const std::string command = "\"" + file_to_stream_exe.string() + "\" \"" + json_file.string() + "\" > \"" + output_stream.string() + "\"";
+                auto result = execute_command(command);
+
+                CHECK_EQ(result.exit_code, 0);
+                CHECK(std::filesystem::exists(output_stream));
+
+                // Deserialize and verify
+                std::ifstream stream_file(output_stream, std::ios::binary);
+                if (stream_file.is_open())
+                {
+                    std::vector<uint8_t> stream_data(
+                        (std::istreambuf_iterator<char>(stream_file)),
+                        std::istreambuf_iterator<char>()
+                    );
+                    stream_file.close();
+
+                    CHECK_NOTHROW(sparrow_ipc::deserialize_stream(std::span(stream_data)));
+                }
+
+                // Clean up
+                std::filesystem::remove(output_stream);
+            }
+        }
+    }
+}