Skip to content

Commit 19a674a

Browse files
committed
Add UDF parameter / returns transformers
1 parent 6ed4c46 commit 19a674a

File tree

15 files changed

+780
-418
lines changed

15 files changed

+780
-418
lines changed

accel.c

Lines changed: 199 additions & 98 deletions
Large diffs are not rendered by default.

singlestoredb/functions/decorator.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@
1010

1111
from . import utils
1212
from .dtypes import SQLString
13-
from .typing import JSON
14-
from .typing import JSONArray
1513

1614

1715
ParameterType = Union[
@@ -75,8 +73,6 @@ def expand_types(args: Any) -> Optional[List[Any]]:
7573
for arg in args:
7674
if isinstance(arg, str):
7775
new_args.append(arg)
78-
elif arg in [JSON, JSONArray]:
79-
new_args.append(arg)
8076
elif is_sqlstr_callable(arg):
8177
new_args.append(arg())
8278
elif type(arg) is type:

singlestoredb/functions/ext/asgi.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,10 @@ def as_list_of_tuples(x: Any) -> Any:
194194
return x
195195

196196

197+
def transpose(data: Sequence[Sequence[Any]]) -> List[Tuple[Any]]:
198+
return [tuple(row) for row in zip(*data)]
199+
200+
197201
def get_dataframe_columns(df: Any) -> List[Any]:
198202
"""Return columns of data from a dataframe/table."""
199203
if isinstance(df, Table):
@@ -209,16 +213,25 @@ def get_dataframe_columns(df: Any) -> List[Any]:
209213
return list(df)
210214

211215
rtype = str(type(df)).lower()
216+
217+
# Pandas or polars type of dataframe
212218
if 'dataframe' in rtype:
213219
return [df[x] for x in df.columns]
220+
# PyArrow table
214221
elif 'table' in rtype:
215222
return df.columns
223+
# Pandas or polars series
216224
elif 'series' in rtype:
217225
return [df]
226+
# Numpy array
218227
elif 'array' in rtype:
219228
return [df]
229+
# List of objects
230+
elif 'list' in rtype:
231+
return transpose(as_list_of_tuples(df))
232+
# Tuple of objects
220233
elif 'tuple' in rtype:
221-
return list(df)
234+
return transpose(as_list_of_tuples(df))
222235

223236
raise TypeError(
224237
'Unsupported data type for dataframe columns: '
@@ -316,7 +329,7 @@ def build_udf_endpoint(
316329
The function endpoint
317330
318331
"""
319-
if returns_data_format in ['scalar', 'list']:
332+
if returns_data_format in ['scalar']:
320333

321334
is_async = asyncio.iscoroutinefunction(func)
322335

@@ -427,7 +440,7 @@ def build_tvf_endpoint(
427440
The function endpoint
428441
429442
"""
430-
if returns_data_format in ['scalar', 'list']:
443+
if returns_data_format in ['scalar']:
431444

432445
is_async = asyncio.iscoroutinefunction(func)
433446

@@ -768,8 +781,8 @@ class Application(object):
768781
response=rowdat_1_response_dict,
769782
),
770783
(b'application/octet-stream', b'1.0', 'list'): dict(
771-
load=rowdat_1.load,
772-
dump=rowdat_1.dump,
784+
load=rowdat_1.load_list,
785+
dump=rowdat_1.dump_list,
773786
response=rowdat_1_response_dict,
774787
),
775788
(b'application/octet-stream', b'1.0', 'pandas'): dict(
@@ -798,8 +811,8 @@ class Application(object):
798811
response=json_response_dict,
799812
),
800813
(b'application/json', b'1.0', 'list'): dict(
801-
load=jdata.load,
802-
dump=jdata.dump,
814+
load=jdata.load_list,
815+
dump=jdata.dump_list,
803816
response=json_response_dict,
804817
),
805818
(b'application/json', b'1.0', 'pandas'): dict(

singlestoredb/functions/ext/json.py

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ def load_arrow(
262262

263263

264264
def dump(
265-
returns: List[Tuple[int, Optional[Transformer]]],
265+
returns: List[Tuple[str, int, Optional[Transformer]]],
266266
row_ids: List[int],
267267
rows: List[List[Any]],
268268
) -> bytes:
@@ -271,7 +271,7 @@ def dump(
271271
272272
Parameters
273273
----------
274-
returns : List[Tuple[int, Optional[Transformer]]]
274+
returns : List[Tuple[str, int, Optional[Transformer]]]
275275
The returned data type
276276
row_ids : List[int]
277277
Row IDs
@@ -284,16 +284,19 @@ def dump(
284284
285285
'''
286286
rows = list(rows)
287-
for i, (_, transformer) in enumerate(returns):
287+
transformers = []
288+
for i, (_, _, transformer) in enumerate(returns):
288289
if transformer is not None:
289-
for row in rows:
290-
row[i] = apply_transformer(transformer, row[i])
290+
transformers.append((i, transformer))
291+
for (i, transformer) in transformers:
292+
for row in rows:
293+
row[i] = apply_transformer(transformer, row[i])
291294
data = list(zip(row_ids, *list(zip(*rows))))
292295
return json.dumps(dict(data=data), cls=JSONEncoder).encode('utf-8')
293296

294297

295298
def _dump_vectors(
296-
returns: List[Tuple[int, Optional[Transformer]]],
299+
returns: List[Tuple[str, int, Optional[Transformer]]],
297300
row_ids: List[int],
298301
cols: List[Tuple[Any, Any]],
299302
) -> bytes:
@@ -319,13 +322,13 @@ def _dump_vectors(
319322
if mask is not None:
320323
masked_cols.append([
321324
apply_transformer(
322-
returns[i][1], d,
325+
returns[i][2], d,
323326
) if m is not None else None for d, m in zip(data, mask)
324327
])
325328
else:
326329
masked_cols.append(
327330
apply_transformer(
328-
returns[i][1],
331+
returns[i][2],
329332
cols[i][0],
330333
),
331334
)
@@ -338,7 +341,7 @@ def _dump_vectors(
338341

339342

340343
def dump_pandas(
341-
returns: List[Tuple[int, Optional[Transformer]]],
344+
returns: List[Tuple[str, int, Optional[Transformer]]],
342345
row_ids: 'pd.Series[int]',
343346
cols: List[Tuple['pd.Series[int]', 'pd.Series[bool]']],
344347
) -> bytes:
@@ -347,7 +350,7 @@ def dump_pandas(
347350
348351
Parameters
349352
----------
350-
returns : List[Tuple[int, Optional[Transformer]]]
353+
returns : List[Tuple[str, int, Optional[Transformer]]]
351354
The returned data type
352355
row_ids : pd.Series[int]
353356
Row IDs
@@ -363,7 +366,7 @@ def dump_pandas(
363366

364367
row_ids.index = row_ids
365368

366-
for i, ((data, mask), (dtype, transformer)) in enumerate(zip(cols, returns)):
369+
for i, ((data, mask), (_, dtype, transformer)) in enumerate(zip(cols, returns)):
367370
data.index = row_ids.index
368371
if mask is not None:
369372
mask.index = row_ids.index
@@ -387,7 +390,7 @@ def dump_pandas(
387390

388391

389392
def dump_polars(
390-
returns: List[Tuple[int, Optional[Transformer]]],
393+
returns: List[Tuple[str, int, Optional[Transformer]]],
391394
row_ids: 'pl.Series[int]',
392395
cols: List[Tuple['pl.Series[Any]', 'pl.Series[int]']],
393396
) -> bytes:
@@ -396,7 +399,7 @@ def dump_polars(
396399
397400
Parameters
398401
----------
399-
returns : List[Tuple[int, Optional[Transformer]]]
402+
returns : List[Tuple[str, int, Optional[Transformer]]]
400403
The returned data type
401404
row_ids : List[int]
402405
cols : List[Tuple[polars.Series[Any], polars.Series[bool]]
@@ -415,7 +418,7 @@ def dump_polars(
415418

416419

417420
def dump_numpy(
418-
returns: List[Tuple[int, Optional[Transformer]]],
421+
returns: List[Tuple[str, int, Optional[Transformer]]],
419422
row_ids: 'np.typing.NDArray[np.int64]',
420423
cols: List[Tuple['np.typing.NDArray[Any]', 'np.typing.NDArray[np.bool_]']],
421424
) -> bytes:
@@ -424,7 +427,7 @@ def dump_numpy(
424427
425428
Parameters
426429
----------
427-
returns : List[Tuple[int, Optional[Transformer]]]
430+
returns : List[Tuple[str, int, Optional[Transformer]]]
428431
The returned data type
429432
row_ids : List[int]
430433
Row IDs
@@ -444,7 +447,7 @@ def dump_numpy(
444447

445448

446449
def dump_arrow(
447-
returns: List[Tuple[int, Optional[Transformer]]],
450+
returns: List[Tuple[str, int, Optional[Transformer]]],
448451
row_ids: 'pa.Array[int]',
449452
cols: List[Tuple['pa.Array[int]', 'pa.Array[bool]']],
450453
) -> bytes:
@@ -453,7 +456,7 @@ def dump_arrow(
453456
454457
Parameters
455458
----------
456-
returns : List[Tuple[int, Optional[Transformer]]]
459+
returns : List[Tuple[str, int, Optional[Transformer]]]
457460
The returned data type
458461
row_ids : pyarrow.Array[int]
459462
Row IDs

singlestoredb/functions/ext/rowdat_1.py

Lines changed: 18 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -565,17 +565,16 @@ def _load_numpy_accel(
565565
if not has_accel:
566566
raise RuntimeError('could not load SingleStoreDB extension')
567567

568-
row_ids, cols = _singlestoredb_accel.load_rowdat_1_numpy(colspec, data)
568+
import numpy as np
569569

570-
cols = list(cols)
570+
numpy_ids, numpy_cols = _singlestoredb_accel.load_rowdat_1_numpy(colspec, data)
571571

572-
for i, ((name, dtype, transformer), col) in enumerate(zip(colspec, cols)):
572+
for i, (_, dtype, transformer) in enumerate(colspec):
573573
if transformer is not None:
574-
import numpy as np
575-
vectorized_transformer = np.vectorize(transformer)
576-
cols[i] = (vectorized_transformer(col[0]), col[1])
574+
t = np.vectorize(transformer)
575+
numpy_cols[i] = (t(numpy_cols[i][0]), numpy_cols[i][1])
577576

578-
return row_ids, cols
577+
return numpy_ids, numpy_cols
579578

580579

581580
def _dump_numpy_accel(
@@ -586,17 +585,14 @@ def _dump_numpy_accel(
586585
if not has_accel:
587586
raise RuntimeError('could not load SingleStoreDB extension')
588587

589-
cols = list(cols)
588+
import numpy as np
590589

591-
for i, (_, rtype, transformer), (data, mask) in zip(range(len(cols)), returns, cols):
590+
for i, (_, dtype, transformer) in enumerate(returns):
592591
if transformer is not None:
593-
import numpy as np
594-
vectorized_transformer = np.vectorize(transformer)
595-
cols[i] = (vectorized_transformer(data), mask)
592+
t = np.vectorize(transformer)
593+
cols[i] = (t(cols[i][0]), cols[i][1])
596594

597-
return _singlestoredb_accel.dump_rowdat_1_numpy(
598-
[x[1] for x in returns], row_ids, cols,
599-
)
595+
return _singlestoredb_accel.dump_rowdat_1_numpy(returns, row_ids, cols)
600596

601597

602598
def _load_pandas_accel(
@@ -642,16 +638,7 @@ def _dump_pandas_accel(
642638
for data, mask in cols
643639
]
644640

645-
for i, (_, rtype, transformer), (data, mask)\
646-
in zip(range(len(numpy_cols)), returns, numpy_cols):
647-
if transformer is not None:
648-
import numpy as np
649-
vectorized_transformer = np.vectorize(transformer, otypes=[object])
650-
numpy_cols[i] = (vectorized_transformer(data), mask)
651-
652-
return _singlestoredb_accel.dump_rowdat_1_numpy(
653-
[x[1] for x in returns], numpy_ids, numpy_cols,
654-
)
641+
return _dump_numpy_accel(returns, numpy_ids, numpy_cols)
655642

656643

657644
def _load_polars_accel(
@@ -700,16 +687,7 @@ def _dump_polars_accel(
700687
for data, mask in cols
701688
]
702689

703-
for i, (_, rtype, transformer), (data, mask)\
704-
in zip(range(len(numpy_cols)), returns, numpy_cols):
705-
if transformer is not None:
706-
import numpy as np
707-
vectorized_transformer = np.vectorize(transformer, otypes=[object])
708-
numpy_cols[i] = (vectorized_transformer(data), mask)
709-
710-
return _singlestoredb_accel.dump_rowdat_1_numpy(
711-
[x[1] for x in returns], numpy_ids, numpy_cols,
712-
)
690+
return _dump_numpy_accel(returns, numpy_ids, numpy_cols)
713691

714692

715693
def _load_arrow_accel(
@@ -763,49 +741,24 @@ def _dump_arrow_accel(
763741
for (data, mask), (_, dtype, _) in zip(cols, returns)
764742
]
765743

766-
for i, (_, rtype, transformer), (data, mask) in zip(
767-
range(len(numpy_cols)), returns,
768-
numpy_cols,
769-
):
770-
if transformer is not None:
771-
import numpy as np
772-
vectorized_transformer = np.vectorize(transformer, otypes=[object])
773-
numpy_cols[i] = (vectorized_transformer(data), mask)
774-
775-
return _singlestoredb_accel.dump_rowdat_1_numpy(
776-
[x[1] for x in returns], row_ids.to_numpy(), numpy_cols,
777-
)
744+
return _dump_numpy_accel(returns, row_ids.to_numpy(), numpy_cols)
778745

779746

780747
def _dump_rowdat_1_accel(
781748
returns: List[Tuple[str, int, Optional[Transformer]]],
782749
row_ids: List[int],
783750
rows: List[List[Any]],
784751
) -> bytes:
785-
rows = list(rows)
786-
787-
for i, (name, dtype, transformer), row in zip(range(len(returns)), returns, rows):
788-
if transformer is not None:
789-
for j in range(len(row)):
790-
rows[j][i] = transformer(row[i])
791-
792-
return _singlestoredb_accel.dump_rowdat_1(
793-
[x[1] for x in returns], row_ids, rows,
794-
)
752+
# C function now handles transformers internally
753+
return _singlestoredb_accel.dump_rowdat_1(returns, row_ids, rows)
795754

796755

797756
def _load_rowdat_1_accel(
798757
colspec: List[Tuple[str, int, Optional[Transformer]]],
799758
data: bytes,
800759
) -> Tuple[List[int], List[Any]]:
801-
row_ids, rows = _singlestoredb_accel.load_rowdat_1(colspec, data)
802-
803-
for i, (name, dtype, transformer), _ in zip(range(len(colspec)), colspec, rows):
804-
if transformer is not None:
805-
for j in range(len(rows)):
806-
rows[j][i] = transformer(rows[j][i])
807-
808-
return row_ids, rows
760+
# C function now handles transformers internally
761+
return _singlestoredb_accel.load_rowdat_1(colspec, data)
809762

810763

811764
if not has_accel:

0 commit comments

Comments
 (0)