
Commit b0d23e9

ronanstokes-db, MarvinSchenkel, and nfx authored
Changes and bug fixes to support shared clusters in DBR 14.2 (#248)
* wip
* changes for release
* example notebook
* updates to handle shared spark session restrictions
* updates to handle shared sparkSession
* changes per code review
* Doc updates 032223 (#180)
* Doc updates 032523 (#181)
* documentation updates
* Feature build ordering improvements 2 (#189)
* improved build ordering
* reverted unnecessary changes
* updated ColumnSpecOptions description
* Feature consistency fixes (#182)
* fixed use of time strings to allow both 'seconds' and 'second' etc.
* id column fixes
* Doc updates 100522 (#119)
* fixed reference to dbx in pull_request_template
* reverted inadvertently changed file
* release 0.2.1
* doc updates
* updates for building docs
* updated public docs
* updated sphinx version
* updated docs
* removed generated docs
* removed changes to non-doc files
* tidied up makefile
* added workflow action to update tag 'preview'
* develop branch updates
* revised unit tests to use parameterized approach
* changed utils tests to pytest
* changed changelog format
* changelog changes
* changelog updates from merge
* update to change as a result of merge of time fixes
* updates for test speed improvements
* updated tests
* fixed typo
* reverted pytest changes - separate feature
* changed partitioning to run more efficiently on github runner
* use as query name for spark instance
* updates to template text generation for better performance and repeatable text generation
* reverted unrelated changes
* added further coverage tests and renamed option from 'seedColumn' to 'seedColumnName' for clarity
* added further coverage test for 'seedColumnName' property
* additional test coverage
* updated tests for ILText generation
* merged changes from master
* change to test potential break in build process
* updated build process to explicitly use python 3.8
* added explicit python version setup to build
* changes to build actions
* reverted changes to master + build action changes
* remerged repeatable feature generation
* changed table formatting in TemplateGenerator doc string
* updates from master
* updates to develop
* don't update coverage when pushing to develop
* Feature docs v34 (#197)
* doc changes only
* Update generating_column_data.rst
* Update generating_json_data.rst
* new document build
* adjusted comment banner at start of each doc file
* updated build
* reverted code comment changes
* merge Feature ordering improvements2 into develop (#198)
* reverted inadvertent merge
* Revert "Merge branch 'develop' into feature_consistency_fixes" - reverts commit e0efc4e, reversing changes made to a263bd9
* Feature docs v34 v2 (#192)
* Feature v34 (#201)
* prep for release
* Feature generate from existing data (#163)
* Remove calls to root logger (#205)
* Release v34post1 (#206)
* hotfix release for logger fixes
* fix for root logger configuration
* Fix doc typos and minor clarification (#207)
* Feature issue 209 (#210)
* Release 0v34post2 (#211)
* Feature html formatting (#208)
* Build fixes (#213)
* updated build version
* Feature doc change generating text (#218)
* updated running of prospector
* Feature build update (#220)
* updates to handle changes in upstream pipenv package
* Feature struct changes (#219)
* added support for inferred types
* updates and fixes to unit tests
* updated pipfile due to upstream changes in pipenv
* additional tests
* removed old code
* Feature additional docs (#222)
* function doc changes only
* Feature readme updates - updates readme to note compatible Unity Catalog releases (#236)
* updated readme to note compatible unity catalog runtimes
* Feature add codeowners (#238)
* Test assign (#239)
* Update LICENSE (#246)

---------

Co-authored-by: Marvin Schenkel <marvinschenkel@gmail.com>
Co-authored-by: Serge Smertin <259697+nfx@users.noreply.github.com>
1 parent a6987b2 commit b0d23e9


6 files changed: +111 -8 lines changed


CHANGELOG.md

Lines changed: 2 additions & 2 deletions
@@ -3,10 +3,10 @@
 ## Change History
 All notable changes to the Databricks Labs Data Generator will be documented in this file.
 
-
-
 #### Changed
 * Updated readme to include details on which versions of Databricks runtime support Unity Catalog `shared` access mode.
+* Updated code to use default parallelism of 200 when using a shared Spark session
+* Updated code to use Spark's SQL function `element_at` instead of array indexing due to incompatibility
 
 
 
 ### Version 0.3.5
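The two new CHANGELOG entries correspond to the code changes below. The `element_at` change affects any column defined with a `values` list. A minimal sketch of that usage path, assuming a local Spark session and the library's `withColumn(..., values=...)` option (the column name and values here are illustrative):

import dbldatagen as dg
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[4]").getOrCreate()

# Each generated row picks one entry from `values`; internally the generator now
# selects the entry with Spark's `element_at` rather than array indexing.
df = (dg.DataGenerator(sparkSession=spark, rows=1000, partitions=4)
      .withColumn("status", "string", values=["new", "open", "closed"], random=True)
      .build())

df.groupBy("status").count().show()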

dbldatagen/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@
 from .data_generator import DataGenerator
 from .datagen_constants import DEFAULT_RANDOM_SEED, RANDOM_SEED_RANDOM, RANDOM_SEED_FIXED, \
     RANDOM_SEED_HASH_FIELD_NAME, MIN_PYTHON_VERSION, MIN_SPARK_VERSION, \
-    INFER_DATATYPE
+    INFER_DATATYPE, SPARK_DEFAULT_PARALLELISM
 from .utils import ensure, topologicalSort, mkBoundsList, coalesce_values, \
     deprecated, parse_time_interval, DataGenError, split_list_matching_condition, strip_margins, \
     json_value_from_path, system_time_millis

dbldatagen/column_generation_spec.py

Lines changed: 1 addition & 1 deletion
@@ -1085,7 +1085,7 @@ def _makeSingleGenerationExpression(self, index=None, use_pandas_optimizations=T
                            .astype(self.datatype))
 
             if self.values is not None:
-                new_def = array([lit(x) for x in self.values])[new_def.astype(IntegerType())]
+                new_def = F.element_at(F.array([F.lit(x) for x in self.values]), new_def.astype(IntegerType()) + 1)
             elif type(self.datatype) is StringType and self.expr is None:
                 new_def = self._applyPrefixSuffixExpressions(self.prefix, self.suffix, new_def)
 
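The `+ 1` is needed because Spark's `element_at` uses 1-based positions, while the previous `array(...)[index]` lookup was 0-based; the commit replaces the array-indexing form because of its incompatibility with shared access mode. A standalone PySpark sketch of the equivalence (the values and column names are illustrative):

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local[2]").getOrCreate()

values = ["red", "green", "blue"]
df = spark.range(6).withColumn("idx", (F.col("id") % len(values)).cast("int"))

# element_at is 1-based, so the 0-based idx must be shifted by one
picked = df.withColumn(
    "value",
    F.element_at(F.array(*[F.lit(v) for v in values]), F.col("idx") + 1)
)
picked.show()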

dbldatagen/data_generator.py

Lines changed: 21 additions & 3 deletions
@@ -16,7 +16,7 @@
 from .datagen_constants import DEFAULT_RANDOM_SEED, RANDOM_SEED_FIXED, RANDOM_SEED_HASH_FIELD_NAME, \
     DEFAULT_SEED_COLUMN, SPARK_RANGE_COLUMN, MIN_SPARK_VERSION, \
     OPTION_RANDOM, OPTION_RANDOM_SEED, OPTION_RANDOM_SEED_METHOD, \
-    INFER_DATATYPE
+    INFER_DATATYPE, SPARK_DEFAULT_PARALLELISM
 from .html_utils import HtmlUtils
 from .schema_parser import SchemaParser
 from .spark_singleton import SparkSingleton
@@ -50,6 +50,9 @@ class DataGenerator:
     it is recommended that you use a different name for the seed column - for example `_id`.
 
     This may be specified by setting the `seedColumnName` attribute to `_id`
+
+    Note: in a shared spark session, the sparkContext is not available, so the default parallelism is set to 200.
+    We recommend passing an explicit value for `partitions` in this case.
     """
 
     # class vars
@@ -97,9 +100,8 @@ def __init__(self, sparkSession=None, name=None, randomSeedMethod=None,
         # if the active Spark session is stopped, you may end up with a valid SparkSession object but the underlying
         # SparkContext will be invalid
         assert sparkSession is not None, "Spark session not initialized"
-        assert sparkSession.sparkContext is not None, "Expecting spark session to have valid sparkContext"
 
-        self.partitions = partitions if partitions is not None else sparkSession.sparkContext.defaultParallelism
+        self.partitions = partitions if partitions is not None else self._getDefaultSparkParallelism(sparkSession)
 
         # check for old versions of args
         if "starting_id" in kwargs:
@@ -239,6 +241,22 @@ def _setupLogger(self):
         else:
             self.logger.setLevel(logging.WARNING)
 
+    @staticmethod
+    def _getDefaultSparkParallelism(sparkSession):
+        """Get the default parallelism for a spark session, if spark session supports getting the sparkContext
+        :param sparkSession: spark session
+        :return: default parallelism
+        """
+        try:
+            if sparkSession.sparkContext is not None:
+                return sparkSession.sparkContext.defaultParallelism
+            else:
+                return SPARK_DEFAULT_PARALLELISM
+        except Exception as err:  # pylint: disable=broad-exception-caught
+            err_msg = f"Error getting default parallelism, using default setting of {SPARK_DEFAULT_PARALLELISM}"
+            logging.warning(err_msg)
+            return SPARK_DEFAULT_PARALLELISM
+
     @classmethod
     def useSeed(cls, seedVal):
         """ set seed for random number generation

dbldatagen/datagen_constants.py

Lines changed: 4 additions & 1 deletion
@@ -42,4 +42,7 @@
 OPTION_RANDOM_SEED_METHOD = "randomSeedMethod"
 OPTION_RANDOM_SEED = "randomSeed"
 
-INFER_DATATYPE = "__infer__"
+INFER_DATATYPE = "__infer__"
+
+# default parallelism when sparkContext is not available
+SPARK_DEFAULT_PARALLELISM = 200

tests/test_shared_env.py

Lines changed: 82 additions & 0 deletions
@@ -0,0 +1,82 @@
+import logging
+from unittest.mock import Mock, PropertyMock
+
+import pytest
+import dbldatagen as dg
+
+
+@pytest.fixture(scope="class")
+def setupLogging():
+    FORMAT = '%(asctime)-15s %(message)s'
+    logging.basicConfig(format=FORMAT)
+
+
+class TestSharedEnv:
+    """Tests to simulate testing under a Unity Catalog shared environment. In a Unity Catalog shared environment with
+    the 14.x versions of the Databricks runtime, the sparkSession object does not support use of the sparkContext
+    attribute to get the default parallelism. In this case, we want to catch errors and return a default of
+    200 as the default number of partitions. This is the same as the default parallelism in many versions of Spark.
+
+
+    """
+    SMALL_ROW_COUNT = 100000
+    COLUMN_COUNT = 10
+
+    @pytest.fixture(scope="class")
+    def sparkSession(self, setupLogging):
+        spark = dg.SparkSingleton.getLocalInstance("unit tests")
+        return spark
+
+    @pytest.fixture(scope="class")
+    def sharedSparkSession(self, setupLogging):
+        spark = Mock(wraps=dg.SparkSingleton.getLocalInstance("unit tests"))
+        del spark.sparkContext
+        return spark
+
+    @pytest.fixture(scope="class")
+    def sparkSessionNullContext(self, setupLogging):
+
+        class MockSparkSession:
+            def __init__(self):
+                self.sparkContext = None
+
+        spark = MockSparkSession()
+        return spark
+
+    def test_getDefaultParallelism(self, sparkSession):
+        """Test that the default parallelism is returned when the sparkSession object supports use of the
+        sparkContext attribute to get the default parallelism.
+
+        :param sparkSession: The sparkSession object to use for the test.
+        """
+        defaultParallelism = dg.DataGenerator._getDefaultSparkParallelism(sparkSession)
+        assert defaultParallelism == sparkSession.sparkContext.defaultParallelism
+
+    def test_getSharedDefaultParallelism(self, sharedSparkSession):
+        """Test that the default parallelism is returned when the sparkSession object supports use of the
+        sparkContext attribute to get the default parallelism, but that a constant is returned when the `sparkContext`
+        attribute is not available.
+        """
+        defaultParallelism = dg.DataGenerator._getDefaultSparkParallelism(sharedSparkSession)
+        assert defaultParallelism == dg.SPARK_DEFAULT_PARALLELISM
+
+    def test_getNullContextDefaultParallelism(self, sparkSessionNullContext):
+        """Test that the constant default parallelism is returned when the sparkSession object's
+        sparkContext attribute is None.
+
+        :param sparkSessionNullContext: The sparkSession object to use for the test.
+        """
+        defaultParallelism = dg.DataGenerator._getDefaultSparkParallelism(sparkSessionNullContext)
+        assert defaultParallelism == dg.SPARK_DEFAULT_PARALLELISM
+
+    def test_mocked_shared_session1(self, sharedSparkSession):
+        # validate that accessing the sparkContext on the shared spark session raises an exception
+        with pytest.raises(Exception) as excinfo:
+            context = sharedSparkSession.sparkContext
+
+        assert "sparkContext" in str(excinfo.value)
+
+    def test_null_context_spark_session(self, sparkSessionNullContext):
+        # validate that the sparkContext attribute on the null-context spark session is None
+        context = sparkSessionNullContext.sparkContext
+        assert context is None
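For reference, the `sharedSparkSession` fixture above relies on standard `unittest.mock` behaviour: wrapping a real object in `Mock(wraps=...)` delegates attribute access to it, and deleting an attribute from the mock makes subsequent access raise `AttributeError`, which is how the tests simulate the restricted shared-session behaviour. A minimal standalone sketch (the class below is a hypothetical stand-in, not part of the library):

from unittest.mock import Mock


class FakeSession:
    """A stand-in object that exposes a sparkContext attribute."""
    sparkContext = "real-context"


mocked = Mock(wraps=FakeSession())
del mocked.sparkContext  # later access now raises AttributeError, like a shared-cluster session

try:
    _ = mocked.sparkContext
except AttributeError as err:
    print(f"access raised: {err}")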
