@@ -205,37 +205,62 @@ function row_maximum(ds::AbstractDataset, f::Function, cols = names(ds, Union{Mi
205205end
206206row_maximum (ds:: AbstractDataset , cols = names (ds, Union{Missing, Number})) = row_maximum (ds, identity, cols)
207207
208- function _op_for_argminmax! (x, y, f, vals, idx)
208+ function _op_for_argminmax! (x, y, f, vals, idx, missref )
209209 idx[] += 1
210210 for i in 1 : length (x)
211- if isequal (vals[i], f (y[i])) && ismissing (x[i])
211+ if ! ismissing (vals[i]) && isequal (vals[i], f (y[i])) && isequal (x[i], missref )
212212 x[i] = idx[]
213213 end
214214 end
215215 x
216216end
217+ function hp_op_for_argminmax! (x, y, f, vals, idx, missref)
218+ idx[] += 1
219+ Threads. @threads for i in 1 : length (x)
220+ if ! ismissing (vals[i]) && isequal (vals[i], f (y[i])) && isequal (x[i], missref)
221+ x[i] = idx[]
222+ end
223+ end
224+ x
225+ end
226+
217227
218- function row_argmin (ds:: AbstractDataset , f:: Function , cols = names (ds, Union{Missing, Number}))
228+ function row_argmin (ds:: AbstractDataset , f:: Function , cols = names (ds, Union{Missing, Number}); threads = true )
219229 colsidx = index (ds)[cols]
220230 minvals = row_minimum (ds, f, cols)
221- colnames_pa = PooledArray (names (ds, colsidx))
231+ colnames_pa = allowmissing (PooledArray (names (ds, colsidx)))
232+ push! (colnames_pa, missing )
233+ missref = get (colnames_pa. invpool, missing , missing )
234+ init0 = fill (missref, nrow (ds))
222235 idx = Ref {Int} (0 )
223- res = mapreduce (identity, (x,y)-> _op_for_argminmax! (x,y, f, minvals, idx), view (_columns (ds),colsidx), init = missings (eltype (colnames_pa. refs), nrow (ds)))
236+ if threads
237+ res = mapreduce (identity, (x,y)-> hp_op_for_argminmax! (x,y, f, minvals, idx, missref), view (_columns (ds),colsidx), init = init0)
238+
239+ else
240+ res = mapreduce (identity, (x,y)-> _op_for_argminmax! (x,y, f, minvals, idx, missref), view (_columns (ds),colsidx), init = init0)
241+ end
224242 colnames_pa. refs = res
225243 colnames_pa
226244end
227- row_argmin (ds:: AbstractDataset , cols = names (ds, Union{Missing, Number})) = row_argmin (ds, identity, cols)
245+ row_argmin (ds:: AbstractDataset , cols = names (ds, Union{Missing, Number}); threads = true ) = row_argmin (ds, identity, cols, threads = threads )
228246
229- function row_argmax (ds:: AbstractDataset , f:: Function , cols = names (ds, Union{Missing, Number}))
247+ function row_argmax (ds:: AbstractDataset , f:: Function , cols = names (ds, Union{Missing, Number}); threads = true )
230248 colsidx = index (ds)[cols]
231249 maxvals = row_maximum (ds, f, cols)
232- colnames_pa = PooledArray (names (ds, colsidx))
250+ colnames_pa = allowmissing (PooledArray (names (ds, colsidx)))
251+ push! (colnames_pa, missing )
252+ missref = get (colnames_pa. invpool, missing , missing )
253+ init0 = fill (missref, nrow (ds))
233254 idx = Ref {Int} (0 )
234- res = mapreduce (identity, (x,y)-> _op_for_argminmax! (x,y,f, maxvals, idx), view (_columns (ds),colsidx), init = missings (eltype (colnames_pa. refs), nrow (ds)))
255+ if threads
256+ res = mapreduce (identity, (x,y)-> hp_op_for_argminmax! (x,y,f, maxvals, idx, missref), view (_columns (ds),colsidx), init = init0)
257+ else
258+ res = mapreduce (identity, (x,y)-> _op_for_argminmax! (x,y,f, maxvals, idx, missref), view (_columns (ds),colsidx), init = init0)
259+ end
235260 colnames_pa. refs = res
236261 colnames_pa
237262end
238- row_argmax (ds:: AbstractDataset , cols = names (ds, Union{Missing, Number})) = row_argmax (ds, identity, cols)
263+ row_argmax (ds:: AbstractDataset , cols = names (ds, Union{Missing, Number}); threads = true ) = row_argmax (ds, identity, cols, threads = threads )
239264
240265
241266# TODO better function for the first component of operator
0 commit comments