From 20fb22f24b75526de44e6c2f54c4cce9b006f450 Mon Sep 17 00:00:00 2001 From: Ben Arthur Date: Thu, 28 Aug 2025 08:58:50 -0400 Subject: [PATCH 1/7] claude's response when asked to generate a PR for #273 --- docs/src/threading.md | 79 ++++++++++++++++++++++++++++++++++++ src/DiskArrays.jl | 8 ++++ src/threaded_algorithms.jl | 50 +++++++++++++++++++++++ src/threading.jl | 71 +++++++++++++++++++++++++++++++++ test/runtests.jl | 2 + test/test_threading.jl | 82 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 292 insertions(+) create mode 100644 docs/src/threading.md create mode 100644 src/threaded_algorithms.jl create mode 100644 src/threading.jl create mode 100644 test/test_threading.jl diff --git a/docs/src/threading.md b/docs/src/threading.md new file mode 100644 index 0000000..ef78edb --- /dev/null +++ b/docs/src/threading.md @@ -0,0 +1,79 @@ +# Threading Support + +DiskArrays.jl provides support for threaded algorithms when the underlying +storage backend supports thread-safe read operations. + +## Threading Trait System + +The threading support is based on a trait system that allows backends to +declare whether they support thread-safe operations: + +```julia +using DiskArrays + +# Check if an array supports threading +is_thread_safe(my_array) + +# Get the threading trait +threading_trait(my_array) # Returns ThreadSafe() or NotThreadSafe() +``` + +## Global Threading Control + +You can globally enable or disable threading for all DiskArray operations: + +```julia +# Disable threading globally +disable_threading() + +# Enable threading globally (default) +enable_threading() + +# Check current status +threading_enabled() +``` + +## Implementing Threading Support in Backends + +Backend developers can opt into threading support by overriding the threading_trait method: + +```julia +# For a hypothetical ThreadSafeArray type +DiskArrays.threading_trait(::Type{ThreadSafeArray}) = DiskArrays.ThreadSafe() +``` + +Important: Only declare your backend as thread-safe if: + +* Multiple threads can safely read from the storage simultaneously +* The underlying storage system (files, network, etc.) supports concurrent access +* No global state is modified during read operations + +## Threaded Algorithms + +Currently supported threaded algorithms: + +### unique + +```julia +# Will automatically use threading if backend supports it +result = unique(my_disk_array) + +# With a function +result = unique(x -> x % 10, my_disk_array) + +# Explicitly use threaded version +result = unique_threaded(my_disk_array) +``` + +The threaded unique algorithm: + +* Processes each chunk in parallel using `Threads.@threads :greedy` +* Combines results using a reduction operation +* Falls back to single-threaded implementation for non-thread-safe backends + +## Performance Considerations + +* Threading is most beneficial for arrays with many chunks +* I/O bound operations may see limited speedup due to storage bottlenecks +* Consider the overhead of thread coordination for small arrays +* Test with your specific storage backend and access patterns diff --git a/src/DiskArrays.jl b/src/DiskArrays.jl index 9f0bba6..0e99e35 100644 --- a/src/DiskArrays.jl +++ b/src/DiskArrays.jl @@ -37,6 +37,14 @@ include("show.jl") include("cached.jl") include("pad.jl") +include("threading.jl") +include("threaded_algorithms.jl") + +export ThreadingTrait, ThreadSafe, NotThreadSafe, + threading_trait, is_thread_safe, + enable_threading, disable_threading, threading_enabled, + unique_threaded + # The all-in-one macro macro implement_diskarray(t) diff --git a/src/threaded_algorithms.jl b/src/threaded_algorithms.jl new file mode 100644 index 0000000..904e96a --- /dev/null +++ b/src/threaded_algorithms.jl @@ -0,0 +1,50 @@ +""" + unique_threaded(v::AbstractDiskArray) + unique_threaded(f, v::AbstractDiskArray) + +Threaded version of `unique` for DiskArrays. +Only uses threading if the backend is thread-safe and threading is globally enabled. +Falls back to single-threaded implementation otherwise. +""" +function unique_threaded(v::AbstractDiskArray) + return unique_threaded(identity, v) +end + +function unique_threaded(f, v::AbstractDiskArray) + if !should_use_threading(v) + # Fall back to single-threaded implementation + return _unique_single_threaded(f, v) + end + + chunks = collect(eachchunk(v)) + u = Vector{Vector{eltype(v)}}(undef, length(chunks)) + + Threads.@threads :greedy for i in eachindex(chunks) + chunk = chunks[i] + u[i] = unique(f, v[chunk...]) + end + + # Reduce results + return reduce(u; init=eltype(v)[]) do acc, chunk_result + unique!(f, append!(acc, chunk_result)) + end +end + +function _unique_single_threaded(f, v::AbstractDiskArray) + result = eltype(v)[] + for chunk in eachchunk(v) + chunk_unique = unique(f, v[chunk...]) + append!(result, chunk_unique) + unique!(f, result) + end + return result +end + +# Extend Base.unique to use threaded version when appropriate +function Base.unique(v::AbstractDiskArray) + return unique_threaded(v) +end + +function Base.unique(f, v::AbstractDiskArray) + return unique_threaded(f, v) +end diff --git a/src/threading.jl b/src/threading.jl new file mode 100644 index 0000000..f359ee3 --- /dev/null +++ b/src/threading.jl @@ -0,0 +1,71 @@ +""" + ThreadingTrait + +Trait to indicate whether a DiskArray backend supports thread-safe operations. +""" +abstract type ThreadingTrait end + +""" + ThreadSafe() + +Indicates that the DiskArray backend supports thread-safe read operations. +""" +struct ThreadSafe <: ThreadingTrait end + +""" + NotThreadSafe() + +Indicates that the DiskArray backend does not support thread-safe operations. +Default for all backends unless explicitly overridden. +""" +struct NotThreadSafe <: ThreadingTrait end + +""" + threading_trait(::Type{T}) -> ThreadingTrait + threading_trait(x) -> ThreadingTrait + +Return the threading trait for a DiskArray type or instance. +Defaults to `NotThreadSafe()` for safety. +""" +threading_trait(::Type{<:AbstractDiskArray}) = NotThreadSafe() +threading_trait(x::AbstractDiskArray) = threading_trait(typeof(x)) + +""" + is_thread_safe(x) -> Bool + +Check if a DiskArray supports thread-safe operations. +""" +is_thread_safe(x) = threading_trait(x) isa ThreadSafe + +# Global threading control +const THREADING_ENABLED = Ref(true) + +""" + enable_threading(enable::Bool=true) + +Globally enable or disable threading for DiskArray operations. +When disabled, all algorithms will run single-threaded regardless of backend support. +""" +enable_threading(enable::Bool=true) = (THREADING_ENABLED[] = enable) + +""" + disable_threading() + +Globally disable threading for DiskArray operations. +""" +disable_threading() = enable_threading(false) + +""" + threading_enabled() -> Bool + +Check if threading is globally enabled. +""" +threading_enabled() = THREADING_ENABLED[] + +""" + should_use_threading(x) -> Bool + +Determine if threading should be used for a given DiskArray. +Returns true only if both global threading is enabled AND the backend is thread-safe. +""" +should_use_threading(x) = threading_enabled() && is_thread_safe(x) diff --git a/test/runtests.jl b/test/runtests.jl index eaa3b49..7e76ad9 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -11,6 +11,8 @@ using TraceFuns, Suppressor # using JET # JET.report_package(DiskArrays) +include("test_threading.jl") + @testset "Aqua.jl" begin Aqua.test_ambiguities([DiskArrays, Base, Core]) Aqua.test_unbound_args(DiskArrays) diff --git a/test/test_threading.jl b/test/test_threading.jl new file mode 100644 index 0000000..abe809d --- /dev/null +++ b/test/test_threading.jl @@ -0,0 +1,82 @@ +using DiskArrays +using Test + +# Mock thread-safe DiskArray for testing +struct MockThreadSafeDiskArray{T,N} <: AbstractDiskArray{T,N} + data::Array{T,N} + chunks::NTuple{N,Int} +end + +Base.size(a::MockThreadSafeDiskArray) = size(a.data) +Base.getindex(a::MockThreadSafeDiskArray, i::Int...) = a.data[i...] +DiskArrays.eachchunk(a::MockThreadSafeDiskArray) = DiskArrays.GridChunks(a, a.chunks) +DiskArrays.haschunks(::MockThreadSafeDiskArray) = DiskArrays.Chunked() +DiskArrays.readblock!(a::MockThreadSafeDiskArray, aout, r::AbstractUnitRange...) = (aout .= a.data[r...]) + +# Override threading trait for our mock array +DiskArrays.threading_trait(::Type{<:MockThreadSafeDiskArray}) = DiskArrays.ThreadSafe() + +@testset "Threading Traits" begin + # Test default behavior (not thread safe) + regular_array = DiskArrays.ArrayDiskArray(rand(10, 10), (5, 5)) + @test DiskArrays.threading_trait(regular_array) isa DiskArrays.NotThreadSafe + @test !DiskArrays.is_thread_safe(regular_array) + + # Test thread-safe array + thread_safe_array = MockThreadSafeDiskArray(rand(10, 10), (5, 5)) + @test DiskArrays.threading_trait(thread_safe_array) isa DiskArrays.ThreadSafe + @test DiskArrays.is_thread_safe(thread_safe_array) +end + +@testset "Threading Control" begin + # Test global threading control + @test DiskArrays.threading_enabled() # Should be true by default + + DiskArrays.disable_threading() + @test !DiskArrays.threading_enabled() + + DiskArrays.enable_threading() + @test DiskArrays.threading_enabled() + + # Test should_use_threading logic + thread_safe_array = MockThreadSafeDiskArray(rand(10, 10), (5, 5)) + regular_array = DiskArrays.ArrayDiskArray(rand(10, 10), (5, 5)) + + DiskArrays.enable_threading() + @test DiskArrays.should_use_threading(thread_safe_array) + @test !DiskArrays.should_use_threading(regular_array) + + DiskArrays.disable_threading() + @test !DiskArrays.should_use_threading(thread_safe_array) + @test !DiskArrays.should_use_threading(regular_array) + + # Reset to default + DiskArrays.enable_threading() +end + +@testset "Threaded Unique" begin + # Test with thread-safe array + data = [1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 1, 2, 3, 4, 5, 5, 6, 6, 6, 7] + reshape_data = reshape(data, 4, 5) + thread_safe_array = MockThreadSafeDiskArray(reshape_data, (2, 3)) + + result = unique(thread_safe_array) + expected = unique(data) + @test sort(result) == sort(expected) + + # Test with function + result_with_func = unique(x -> x % 3, thread_safe_array) + expected_with_func = unique(x -> x % 3, data) + @test sort(result_with_func) == sort(expected_with_func) + + # Test fallback for non-thread-safe array + regular_array = DiskArrays.ArrayDiskArray(reshape_data, (2, 3)) + result_fallback = unique(regular_array) + @test sort(result_fallback) == sort(expected) + + # Test with threading disabled + DiskArrays.disable_threading() + result_no_threading = unique(thread_safe_array) + @test sort(result_no_threading) == sort(expected) + DiskArrays.enable_threading() # Reset +end From f151e54f181c37acc13edd238769eeacf789cf19 Mon Sep 17 00:00:00 2001 From: Ben Arthur Date: Thu, 28 Aug 2025 10:29:23 -0400 Subject: [PATCH 2/7] my edit to make claude's code work --- docs/src/threading.md | 2 +- src/DiskArrays.jl | 5 +--- src/mapreduce.jl | 16 ++++++++++-- src/threaded_algorithms.jl | 50 -------------------------------------- src/threading.jl | 4 +-- test/test_threading.jl | 19 ++++++--------- 6 files changed, 26 insertions(+), 70 deletions(-) delete mode 100644 src/threaded_algorithms.jl diff --git a/docs/src/threading.md b/docs/src/threading.md index ef78edb..b15dda4 100644 --- a/docs/src/threading.md +++ b/docs/src/threading.md @@ -62,7 +62,7 @@ result = unique(my_disk_array) result = unique(x -> x % 10, my_disk_array) # Explicitly use threaded version -result = unique_threaded(my_disk_array) +result = unique(Val{true}(), f, my_disk_array) ``` The threaded unique algorithm: diff --git a/src/DiskArrays.jl b/src/DiskArrays.jl index 0e99e35..beffe1d 100644 --- a/src/DiskArrays.jl +++ b/src/DiskArrays.jl @@ -36,14 +36,11 @@ include("zip.jl") include("show.jl") include("cached.jl") include("pad.jl") - include("threading.jl") -include("threaded_algorithms.jl") export ThreadingTrait, ThreadSafe, NotThreadSafe, threading_trait, is_thread_safe, - enable_threading, disable_threading, threading_enabled, - unique_threaded + enable_threading, disable_threading, threading_enabled # The all-in-one macro diff --git a/src/mapreduce.jl b/src/mapreduce.jl index 5df3c83..aaf4939 100644 --- a/src/mapreduce.jl +++ b/src/mapreduce.jl @@ -67,9 +67,21 @@ function Base.count(f, v::AbstractDiskArray) end end -Base.unique(v::AbstractDiskArray) = unique(identity, v) -function Base.unique(f, v::AbstractDiskArray) +Base.unique(v::AbstractDiskArray) = unique(should_use_threading(v), identity, v) +Base.unique(f, v::AbstractDiskArray) = unique(should_use_threading(v), f, v) + +function Base.unique(::Val{false}, f, v::AbstractDiskArray) reduce((unique(f, v[c...]) for c in eachchunk(v))) do acc, u unique!(f, append!(acc, u)) end end + +function Base.unique(::Val{true}, f, v::AbstractDiskArray) + u = Vector{Vector{eltype(v)}}(undef, length(eachchunk(v))) + Threads.@threads :greedy for (i,c) in enumerate(eachchunk(v)) + u[i] = unique(f, v[c...]) + end + reduce(u) do acc, t + unique!(f, append!(acc, t)) + end +end diff --git a/src/threaded_algorithms.jl b/src/threaded_algorithms.jl deleted file mode 100644 index 904e96a..0000000 --- a/src/threaded_algorithms.jl +++ /dev/null @@ -1,50 +0,0 @@ -""" - unique_threaded(v::AbstractDiskArray) - unique_threaded(f, v::AbstractDiskArray) - -Threaded version of `unique` for DiskArrays. -Only uses threading if the backend is thread-safe and threading is globally enabled. -Falls back to single-threaded implementation otherwise. -""" -function unique_threaded(v::AbstractDiskArray) - return unique_threaded(identity, v) -end - -function unique_threaded(f, v::AbstractDiskArray) - if !should_use_threading(v) - # Fall back to single-threaded implementation - return _unique_single_threaded(f, v) - end - - chunks = collect(eachchunk(v)) - u = Vector{Vector{eltype(v)}}(undef, length(chunks)) - - Threads.@threads :greedy for i in eachindex(chunks) - chunk = chunks[i] - u[i] = unique(f, v[chunk...]) - end - - # Reduce results - return reduce(u; init=eltype(v)[]) do acc, chunk_result - unique!(f, append!(acc, chunk_result)) - end -end - -function _unique_single_threaded(f, v::AbstractDiskArray) - result = eltype(v)[] - for chunk in eachchunk(v) - chunk_unique = unique(f, v[chunk...]) - append!(result, chunk_unique) - unique!(f, result) - end - return result -end - -# Extend Base.unique to use threaded version when appropriate -function Base.unique(v::AbstractDiskArray) - return unique_threaded(v) -end - -function Base.unique(f, v::AbstractDiskArray) - return unique_threaded(f, v) -end diff --git a/src/threading.jl b/src/threading.jl index f359ee3..92a49f0 100644 --- a/src/threading.jl +++ b/src/threading.jl @@ -63,9 +63,9 @@ Check if threading is globally enabled. threading_enabled() = THREADING_ENABLED[] """ - should_use_threading(x) -> Bool + should_use_threading(x) -> Val(Bool) Determine if threading should be used for a given DiskArray. Returns true only if both global threading is enabled AND the backend is thread-safe. """ -should_use_threading(x) = threading_enabled() && is_thread_safe(x) +should_use_threading(x) = Val(threading_enabled() && is_thread_safe(x)) diff --git a/test/test_threading.jl b/test/test_threading.jl index abe809d..a437e5b 100644 --- a/test/test_threading.jl +++ b/test/test_threading.jl @@ -1,6 +1,3 @@ -using DiskArrays -using Test - # Mock thread-safe DiskArray for testing struct MockThreadSafeDiskArray{T,N} <: AbstractDiskArray{T,N} data::Array{T,N} @@ -18,7 +15,7 @@ DiskArrays.threading_trait(::Type{<:MockThreadSafeDiskArray}) = DiskArrays.Threa @testset "Threading Traits" begin # Test default behavior (not thread safe) - regular_array = DiskArrays.ArrayDiskArray(rand(10, 10), (5, 5)) + regular_array = ChunkedDiskArray(rand(10, 10), (5, 5)) @test DiskArrays.threading_trait(regular_array) isa DiskArrays.NotThreadSafe @test !DiskArrays.is_thread_safe(regular_array) @@ -40,21 +37,21 @@ end # Test should_use_threading logic thread_safe_array = MockThreadSafeDiskArray(rand(10, 10), (5, 5)) - regular_array = DiskArrays.ArrayDiskArray(rand(10, 10), (5, 5)) + regular_array = ChunkedDiskArray(rand(10, 10), (5, 5)) DiskArrays.enable_threading() - @test DiskArrays.should_use_threading(thread_safe_array) - @test !DiskArrays.should_use_threading(regular_array) + @test DiskArrays.should_use_threading(thread_safe_array) == Val{true}() + @test DiskArrays.should_use_threading(regular_array) == Val{false}() DiskArrays.disable_threading() - @test !DiskArrays.should_use_threading(thread_safe_array) - @test !DiskArrays.should_use_threading(regular_array) + @test DiskArrays.should_use_threading(thread_safe_array) == Val{false}() + @test DiskArrays.should_use_threading(regular_array) == Val{false}() # Reset to default DiskArrays.enable_threading() end -@testset "Threaded Unique" begin +@testset "Threaded unique" begin # Test with thread-safe array data = [1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 1, 2, 3, 4, 5, 5, 6, 6, 6, 7] reshape_data = reshape(data, 4, 5) @@ -70,7 +67,7 @@ end @test sort(result_with_func) == sort(expected_with_func) # Test fallback for non-thread-safe array - regular_array = DiskArrays.ArrayDiskArray(reshape_data, (2, 3)) + regular_array = ChunkedDiskArray(reshape_data, (2, 3)) result_fallback = unique(regular_array) @test sort(result_fallback) == sort(expected) From 5098deab4c1a092788e55ae4d76a7a227f46b617 Mon Sep 17 00:00:00 2001 From: Ben Arthur Date: Thu, 28 Aug 2025 10:54:59 -0400 Subject: [PATCH 3/7] rename threading test file --- test/runtests.jl | 2 +- test/{test_threading.jl => threading.jl} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename test/{test_threading.jl => threading.jl} (100%) diff --git a/test/runtests.jl b/test/runtests.jl index 7e76ad9..ae5bf42 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -11,7 +11,7 @@ using TraceFuns, Suppressor # using JET # JET.report_package(DiskArrays) -include("test_threading.jl") +include("threading.jl") @testset "Aqua.jl" begin Aqua.test_ambiguities([DiskArrays, Base, Core]) diff --git a/test/test_threading.jl b/test/threading.jl similarity index 100% rename from test/test_threading.jl rename to test/threading.jl From d436aeec6e968ad5db1beecdf3abb3b951ec60b9 Mon Sep 17 00:00:00 2001 From: Ben Arthur Date: Thu, 28 Aug 2025 10:55:15 -0400 Subject: [PATCH 4/7] fix for 1.10 LTS --- src/mapreduce.jl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/mapreduce.jl b/src/mapreduce.jl index aaf4939..426ae6f 100644 --- a/src/mapreduce.jl +++ b/src/mapreduce.jl @@ -77,9 +77,10 @@ function Base.unique(::Val{false}, f, v::AbstractDiskArray) end function Base.unique(::Val{true}, f, v::AbstractDiskArray) - u = Vector{Vector{eltype(v)}}(undef, length(eachchunk(v))) - Threads.@threads :greedy for (i,c) in enumerate(eachchunk(v)) - u[i] = unique(f, v[c...]) + chunks = eachchunk(v) + u = Vector{Vector{eltype(v)}}(undef, length(chunks)) + Threads.@threads for i in 1:length(chunks) + u[i] = unique(f, v[chunks[i]...]) end reduce(u) do acc, t unique!(f, append!(acc, t)) From 4aed72c8db25e3ee563c26002a94bc21579a0089 Mon Sep 17 00:00:00 2001 From: Ben Arthur Date: Fri, 29 Aug 2025 12:39:32 -0400 Subject: [PATCH 5/7] remove disable_threading per rafaqz's review --- src/DiskArrays.jl | 2 +- src/threading.jl | 7 ------- test/threading.jl | 6 +++--- 3 files changed, 4 insertions(+), 11 deletions(-) diff --git a/src/DiskArrays.jl b/src/DiskArrays.jl index beffe1d..96f1eb2 100644 --- a/src/DiskArrays.jl +++ b/src/DiskArrays.jl @@ -40,7 +40,7 @@ include("threading.jl") export ThreadingTrait, ThreadSafe, NotThreadSafe, threading_trait, is_thread_safe, - enable_threading, disable_threading, threading_enabled + enable_threading, threading_enabled # The all-in-one macro diff --git a/src/threading.jl b/src/threading.jl index 92a49f0..f1e57fe 100644 --- a/src/threading.jl +++ b/src/threading.jl @@ -48,13 +48,6 @@ When disabled, all algorithms will run single-threaded regardless of backend sup """ enable_threading(enable::Bool=true) = (THREADING_ENABLED[] = enable) -""" - disable_threading() - -Globally disable threading for DiskArray operations. -""" -disable_threading() = enable_threading(false) - """ threading_enabled() -> Bool diff --git a/test/threading.jl b/test/threading.jl index a437e5b..62d349f 100644 --- a/test/threading.jl +++ b/test/threading.jl @@ -29,7 +29,7 @@ end # Test global threading control @test DiskArrays.threading_enabled() # Should be true by default - DiskArrays.disable_threading() + DiskArrays.enable_threading(false) @test !DiskArrays.threading_enabled() DiskArrays.enable_threading() @@ -43,7 +43,7 @@ end @test DiskArrays.should_use_threading(thread_safe_array) == Val{true}() @test DiskArrays.should_use_threading(regular_array) == Val{false}() - DiskArrays.disable_threading() + DiskArrays.enable_threading(false) @test DiskArrays.should_use_threading(thread_safe_array) == Val{false}() @test DiskArrays.should_use_threading(regular_array) == Val{false}() @@ -72,7 +72,7 @@ end @test sort(result_fallback) == sort(expected) # Test with threading disabled - DiskArrays.disable_threading() + DiskArrays.enable_threading(false) result_no_threading = unique(thread_safe_array) @test sort(result_no_threading) == sort(expected) DiskArrays.enable_threading() # Reset From 3d09e2bc55f6640243bdfb8b5cdc62f25649de1c Mon Sep 17 00:00:00 2001 From: Ben Arthur Date: Tue, 2 Sep 2025 08:54:58 -0400 Subject: [PATCH 6/7] add a second set of traits to describe whether a method is multithreaded --- docs/src/threading.md | 24 ++++++++++++++++++++++-- src/DiskArrays.jl | 7 +++---- src/mapreduce.jl | 4 ++-- src/threading.jl | 23 ++++++++++++++++++++++- test/threading.jl | 8 ++++---- 5 files changed, 53 insertions(+), 13 deletions(-) diff --git a/docs/src/threading.md b/docs/src/threading.md index b15dda4..ae6f2a8 100644 --- a/docs/src/threading.md +++ b/docs/src/threading.md @@ -48,6 +48,26 @@ Important: Only declare your backend as thread-safe if: * The underlying storage system (files, network, etc.) supports concurrent access * No global state is modified during read operations +## Implementing Threading Support for Disk Array Methods + +Add a (or rename the existing) single-threaded method using this signature: + +``` +function Base.myfun(::Type{SingleThreaded}, ...) +``` + +Write a threaded version using this signature: + +``` +function Base.myfun(::Type{MultiThreaded}, ...) +``` + +Add this additional method to automatically dispatch between the two: + +``` +Base.myfun(v::AbstractDiskArray, ...) = myfun(should_use_threading(v), ...) +``` + ## Threaded Algorithms Currently supported threaded algorithms: @@ -62,12 +82,12 @@ result = unique(my_disk_array) result = unique(x -> x % 10, my_disk_array) # Explicitly use threaded version -result = unique(Val{true}(), f, my_disk_array) +result = unique(MultiThreaded, f, my_disk_array) ``` The threaded unique algorithm: -* Processes each chunk in parallel using `Threads.@threads :greedy` +* Processes each chunk in parallel using `Threads.@threads` * Combines results using a reduction operation * Falls back to single-threaded implementation for non-thread-safe backends diff --git a/src/DiskArrays.jl b/src/DiskArrays.jl index 96f1eb2..6fe3aed 100644 --- a/src/DiskArrays.jl +++ b/src/DiskArrays.jl @@ -19,6 +19,7 @@ export AbstractDiskArray, eachchunk, ChunkIndex, ChunkIndices include("scalar.jl") include("chunks.jl") include("diskarray.jl") +include("threading.jl") include("batchgetindex.jl") include("diskindex.jl") include("indexing.jl") @@ -36,11 +37,9 @@ include("zip.jl") include("show.jl") include("cached.jl") include("pad.jl") -include("threading.jl") -export ThreadingTrait, ThreadSafe, NotThreadSafe, - threading_trait, is_thread_safe, - enable_threading, threading_enabled +export ThreadingTrait, ThreadSafe, NotThreadSafe, threading_trait, is_thread_safe, + AlgorithmTrait, SingleThreaded, MultiThreaded, enable_threading, threading_enabled # The all-in-one macro diff --git a/src/mapreduce.jl b/src/mapreduce.jl index 426ae6f..e484ba6 100644 --- a/src/mapreduce.jl +++ b/src/mapreduce.jl @@ -70,13 +70,13 @@ end Base.unique(v::AbstractDiskArray) = unique(should_use_threading(v), identity, v) Base.unique(f, v::AbstractDiskArray) = unique(should_use_threading(v), f, v) -function Base.unique(::Val{false}, f, v::AbstractDiskArray) +function Base.unique(::Type{SingleThreaded}, f, v::AbstractDiskArray) reduce((unique(f, v[c...]) for c in eachchunk(v))) do acc, u unique!(f, append!(acc, u)) end end -function Base.unique(::Val{true}, f, v::AbstractDiskArray) +function Base.unique(::Type{MultiThreaded}, f, v::AbstractDiskArray) chunks = eachchunk(v) u = Vector{Vector{eltype(v)}}(undef, length(chunks)) Threads.@threads for i in 1:length(chunks) diff --git a/src/threading.jl b/src/threading.jl index f1e57fe..b2f0467 100644 --- a/src/threading.jl +++ b/src/threading.jl @@ -37,6 +37,27 @@ Check if a DiskArray supports thread-safe operations. """ is_thread_safe(x) = threading_trait(x) isa ThreadSafe +""" + AlgorithmTrait + +Trait to indicate whether a method is multithreaded or not +""" +abstract type AlgorithmTrait end + +""" + SingleThreaded() + +Indicates that a method uses just one thread +""" +struct SingleThreaded <: AlgorithmTrait end + +""" + MultiThreaded() + +Indicates that a method uses all threads available +""" +struct MultiThreaded <: AlgorithmTrait end + # Global threading control const THREADING_ENABLED = Ref(true) @@ -61,4 +82,4 @@ threading_enabled() = THREADING_ENABLED[] Determine if threading should be used for a given DiskArray. Returns true only if both global threading is enabled AND the backend is thread-safe. """ -should_use_threading(x) = Val(threading_enabled() && is_thread_safe(x)) +should_use_threading(x) = threading_enabled() && is_thread_safe(x) ? MultiThreaded : SingleThreaded diff --git a/test/threading.jl b/test/threading.jl index 62d349f..ef98de6 100644 --- a/test/threading.jl +++ b/test/threading.jl @@ -40,12 +40,12 @@ end regular_array = ChunkedDiskArray(rand(10, 10), (5, 5)) DiskArrays.enable_threading() - @test DiskArrays.should_use_threading(thread_safe_array) == Val{true}() - @test DiskArrays.should_use_threading(regular_array) == Val{false}() + @test DiskArrays.should_use_threading(thread_safe_array) == MultiThreaded + @test DiskArrays.should_use_threading(regular_array) == SingleThreaded DiskArrays.enable_threading(false) - @test DiskArrays.should_use_threading(thread_safe_array) == Val{false}() - @test DiskArrays.should_use_threading(regular_array) == Val{false}() + @test DiskArrays.should_use_threading(thread_safe_array) == SingleThreaded + @test DiskArrays.should_use_threading(regular_array) == SingleThreaded # Reset to default DiskArrays.enable_threading() From ada4b5104f42e9293af0d24845257245060f5c04 Mon Sep 17 00:00:00 2001 From: Ben Arthur Date: Tue, 2 Sep 2025 10:01:47 -0400 Subject: [PATCH 7/7] add threaded count method --- docs/src/threading.md | 10 ++++++++++ src/mapreduce.jl | 15 +++++++++++++-- test/threading.jl | 31 +++++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 2 deletions(-) diff --git a/docs/src/threading.md b/docs/src/threading.md index ae6f2a8..467298b 100644 --- a/docs/src/threading.md +++ b/docs/src/threading.md @@ -91,6 +91,16 @@ The threaded unique algorithm: * Combines results using a reduction operation * Falls back to single-threaded implementation for non-thread-safe backends +### count + +Similarly to `unique`, threads will be automatically used unless disabled, or +can be explicitly used: + +``` +count([f], my_disk_array) +count(MultiThreaded, f, my_disk_array) +``` + ## Performance Considerations * Threading is most beneficial for arrays with many chunks diff --git a/src/mapreduce.jl b/src/mapreduce.jl index e484ba6..53a306b 100644 --- a/src/mapreduce.jl +++ b/src/mapreduce.jl @@ -60,13 +60,24 @@ for fname in [:sum, :prod, :all, :any, :minimum, :maximum] end end -Base.count(v::AbstractDiskArray) = count(identity, v::AbstractDiskArray) -function Base.count(f, v::AbstractDiskArray) +Base.count(v::AbstractDiskArray) = count(should_use_threading(v), identity, v::AbstractDiskArray) +Base.count(f, v::AbstractDiskArray) = count(should_use_threading(v), f, v::AbstractDiskArray) + +function Base.count(::Type{SingleThreaded}, f, v::AbstractDiskArray) sum(eachchunk(v)) do chunk count(f, v[chunk...]) end end +function Base.count(::Type{MultiThreaded}, f, v::AbstractDiskArray) + chunks = eachchunk(v) + u = Vector{Int}(undef, length(chunks)) + Threads.@threads for i in 1:length(chunks) + u[i] = count(f, v[chunks[i]...]) + end + sum(u) +end + Base.unique(v::AbstractDiskArray) = unique(should_use_threading(v), identity, v) Base.unique(f, v::AbstractDiskArray) = unique(should_use_threading(v), f, v) diff --git a/test/threading.jl b/test/threading.jl index ef98de6..e09a56e 100644 --- a/test/threading.jl +++ b/test/threading.jl @@ -77,3 +77,34 @@ end @test sort(result_no_threading) == sort(expected) DiskArrays.enable_threading() # Reset end + +@testset "Threaded count" begin + # Test with thread-safe array + data_int = [1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 1, 2, 3, 4, 5, 5, 6, 6, 6, 7] + f(x) = x % 3 == 0 + data = Array(f.(data_int)) # instead of BitMatrix + reshape_data_int = reshape(data_int, 4, 5) + thread_safe_array_int = MockThreadSafeDiskArray(reshape_data_int, (2, 3)) + reshape_data = Array(reshape(data, 4, 5)) + thread_safe_array = MockThreadSafeDiskArray(reshape_data, (2, 3)) + + result = count(thread_safe_array) + expected = count(data) + @test result == expected + + # Test with function + result_with_func = count(f, thread_safe_array_int) + expected_with_func = count(f, data_int) + @test result_with_func == expected_with_func + + # Test fallback for non-thread-safe array + regular_array = ChunkedDiskArray(reshape_data, (2, 3)) + result_fallback = count(regular_array) + @test result_fallback == expected + + # Test with threading disabled + DiskArrays.enable_threading(false) + result_no_threading = count(thread_safe_array) + @test result_no_threading == expected + DiskArrays.enable_threading() # Reset +end