diff --git a/Project.toml b/Project.toml index 826d4bb..a1a1099 100644 --- a/Project.toml +++ b/Project.toml @@ -4,6 +4,7 @@ authors = ["Jacob Quinn "] version = "2.5.0" [deps] +BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf" Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" diff --git a/src/ConcurrentUtilities.jl b/src/ConcurrentUtilities.jl index f92505a..b7645b1 100644 --- a/src/ConcurrentUtilities.jl +++ b/src/ConcurrentUtilities.jl @@ -3,7 +3,7 @@ module ConcurrentUtilities import Base: AbstractLock, islocked, trylock, lock, unlock export Lockable, OrderedSynchronizer, reset!, ReadWriteLock, readlock, readunlock, @wkspawn, Workers, remote_eval, remote_fetch, Worker, terminate!, WorkerTerminatedException, - Pool, acquire, release, drain!, try_with_timeout, TimeoutException, FIFOLock + Pool, acquire, release, drain!, try_with_timeout, TimeoutException, FIFOLock, SimpleFIFOLock macro samethreadpool_spawn(expr) if VERSION >= v"1.9.2" @@ -23,6 +23,7 @@ include("rwlock.jl") include("pools.jl") using .Pools include("fifolock.jl") +include("simplefifolock.jl") function clear_current_task() current_task().storage = nothing diff --git a/src/simplefifolock.jl b/src/simplefifolock.jl new file mode 100644 index 0000000..de765a1 --- /dev/null +++ b/src/simplefifolock.jl @@ -0,0 +1,152 @@ +""" + SimpleFIFOLock() + +A reentrant lock similar to Base.ReentrantLock, but with strict FIFO ordering. + +This lock supports "barging", in which a thread can jump to the front of the queue of tasks: +``` +lock(fifolock; first=true) +``` + +Calling `lock` inhibits running on finalizers on that thread. + +Implementation note: The implementation uses a Condition. Conditions provide FIFO +notification order, as well as the ability to jump to the front, which makes the +implementation straightforward. +""" +mutable struct SimpleFIFOLock <: AbstractLock + cond::Threads.Condition + reentrancy_count::UInt # 0 iff the lock is not held + locked_by::Union{Task,Nothing} # nothing iff the lock is not held + SimpleFIFOLock() = new(Threads.Condition(), 0, nothing) +end + +""" + trylock(l::SimpleFIFOLock) + +Try to acquire lock `l`. If successful, return `true`. If the lock is held by another +task, do not wait and return `false`. + +Each successful `trylock` must be matched by an `unlock`. +""" +@inline function Base.trylock(l::SimpleFIFOLock) + GC.disable_finalizers() + ct = current_task() + lock(l.cond) + locked_by = l.locked_by + if locked_by === nothing || locked_by === ct + l.reentrancy_count += 1 + l.locked_by = ct + unlock(l.cond) + return true + end + unlock(l.cond) + GC.enable_finalizers() + return false +end + +""" + lock(l::SimpleFIFOLock; first=false) + +Acquire lock `l`. The lock is reentrant, so if the calling task has already acquired the +lock then return immediately. + +Each `lock` must be matched by an `unlock`. +""" +@inline function Base.lock(l::SimpleFIFOLock; first=false) + GC.disable_finalizers() + ct = current_task() + lock(l.cond) + while true + if l.locked_by === nothing || l.locked_by === ct + l.reentrancy_count += 1 + l.locked_by = ct + unlock(l.cond) + return nothing + end + # Don't pay for the try-catch unless we `wait`. + try + wait(l.cond; first) + catch + unlock(l.cond) + rethrow() + end + end +end + +""" + unlock(lock::SimpleFIFOLock) + +Releases ownerhsip of `lock`. + +Note if the has been more than once by the same thread, it will need to be unlocked the same +number of times. +""" +@inline function Base.unlock(l::SimpleFIFOLock) + ct = current_task() + lock(l.cond) + if l.locked_by === nothing + unlock(l.cond) + error("unlocking an unlocked lock") + end + if l.locked_by !== ct + unlock(l.cond) + error("unlock from wrong thread") + end + l.reentrancy_count += -1 + if l.reentrancy_count == 0 + l.locked_by = nothing + if !isempty(l.cond.waitq) + # Don't pay for the try-catch unless we `notify`. + try + notify(l.cond; all=false) + catch + unlock(l.cond) + rethrow() + end + end + end + unlock(l.cond) + return nothing +end + +# Performance note: for `@btime begin lock($l); unlock($l); end`. +# 13ns for SpinLock +# 17ns for ReentrantLock +# 33ns for FIFOLock +# 35ns for SimpleFIFOLock +# 57ns for the alternative versions below that use `@lock` (and hence have too much try-finally code) + +# @inline function Base.lock(l::SimpleFIFOLock) +# GC.disable_finalizers() +# ct = current_task() +# @lock l.cond begin +# while true +# if l.locked_by === nothing || l.locked_by === ct +# l.reentrancy_count += 1 +# l.locked_by = ct +# return nothing +# end +# wait(l.cond) +# end +# end +# end + +# @inline function Base.unlock(l::SimpleFIFOLock) +# ct = current_task() +# @lock l.cond begin +# if l.locked_by === nothing +# error("unlocking an unlocked lock") +# end +# if l.locked_by !== ct +# error("unlock from wrong thread") +# end +# @assert l.reentrancy_count > 0 +# l.reentrancy_count += -1 +# if l.reentrancy_count == 0 +# l.locked_by = nothing +# notify(l.cond; all=false) +# end +# end +# return nothing +# end diff --git a/test/runtests.jl b/test/runtests.jl index dba5f19..265e83f 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -269,6 +269,53 @@ else end # @static if VERSION < v"1.10" end + @testset "SimpleFIFOLock" begin +@static if false # VERSION < v"1.10-" + @warn "skipping FIFOLock tests since VERSION ($VERSION) < v\"1.10\"" +else + @warn "Doing SimpleFIFOLock" + ctr_in = Threads.Atomic{Int}(1) + ctr_out = Threads.Atomic{Int}(1) + test_tasks = Task[] + sizehint!(test_tasks, 16) + tasks_in = zeros(Int, 16) + tasks_out = zeros(Int, 16) + tot = zeros(Int, 1) + fl = SimpleFIFOLock() + lock(fl) + try + for i in 1:16 + t = Threads.@spawn begin + tasks_in[i] = Threads.atomic_add!(ctr_in, 1) + lock(fl) + try + tot[1] += 1 + tasks_out[i] = Threads.atomic_add!(ctr_out, 1) + finally + unlock(fl) + end + end + push!(test_tasks, t) + end + finally + unlock(fl) + end + println("Mid SimpleFIFOLock") + for t in test_tasks + @test try + wait(t) + true + catch + false + end + end + @test tot[1] == 16 + @test tasks_out == tasks_in + @warn "Did SimpleFIFOLock" +end # @static if VERSION < v"1.10" + end + + # track all workers every created ALL_WORKERS = [] ConcurrentUtilities.Workers.GLOBAL_CALLBACK_PER_WORKER[] = w -> push!(ALL_WORKERS, w)