Skip to content

Commit 0eed815

Browse files
authored
improve issorted + support issorted, findfirst, findlast in byrow() (#29)
1 parent ec87bc1 commit 0eed815

File tree

6 files changed

+229
-26
lines changed

6 files changed

+229
-26
lines changed

docs/src/man/byrow.md

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -44,32 +44,36 @@ In the above benchmark, `byrow` should be even more performant when the data set
4444

4545
Generally, `byrow` is efficient for any `fun` which returns a single value for each row; however, it is fine-tuned for the following functions:
4646

47-
* `all`
48-
* `any`
49-
* `argmax`
50-
* `argmin`
51-
* `coalesce`
52-
* `count`
53-
* `hash`
54-
* `isequal`
55-
* `maximum`
56-
* `mean`
57-
* `minimum`
58-
* `nunique`
59-
* `prod`
60-
* `std`
61-
* `sum`
62-
* `var`
63-
64-
The common syntax of `byrow` for all of these functions except `nunique`, `coalesce`, and `isequal` is:
47+
* `all` : Test whether all elements of a boolean collection are `true`
48+
* `any` : Test whether any elements of a boolean collection are `true`
49+
* `argmax` : Return the column name of the maximum element
50+
* `argmin` : Return the column name of the minimum element
51+
* `coalesce` : Return the first value which is not equal to `missing`
52+
* `count` : Count the number of `true` values
53+
* `findfirst` : Return the column name of the first `true` value (`missing` if there is none)
54+
* `findlast` : Return the column name of the last `true` value (`missing` if there is none)
55+
* `hash` : Compute an integer hash code
56+
* `isequal` : Return `true` when all values are equal
57+
* `issorted` : Check if the values are sorted
58+
* `maximum` : Return the maximum value
59+
* `mean` : Compute the mean value
60+
* `minimum` : Return the minimum value
61+
* `nunique` : Return the number of unique values
62+
* `prod` : Return the product of values
63+
* `std` : Compute the standard deviation of values
64+
* `sum` : Return the sum of values
65+
* `var` : Compute the variance of values
66+
67+
The common syntax of `byrow` for all of these functions except `nunique`, `coalesce`, `isequal`, and `issorted` is:
6568

6669
`byrow(ds, fun, cols; [by , threads = true])`
6770

6871
The `by` keyword argument is for specifying a function to call on each value before calling `fun` to aggregate the result, and `threads = true` causes `byrow` to exploit all cores available to Julia for performing the computations.
6972

7073
The `nunique` function doesn't accept the `threads` argument; however, it has an extra keyword argument, `count_missing`. `nunique` counts the number of unique values in each row, and `count_missing = true` counts `missing` as a unique value.
7174

72-
The `coalesce` and `isequal` functions don't accept `by` argument.
75+
The `coalesce`, `isequal`, and `issorted` functions don't accept the `by` argument; however, `issorted` accepts an extra keyword argument, `rev`, which is set to `false` by default.
76+
7377

7478
### Examples
7579

src/byrow/byrow.jl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ function byrow(ds::AbstractDataset, ::typeof(all), cols::MultiColumnIndex = :; b
4444
end
4545
# Single-column convenience method for `all`: wrap `col` in a one-element
# vector and forward to the multi-column method with the same keywords.
function byrow(ds::AbstractDataset, ::typeof(all), col::ColumnIndex; by = x->isequal(true, x), threads = nrow(ds)>1000)
    return byrow(ds, all, [col]; by = by, threads = threads)
end
4646

47+
# Row-wise `isequal`: delegate to `row_isequal`, forwarding the `threads` flag.
# NOTE(review): a method with this exact signature is defined again further
# down in this file; the later definition replaces this one at load time.
function byrow(ds::AbstractDataset, ::typeof(isequal), cols::MultiColumnIndex; threads = nrow(ds)>1000)
    return row_isequal(ds, cols, threads = threads)
end
48+
# Row-wise `findfirst`: `by` is applied to every value and the work is done by
# `row_findfirst`, which receives the `threads` flag unchanged.
function byrow(ds::AbstractDataset, ::typeof(findfirst), cols::MultiColumnIndex; by = identity, threads = nrow(ds)> 1000)
    return row_findfirst(ds, by, cols; threads = threads)
end
49+
# Row-wise `findlast`: `by` is applied to every value and the work is done by
# `row_findlast`, which receives the `threads` flag unchanged.
function byrow(ds::AbstractDataset, ::typeof(findlast), cols::MultiColumnIndex; by = identity, threads = nrow(ds)> 1000)
    return row_findlast(ds, by, cols; threads = threads)
end
50+
51+
4752
# Row-wise `coalesce`: pick the threaded (`hp_`) or the serial implementation
# according to `threads`.
function byrow(ds::AbstractDataset, ::typeof(coalesce), cols::MultiColumnIndex; threads = nrow(ds)>1000)
    if threads
        return hp_row_coalesce(ds, cols)
    else
        return row_coalesce(ds, cols)
    end
end
4853

4954
# Row-wise `isequal`: returns, for each row, whether all selected values are
# equal. `row_isequal` handles both the threaded and the serial case through
# its `threads` keyword, so the flag is forwarded explicitly. Previously the
# serial branch called `row_isequal(ds, cols)` without the keyword, whose
# default is `threads = true`, so `threads = false` was silently ignored.
function byrow(ds::AbstractDataset, ::typeof(isequal), cols::MultiColumnIndex; threads = nrow(ds)>1000)
    return row_isequal(ds, cols; threads = threads)
end
@@ -103,6 +108,8 @@ byrow(ds::AbstractDataset, ::typeof(sort), col::ColumnIndex; threads = true, kwa
103108
# In-place row-wise sort over `cols`: choose the threaded (`hp_`) or serial
# implementation according to `threads`; any extra keywords are passed through.
function byrow(ds::AbstractDataset, ::typeof(sort!), cols::MultiColumnIndex = names(ds, Union{Missing, Number}); threads = true, kwargs...)
    if threads
        return hp_row_sort!(ds, cols; kwargs...)
    else
        return row_sort!(ds, cols; kwargs...)
    end
end
104109
# Single-column convenience method for `sort!`: wrap `col` in a one-element
# vector and forward to the multi-column method with the same keywords.
function byrow(ds::AbstractDataset, ::typeof(sort!), col::ColumnIndex; threads = true, kwargs...)
    return byrow(ds, sort!, [col]; threads = threads, kwargs...)
end
105110

111+
# Row-wise `issorted`: choose the threaded (`hp_`) or serial implementation
# according to `threads`; `rev` is forwarded to whichever one runs.
function byrow(ds::AbstractDataset, ::typeof(issorted), cols::MultiColumnIndex; threads = nrow(ds)>1000, rev = false)
    if threads
        return hp_row_issorted(ds, cols; rev = rev)
    else
        return row_issorted(ds, cols; rev = rev)
    end
end
112+
106113
# Row-wise `stdze`: forward the dataset and column selection to `row_stdze`.
function byrow(ds::AbstractDataset, ::typeof(stdze), cols::MultiColumnIndex = names(ds, Union{Missing, Number}))
    return row_stdze(ds, cols)
end
107114

108115
# Row-wise `stdze!`: forward the dataset and column selection to `row_stdze!`.
function byrow(ds::AbstractDataset, ::typeof(stdze!), cols::MultiColumnIndex = names(ds, Union{Missing, Number}))
    return row_stdze!(ds, cols)
end

src/byrow/hp_row_functions.jl

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,30 @@ function hp_row_sort(ds::AbstractDataset, cols = names(ds, Union{Missing, Number
257257
dscopy
258258
end
259259

260+
# Threaded pairwise step of the ascending row-wise sortedness check.
# For every position `i`, `res[i]` is and-ed with "y[i] is not less than x[i]".
# Returns `y`.
function hp_op_for_issorted!(x, y, res)
    Threads.@threads for i in 1:length(x)
        in_order = !isless(y[i], x[i])
        res[i] = res[i] & in_order
    end
    return y
end
266+
# Threaded pairwise step of the descending row-wise sortedness check.
# For every position `i`, `res[i]` is and-ed with "x[i] is not less than y[i]".
# Returns `y`.
function hp_op_for_issorted_rev!(x, y, res)
    Threads.@threads for i in 1:length(x)
        in_order = !isless(x[i], y[i])
        res[i] = res[i] & in_order
    end
    return y
end
272+
273+
# Threaded row-wise sortedness check.
#
# Returns a `Vector{Bool}` with one entry per row of `ds`; entry `i` is `true`
# when the values of row `i`, taken in the order given by `cols`, are sorted
# (ascending by default, descending when `rev = true`). Comparisons use
# `isless`.
#
# Adjacent columns are compared explicitly, left to right. The earlier
# `mapreduce` call had no `init`, and Julia leaves the associativity of such a
# reduction implementation-dependent: a pairwise split would compare only the
# last column of each half and miss the pair that straddles the split. The
# loop also makes an empty or single-column selection return all `true`
# instead of throwing on an empty reduction.
function hp_row_issorted(ds::AbstractDataset, cols; rev = false)
    colsidx = index(ds)[cols]
    init0 = ones(Bool, nrow(ds))
    selected = view(_columns(ds), colsidx)
    for j in 2:length(selected)
        if rev
            hp_op_for_issorted_rev!(selected[j-1], selected[j], init0)
        else
            hp_op_for_issorted!(selected[j-1], selected[j], init0)
        end
    end
    init0
end
283+
260284

261285
function hp_row_generic(ds::AbstractDataset, f::Function, cols::MultiColumnIndex)
262286
colsidx = index(ds)[cols]

src/byrow/row_functions.jl

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,96 @@ function row_all(ds::AbstractDataset, f::Function, cols = :)
6969
end
7070
# Default-predicate method: a value counts as passing when it `isequal` to `true`.
function row_all(ds::AbstractDataset, cols = :)
    return row_all(ds, isequal(true), cols)
end
7171

72+
# Serial step of the row-wise equality check: and each entry of the
# accumulator `x` with `isequal(y[i], x1[i])`, in place. Returns `x`.
function _op_for_isequal!(x, y, x1)
    broadcast!((acc, val, ref) -> acc & isequal(val, ref), x, x, y, x1)
    return x
end
76+
# Threaded step of the row-wise equality check: and each entry of the
# accumulator `x` with `isequal(y[i], x1[i])`, in place. Returns `x`.
function hp_op_for_isequal!(x, y, x1)
    Threads.@threads for i in 1:length(x)
        same = isequal(y[i], x1[i])
        x[i] = x[i] & same
    end
    return x
end
82+
83+
# Row-wise equality check.
#
# Returns a `Vector{Bool}` with one entry per row of `ds`; entry `i` is `true`
# when every selected value in row `i` is `isequal` to the value in the first
# selected column. `threads` selects the threaded or the serial per-column
# kernel.
#
# With zero or one selected column there is nothing to compare, so every row
# is `true`. The empty case previously hit a BoundsError on `colsidx[1]`.
# The first column is no longer compared with itself, and the columns are
# visited in an explicit left-to-right loop rather than relying on the
# evaluation order of `mapreduce`.
function row_isequal(ds::AbstractDataset, cols = :; threads = true)
    colsidx = index(ds)[cols]
    init0 = ones(Bool, nrow(ds))
    length(colsidx) <= 1 && return init0
    allcols = _columns(ds)
    x1 = allcols[colsidx[1]]
    for j in 2:length(colsidx)
        if threads
            hp_op_for_isequal!(init0, allcols[colsidx[j]], x1)
        else
            _op_for_isequal!(init0, allcols[colsidx[j]], x1)
        end
    end
    init0
end
94+
95+
96+
97+
98+
# TODO probably we should use this approach instead of mapreduce_indexed
99+
# Serial step of row-wise `findfirst`.
# Advances the column counter `idx`, then for every position still holding
# `missref` stores the new counter value when `f(y[i])` is `isequal` to `true`.
# Positions that already hold another value are left alone. Returns `x`.
function _op_for_findfirst!(x, y, f, idx, missref)
    idx[] += 1
    colpos = idx[]
    pick(cur, val) = ifelse(isequal(missref, cur) & isequal(true, f(val)), colpos, cur)
    broadcast!(pick, x, x, y)
    return x
end
104+
105+
# Threaded step of row-wise `findfirst`.
# Advances the column counter `idx`, then for every position still holding
# `missref` stores the new counter value when `f(y[i])` is `isequal` to `true`.
# Positions that already hold another value are left alone. Returns `x`.
function hp_op_for_findfirst!(x, y, f, idx, missref)
    idx[] += 1
    Threads.@threads for i in 1:length(x)
        hit = isequal(missref, x[i]) & isequal(true, f(y[i]))
        if hit
            x[i] = idx[]
        end
    end
    return x
end
112+
113+
# Serial step of row-wise `findlast`.
# Advances the column counter `idx`, then overwrites every position where
# `f(y[i])` is `isequal` to `true` with the new counter value, so later
# columns win. `missref` is accepted but not used here. Returns `x`.
function _op_for_findlast!(x, y, f, idx, missref)
    idx[] += 1
    colpos = idx[]
    pick(cur, val) = ifelse(isequal(true, f(val)), colpos, cur)
    broadcast!(pick, x, x, y)
    return x
end
118+
119+
# Threaded step of row-wise `findlast`.
# Advances the column counter `idx`, then overwrites every position where
# `f(y[i])` is `isequal` to `true` with the new counter value, so later
# columns win. `missref` is accepted but not used here. Returns `x`.
function hp_op_for_findlast!(x, y, f, idx, missref)
    idx[] += 1
    Threads.@threads for i in 1:length(x)
        if isequal(true, f(y[i]))
            x[i] = idx[]
        end
    end
    return x
end
126+
127+
# TODO probably we should use a `threads` argument instead of separate functions for the hp version
128+
# Row-wise `findfirst`.
#
# For each row of `ds`, finds the first column among `cols` (in the given
# order) whose value `v` satisfies `isequal(true, f(v))`. The result is a
# pooled array of column names with `missing` for rows where no column
# matches. `threads` selects the threaded or the serial per-column kernel.
#
# Implementation: the selected column names are pooled and `missing` is
# appended to the pool; `missref` is the pool reference of `missing`. A
# reference vector starts out filled with `missref`, and each column step
# writes the running column counter `idx` into matching positions. The
# counter is used directly as the pool reference of the column name.
function row_findfirst(ds::AbstractDataset, f, cols = names(ds, Union{Missing, Number}); threads = true)
    colsidx = index(ds)[cols]
    idx = Ref{Int}(0)
    colnames_pa = allowmissing(PooledArray(names(ds, colsidx)))
    push!(colnames_pa, missing)
    missref = get(colnames_pa.invpool, missing, 0)
    init0 = fill(missref, nrow(ds))
    # `foldl` guarantees strict left-to-right evaluation, which the `idx`
    # counter depends on; `mapreduce` leaves associativity to the implementation.
    if threads
        foldl((x,y)->hp_op_for_findfirst!(x,y,f,idx, missref), view(_columns(ds),colsidx), init = init0)
    else
        foldl((x,y)->_op_for_findfirst!(x,y,f,idx, missref), view(_columns(ds),colsidx), init = init0)
    end
    # Swap in the computed references so the pooled array holds one entry per row.
    colnames_pa.refs = init0
    colnames_pa
end
143+
144+
# Row-wise `findlast`.
#
# For each row of `ds`, finds the last column among `cols` (in the given
# order) whose value `v` satisfies `isequal(true, f(v))`. The result is a
# pooled array of column names with `missing` for rows where no column
# matches. `threads` selects the threaded or the serial per-column kernel.
#
# Implementation: the selected column names are pooled and `missing` is
# appended to the pool; `missref` is the pool reference of `missing`. A
# reference vector starts out filled with `missref`, and each column step
# overwrites matching positions with the running column counter `idx`, so
# the last matching column wins.
function row_findlast(ds::AbstractDataset, f, cols = names(ds, Union{Missing, Number}); threads = true)
    colsidx = index(ds)[cols]
    idx = Ref{Int}(0)
    colnames_pa = allowmissing(PooledArray(names(ds, colsidx)))
    push!(colnames_pa, missing)
    missref = get(colnames_pa.invpool, missing, 0)
    init0 = fill(missref, nrow(ds))
    # `foldl` guarantees strict left-to-right evaluation, which both the `idx`
    # counter and the "last match wins" rule depend on; `mapreduce` leaves
    # associativity to the implementation.
    if threads
        foldl((x,y)->hp_op_for_findlast!(x,y,f,idx, missref), view(_columns(ds),colsidx), init = init0)
    else
        foldl((x,y)->_op_for_findlast!(x,y,f,idx, missref), view(_columns(ds),colsidx), init = init0)
    end
    # Swap in the computed references so the pooled array holds one entry per row.
    colnames_pa.refs = init0
    colnames_pa
end
159+
160+
161+
72162
function _op_for_coalesce!(x, y)
73163
if all(!ismissing, x)
74164
x
@@ -399,6 +489,26 @@ function row_sort(ds::AbstractDataset, cols = names(ds, Union{Missing, Number});
399489
dscopy
400490
end
401491

492+
# Serial pairwise step of the ascending row-wise sortedness check.
# Each `res[i]` is and-ed with "y[i] is not less than x[i]". Returns `y`.
function _op_for_issorted!(x, y, res)
    broadcast!((ok, prev, cur) -> ok & !isless(cur, prev), res, res, x, y)
    return y
end
496+
# Serial pairwise step of the descending row-wise sortedness check.
# Each `res[i]` is and-ed with "x[i] is not less than y[i]". Returns `y`.
function _op_for_issorted_rev!(x, y, res)
    broadcast!((ok, prev, cur) -> ok & !isless(prev, cur), res, res, x, y)
    return y
end
500+
501+
# Serial row-wise sortedness check.
#
# Returns a `Vector{Bool}` with one entry per row of `ds`; entry `i` is `true`
# when the values of row `i`, taken in the order given by `cols`, are sorted
# (ascending by default, descending when `rev = true`). Comparisons use
# `isless`.
#
# Adjacent columns are compared explicitly, left to right. The earlier
# `mapreduce` call had no `init`, and Julia leaves the associativity of such a
# reduction implementation-dependent: a pairwise split would compare only the
# last column of each half and miss the pair that straddles the split. The
# loop also makes an empty or single-column selection return all `true`
# instead of throwing on an empty reduction.
function row_issorted(ds::AbstractDataset, cols; rev = false)
    colsidx = index(ds)[cols]
    init0 = ones(Bool, nrow(ds))
    selected = view(_columns(ds), colsidx)
    for j in 2:length(selected)
        if rev
            _op_for_issorted_rev!(selected[j-1], selected[j], init0)
        else
            _op_for_issorted!(selected[j-1], selected[j], init0)
        end
    end
    init0
end
511+
402512
# TODO is it possible to have a faster row_count_unique??
403513
function _fill_prehashed!(prehashed, y, f, n, j)
404514
@views copy!(prehashed[:, j] , _Prehashed.(hash.(f.(y))))

src/sort/sort.jl

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -182,12 +182,9 @@ function _issorted(ds, cols::MultiColumnIndex, ::Val{T}; rev = false, mapformats
182182
inbits[1] = true
183183
for j in 1:length(colsidx)
184184
v = _columns(ds)[colsidx[j]]
185-
for rng in 1:lastvalid
186-
lo = starts[rng]
187-
rng == lastvalid ? hi = nrow(ds) : hi = starts[rng+1] - 1
188-
part_res = _issorted_barrier(v, Base.Order.ord(isless, by[j], revs[j]), lo, hi)
189-
!part_res && return false, starts, lastvalid, colsidx, revs, mapformats
190-
end
185+
_ord = Base.Order.ord(isless, by[j], revs[j])
186+
part_res = _issorted_check_for_each_range(v, starts, lastvalid, _ord, nrow(ds))
187+
!part_res && return false, starts, lastvalid, colsidx, revs, mapformats
191188
_find_starts_of_groups!(_columns(ds)[colsidx[j]], 1:nrow(ds), by[j], inbits)
192189
lastvalid = _fill_starts_from_inbits!(starts, inbits)
193190
lastvalid == nrow(ds) && return true, starts, lastvalid, colsidx, revs, mapformats
@@ -196,6 +193,17 @@ function _issorted(ds, cols::MultiColumnIndex, ::Val{T}; rev = false, mapformats
196193
res, starts, lastvalid, colsidx, revs, mapformats
197194
end
198195

196+
# Check, in parallel, that `v` is sorted under `_ord` inside every range.
#
# Range `rng` runs from `starts[rng]` to `starts[rng+1] - 1`; the last range
# (`rng == lastvalid`) runs to `nrows`. Returns `true` only when
# `_issorted_barrier` reports every range as sorted.
#
# The verdict is kept in a single atomic flag that can only move from `true`
# to `false`. The earlier per-thread buffer indexed by `Threads.threadid()`
# could have a `false` overwritten by a later `true` when several iterations
# ran under the same thread id, and could be indexed out of bounds when
# `threadid()` exceeds `nthreads()`. Once a failure is recorded, the remaining
# iterations return immediately instead of relying on `break` inside
# `Threads.@threads`.
function _issorted_check_for_each_range(v, starts, lastvalid, _ord, nrows)
    allsorted = Threads.Atomic{Bool}(true)
    Threads.@threads for rng in 1:lastvalid
        # A failure was already found elsewhere; skip the remaining work.
        allsorted[] || continue
        lo = starts[rng]
        hi = rng == lastvalid ? nrows : starts[rng+1] - 1
        if !_issorted_barrier(v, _ord, lo, hi)
            allsorted[] = false
        end
    end
    allsorted[]
end
206+
199207
function _fill_starts_from_inbits!(starts, inbits)
200208
lastvalid = 1
201209
@inbounds for i in 1:length(inbits)
@@ -209,7 +217,7 @@ end
209217

210218
function _issorted_barrier(v, _ord, lo, hi)
211219
lo >= hi && return true
212-
for i in lo+1:hi
220+
@inbounds for i in lo+1:hi
213221
Base.Order.lt(_ord, v[i], v[i-1]) && return false
214222
end
215223
true

test/byrow.jl

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,56 @@
4444
sds = view(ds, [1,2,2,1,3,4,5,5,5], [2,1])
4545
@test byrow(sds, isequal, :, threads = true) == [0,1,1,0,0,0, 1,1,1]
4646
@test byrow(sds, isequal, [1], threads = true) == ones(9)
47+
48+
ds = Dataset(x1 = [1,2,3,4,missing], x2 = [3,2,4,5, missing])
49+
@test byrow(ds, issorted, :) == [true, true, true, true, true]
50+
@test byrow(ds, issorted, :, rev = true) == [false, true, false, false, true]
51+
52+
ds = Dataset(randn(10000, 3), :auto)
53+
map!(ds, x->rand()<.1 ? missing : x, :)
54+
dsm = Matrix(ds)
55+
@test byrow(ds, issorted, :) == issorted.(eachrow(dsm))
56+
@test byrow(ds, issorted, :, rev = true) == issorted.(eachrow(dsm), rev = true)
57+
insertcols!(ds, 1, :y=>rand(-1:1, nrow(ds)))
58+
dsm = Matrix(ds)
59+
@test byrow(ds, issorted, :) == byrow(ds, issorted, :, threads = false) == issorted.(eachrow(dsm))
60+
@test byrow(ds, issorted, :, rev = true) == byrow(ds, issorted, :, rev = true, threads = false) == issorted.(eachrow(dsm), rev = true)
61+
62+
ds = Dataset(g = [1, 1, 1, 2, 2],
63+
x1_int = [0, 0, 1, missing, 2],
64+
x2_int = [3, 2, 1, 3, -2],
65+
x1_float = [1.2, missing, -1.0, 2.3, 10],
66+
x2_float = [missing, missing, 3.0, missing, missing],
67+
x3_float = [missing, missing, -1.4, 3.0, -100.0])
68+
@test isequal(byrow(ds, findfirst, :, by = ismissing), ["x2_float", "x1_float", missing, "x1_int", "x2_float"])
69+
@test isequal(byrow(ds, findlast, :, by = ismissing), ["x3_float", "x3_float", missing, "x2_float", "x2_float"])
70+
@test isequal(byrow(ds, findfirst, :, by = x->isless(x,0)), [missing, missing, "x1_float", missing, "x2_int"])
71+
@test isequal(byrow(ds, findlast, :, by = x->isless(x,0)), [missing, missing, "x3_float", missing, "x3_float"])
72+
@test isequal(byrow(ds, findfirst, :, by = x->1), ["g","g","g", "g","g"])
73+
@test isequal(byrow(ds, findfirst, :), ["g","g","g", missing, missing])
74+
@test isequal(byrow(ds, findlast, :), ["g","g","x2_int", missing, missing])
75+
@test isequal(byrow(ds, findfirst, [3,2,1], by = isequal(2)) ,byrow(ds, findlast, 1:3, by = isequal(2)))
76+
@test isequal(byrow(ds, findfirst, 1:3, by = isequal(2)) ,byrow(ds, findlast, [3,2,1], by = isequal(2)))
77+
78+
79+
sds = view(ds, rand(1:5, 100), [2,1,6,5,3,4])
80+
@test isequal(byrow(sds, findfirst,:, by = x->isless(x,0)), byrow(Dataset(sds), findfirst, :, by = x->isless(x,0)))
81+
@test isequal(byrow(sds, findlast,:, by = x->isless(x,0)), byrow(Dataset(sds), findlast, :, by = x->isless(x,0)))
82+
@test isequal(byrow(sds, findfirst,:, by = x->isless(x,0), threads = true), byrow(Dataset(sds), findfirst, :, by = x->isless(x,0)))
83+
@test isequal(byrow(sds, findlast,:, by = x->isless(x,0), threads = true), byrow(Dataset(sds), findlast, :, by = x->isless(x,0)))
84+
sds = view(ds, rand(1:5, 100), [2,1,6,5,3,4])
85+
@test isequal(byrow(sds, findfirst,:, by = x->isless(x,0)), byrow(Dataset(sds), findfirst, :, by = x->isless(x,0)))
86+
@test isequal(byrow(sds, findlast,:, by = x->isless(x,0)), byrow(Dataset(sds), findlast, :, by = x->isless(x,0)))
87+
@test isequal(byrow(sds, findfirst,:, by = x->isless(x,0), threads = true), byrow(Dataset(sds), findfirst, :, by = x->isless(x,0)))
88+
@test isequal(byrow(sds, findlast,:, by = x->isless(x,0), threads = true), byrow(Dataset(sds), findlast, :, by = x->isless(x,0)))
89+
90+
sds = view(ds, rand(1:5, 100), [2,1,3,4])
91+
@test isequal(byrow(sds, findfirst,[1,4,3,2], by = x->isless(x,0)), byrow(Dataset(sds), findfirst, [1,4,3,2], by = x->isless(x,0)))
92+
@test isequal(byrow(sds, findlast,[1,4,3,2], by = x->isless(x,0)), byrow(Dataset(sds), findlast, [1,4,3,2], by = x->isless(x,0)))
93+
@test isequal(byrow(sds, findfirst,[1,4,3,2], by = x->isless(x,0), threads = true), byrow(Dataset(sds), findfirst, [1,4,3,2], by = x->isless(x,0)))
94+
@test isequal(byrow(sds, findlast,[1,4,3,2], by = x->isless(x,0), threads = true), byrow(Dataset(sds), findlast, [1,4,3,2], by = x->isless(x,0)))
95+
96+
4797
end
4898

4999
@testset "cum*/!" begin

0 commit comments

Comments
 (0)