
Commit 44dcc3f

Merge branch 'master' into repartition-reader-single-cf
2 parents 3f9d88a + dce992b

44 files changed: +1787 −558 lines changed


common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java

Lines changed: 1 addition & 0 deletions
@@ -787,6 +787,7 @@ public enum LogKeys implements LogKey {
   STREAM_CHUNK_ID,
   STREAM_ID,
   STREAM_NAME,
+  STREAM_SHOULD_FORCE_SNAPSHOT,
   SUBMISSION_ID,
   SUBSAMPLING_RATE,
   SUB_QUERY,

common/utils/src/main/scala/org/apache/spark/util/SparkSystemUtils.scala

Lines changed: 11 additions & 0 deletions
@@ -39,6 +39,16 @@ private[spark] trait SparkSystemUtils {
    */
   val javaVersion = JavaUtils.javaVersion
 
+  /**
+   * Whether the underlying Java version is at most 17.
+   */
+  val isJavaVersionAtMost17 = Runtime.version().feature() <= 17
+
+  /**
+   * Whether the underlying Java version is at least 21.
+   */
+  val isJavaVersionAtLeast21 = Runtime.version().feature() >= 21
+
   /**
    * Whether the underlying operating system is Windows.
    */
@@ -63,6 +73,7 @@ private[spark] trait SparkSystemUtils {
    * Whether the underlying operating system is UNIX.
    */
   val isUnix = JavaUtils.isUnix
+
 }
 
 object SparkSystemUtils extends SparkSystemUtils

connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleDatabaseOnDocker.scala

Lines changed: 2 additions & 2 deletions
@@ -23,12 +23,12 @@ class OracleDatabaseOnDocker extends DatabaseOnDocker with Logging {
   // sarutak/oracle-free is a custom fork of gvenzl/oracle-free which allows to set timeout for
   // password initialization. See SPARK-54076 for details.
   lazy override val imageName =
-    sys.env.getOrElse("ORACLE_DOCKER_IMAGE_NAME", "sarutak/oracle-free:23.9-slim")
+    sys.env.getOrElse("ORACLE_DOCKER_IMAGE_NAME", "sarutak/oracle-free:23.26.0-slim")
   val oracle_password = "Th1s1sThe0racle#Pass"
   override val env = Map(
     "ORACLE_PWD" -> oracle_password, // oracle images uses this
     "ORACLE_PASSWORD" -> oracle_password, // gvenzl/oracle-free uses this
-    "PASSWORD_INIT_TIMEOUT" -> "30"
+    "PASSWORD_INIT_TIMEOUT" -> "60"
   )
   override val usesIpc = false
   override val jdbcPort: Int = 1521

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 0 additions & 10 deletions
@@ -1855,16 +1855,6 @@ private[spark] object Utils
     getHadoopFileSystem(new URI(path), conf)
   }
 
-  /**
-   * Whether the underlying Java version is at most 17.
-   */
-  val isJavaVersionAtMost17 = Runtime.version().feature() <= 17
-
-  /**
-   * Whether the underlying Java version is at least 21.
-   */
-  val isJavaVersionAtLeast21 = Runtime.version().feature() >= 21
-
   /**
    * Whether the underlying JVM prefer IPv6 addresses.
    */

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -220,7 +220,7 @@
     <netty.version>4.2.7.Final</netty.version>
     <netty-tcnative.version>2.0.74.Final</netty-tcnative.version>
     <icu4j.version>77.1</icu4j.version>
-    <junit.version>6.0.0</junit.version>
+    <junit.version>6.0.1</junit.version>
     <jline.version>2.14.6</jline.version>
     <!--
       SPARK-50299: When updating `sbt-jupiter-interface.version`,

python/pyspark/sql/connect/client/core.py

Lines changed: 14 additions & 70 deletions
@@ -99,8 +99,9 @@
 )
 from pyspark.sql.connect.observation import Observation
 from pyspark.sql.connect.utils import get_python_ver
-from pyspark.sql.pandas.types import _create_converter_to_pandas, from_arrow_schema
-from pyspark.sql.types import DataType, StructType, _has_type
+from pyspark.sql.pandas.types import from_arrow_schema
+from pyspark.sql.pandas.conversion import _convert_arrow_table_to_pandas
+from pyspark.sql.types import DataType, StructType
 from pyspark.util import PythonEvalType
 from pyspark.storagelevel import StorageLevel
 from pyspark.errors import PySparkValueError, PySparkAssertionError, PySparkNotImplementedError
@@ -987,88 +988,31 @@ def to_pandas(
         # Get all related configs in a batch
         (
             timezone,
-            struct_in_pandas,
-            self_destruct,
+            structHandlingMode,
+            selfDestruct,
         ) = self.get_configs(
             "spark.sql.session.timeZone",
             "spark.sql.execution.pandas.structHandlingMode",
             "spark.sql.execution.arrow.pyspark.selfDestruct.enabled",
         )
 
         table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(
-            req, observations, self_destruct == "true"
+            req, observations, selfDestruct == "true"
         )
         assert table is not None
         ei = ExecutionInfo(metrics, observed_metrics)
 
         schema = schema or from_arrow_schema(table.schema, prefer_timestamp_ntz=True)
         assert schema is not None and isinstance(schema, StructType)
 
-        # Rename columns to avoid duplicated column names during processing
-        temp_col_names = [f"col_{i}" for i in range(len(schema.names))]
-        table = table.rename_columns(temp_col_names)
-
-        # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
-        # values, but we should use datetime.date to match the behavior with when
-        # Arrow optimization is disabled.
-        pandas_options = {"coerce_temporal_nanoseconds": True}
-        if self_destruct == "true" and table.num_rows > 0:
-            # Configure PyArrow to use as little memory as possible:
-            # self_destruct - free columns as they are converted
-            # split_blocks - create a separate Pandas block for each column
-            # use_threads - convert one column at a time
-            pandas_options.update(
-                {
-                    "self_destruct": True,
-                    "split_blocks": True,
-                    "use_threads": False,
-                }
-            )
-
-        if len(schema.names) > 0:
-            error_on_duplicated_field_names: bool = False
-            if struct_in_pandas == "legacy" and any(
-                _has_type(f.dataType, StructType) for f in schema.fields
-            ):
-                error_on_duplicated_field_names = True
-                struct_in_pandas = "dict"
-
-            # SPARK-51112: If the table is empty, we avoid using pyarrow to_pandas to create the
-            # DataFrame, as it may fail with a segmentation fault.
-            if table.num_rows == 0:
-                # For empty tables, create empty Series with converters to preserve dtypes
-                pdf = pd.concat(
-                    [
-                        _create_converter_to_pandas(
-                            field.dataType,
-                            field.nullable,
-                            timezone=timezone,
-                            struct_in_pandas=struct_in_pandas,
-                            error_on_duplicated_field_names=error_on_duplicated_field_names,
-                        )(pd.Series([], name=temp_col_names[i], dtype="object"))
-                        for i, field in enumerate(schema.fields)
-                    ],
-                    axis="columns",
-                )
-            else:
-                pdf = pd.concat(
-                    [
-                        _create_converter_to_pandas(
-                            field.dataType,
-                            field.nullable,
-                            timezone=timezone,
-                            struct_in_pandas=struct_in_pandas,
-                            error_on_duplicated_field_names=error_on_duplicated_field_names,
-                        )(arrow_col.to_pandas(**pandas_options))
-                        for arrow_col, field in zip(table.columns, schema.fields)
-                    ],
-                    axis="columns",
-                )
-            # Restore original column names (including duplicates)
-            pdf.columns = schema.names
-        else:
-            # empty columns
-            pdf = table.to_pandas(**pandas_options)
+        pdf = _convert_arrow_table_to_pandas(
+            arrow_table=table,
+            schema=schema,
+            timezone=timezone,
+            struct_handling_mode=structHandlingMode,
+            date_as_object=False,
+            self_destruct=selfDestruct == "true",
+        )
 
         if len(metrics) > 0:
             pdf.attrs["metrics"] = metrics