Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ authors = ["Jacob Quinn <quinn.jacobd@gmail.com>"]
version = "2.5.0"

[deps]
BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf"
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"

Expand Down
3 changes: 2 additions & 1 deletion src/ConcurrentUtilities.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module ConcurrentUtilities
import Base: AbstractLock, islocked, trylock, lock, unlock
export Lockable, OrderedSynchronizer, reset!, ReadWriteLock, readlock, readunlock, @wkspawn,
Workers, remote_eval, remote_fetch, Worker, terminate!, WorkerTerminatedException,
Pool, acquire, release, drain!, try_with_timeout, TimeoutException, FIFOLock
Pool, acquire, release, drain!, try_with_timeout, TimeoutException, FIFOLock, SimpleFIFOLock

macro samethreadpool_spawn(expr)
if VERSION >= v"1.9.2"
Expand All @@ -23,6 +23,7 @@ include("rwlock.jl")
include("pools.jl")
using .Pools
include("fifolock.jl")
include("simplefifolock.jl")

function clear_current_task()
current_task().storage = nothing
Expand Down
152 changes: 152 additions & 0 deletions src/simplefifolock.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
"""
SimpleFIFOLock()

A reentrant lock similar to Base.ReentrantLock, but with strict FIFO ordering.

This lock supports "barging", in which a thread can jump to the front of the queue of tasks:
```
lock(fifolock; first=true)
```

Calling `lock` inhibits running on finalizers on that thread.

Implementation note: The implementation uses a Condition. Conditions provide FIFO
notification order, as well as the ability to jump to the front, which makes the
implementation straightforward.
"""
mutable struct SimpleFIFOLock <: AbstractLock
cond::Threads.Condition
reentrancy_count::UInt # 0 iff the lock is not held
locked_by::Union{Task,Nothing} # nothing iff the lock is not held
SimpleFIFOLock() = new(Threads.Condition(), 0, nothing)
end

"""
trylock(l::SimpleFIFOLock)

Try to acquire lock `l`. If successful, return `true`. If the lock is held by another
task, do not wait and return `false`.

Each successful `trylock` must be matched by an `unlock`.
"""
@inline function Base.trylock(l::SimpleFIFOLock)
GC.disable_finalizers()
ct = current_task()
lock(l.cond)
locked_by = l.locked_by
if locked_by === nothing || locked_by === ct
l.reentrancy_count += 1
l.locked_by = ct
unlock(l.cond)
return true
end
unlock(l.cond)
GC.enable_finalizers()
return false
end

"""
lock(l::SimpleFIFOLock; first=false)

Acquire lock `l`. The lock is reentrant, so if the calling task has already acquired the
lock then return immediately.

Each `lock` must be matched by an `unlock`.
"""
@inline function Base.lock(l::SimpleFIFOLock; first=false)
GC.disable_finalizers()
ct = current_task()
lock(l.cond)
while true
if l.locked_by === nothing || l.locked_by === ct
l.reentrancy_count += 1
l.locked_by = ct
unlock(l.cond)
return nothing
end
# Don't pay for the try-catch unless we `wait`.
try
wait(l.cond; first)
catch
unlock(l.cond)
rethrow()
end
end
end

"""
unlock(lock::SimpleFIFOLock)

Releases ownerhsip of `lock`.

Note if the has been more than once by the same thread, it will need to be unlocked the same
number of times.
"""
@inline function Base.unlock(l::SimpleFIFOLock)
ct = current_task()
lock(l.cond)
if l.locked_by === nothing
unlock(l.cond)
error("unlocking an unlocked lock")
end
if l.locked_by !== ct
unlock(l.cond)
error("unlock from wrong thread")
end
l.reentrancy_count += -1
if l.reentrancy_count == 0
l.locked_by = nothing
if !isempty(l.cond.waitq)
# Don't pay for the try-catch unless we `notify`.
try
notify(l.cond; all=false)
catch
unlock(l.cond)
rethrow()
end
end
end
unlock(l.cond)
return nothing
end

# Performance note: for `@btime begin lock($l); unlock($l); end`.
# 13ns for SpinLock
# 17ns for ReentrantLock
# 33ns for FIFOLock
# 35ns for SimpleFIFOLock
# 57ns for the alternative versions below that use `@lock` (and hence have too much try-finally code)

# @inline function Base.lock(l::SimpleFIFOLock)
# GC.disable_finalizers()
# ct = current_task()
# @lock l.cond begin
# while true
# if l.locked_by === nothing || l.locked_by === ct
# l.reentrancy_count += 1
# l.locked_by = ct
# return nothing
# end
# wait(l.cond)
# end
# end
# end

# @inline function Base.unlock(l::SimpleFIFOLock)
# ct = current_task()
# @lock l.cond begin
# if l.locked_by === nothing
# error("unlocking an unlocked lock")
# end
# if l.locked_by !== ct
# error("unlock from wrong thread")
# end
# @assert l.reentrancy_count > 0
# l.reentrancy_count += -1
# if l.reentrancy_count == 0
# l.locked_by = nothing
# notify(l.cond; all=false)
# end
# end
# return nothing
# end
47 changes: 47 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,53 @@ else
end # @static if VERSION < v"1.10"
end

@testset "SimpleFIFOLock" begin
@static if false # VERSION < v"1.10-"
@warn "skipping FIFOLock tests since VERSION ($VERSION) < v\"1.10\""
else
@warn "Doing SimpleFIFOLock"
ctr_in = Threads.Atomic{Int}(1)
ctr_out = Threads.Atomic{Int}(1)
test_tasks = Task[]
sizehint!(test_tasks, 16)
tasks_in = zeros(Int, 16)
tasks_out = zeros(Int, 16)
tot = zeros(Int, 1)
fl = SimpleFIFOLock()
lock(fl)
try
for i in 1:16
t = Threads.@spawn begin
tasks_in[i] = Threads.atomic_add!(ctr_in, 1)
lock(fl)
try
tot[1] += 1
tasks_out[i] = Threads.atomic_add!(ctr_out, 1)
finally
unlock(fl)
end
end
push!(test_tasks, t)
end
finally
unlock(fl)
end
println("Mid SimpleFIFOLock")
for t in test_tasks
@test try
wait(t)
true
catch
false
end
end
@test tot[1] == 16
@test tasks_out == tasks_in
@warn "Did SimpleFIFOLock"
end # @static if VERSION < v"1.10"
end


# track all workers every created
ALL_WORKERS = []
ConcurrentUtilities.Workers.GLOBAL_CALLBACK_PER_WORKER[] = w -> push!(ALL_WORKERS, w)
Expand Down