Skip to content

Commit 3a2e3a8

Browse files
Feature build ordering improvements 2 (#189)
* wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * improved build ordering * improved build ordering * improved build ordering * reverted unnecessary changes * reverted unnecessary changes * udated ColumnSpecOptions description
1 parent 542809f commit 3a2e3a8

File tree

3 files changed

+148
-32
lines changed

3 files changed

+148
-32
lines changed

dbldatagen/column_spec_options.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ class ColumnSpecOptions(object):
3636
3737
:param step: Step to use for range of generated value. As an alternative, you may use the `dataRange` parameter
3838
39+
:param numColumns: generate `n` columns numbered from 1 .. n-1 with same definition
40+
41+
:param numFeatures: generate `n` columns numbered from 0 .. n-1 with same definition. Alias for `numColumns`
42+
43+
:param structType: If specified as "array" and used with numColumns / numFeatures, will combine columns as array
44+
3945
:param random: If True, will generate random values for column value. Defaults to `False`
4046
4147
:param baseColumn: Either the string name of the base column, or a list of columns to use to

dbldatagen/utils.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,13 @@ def topologicalSort(sources, initial_columns=None, flatten=True):
116116
:arg sources: list of ``(name, set(names of dependencies))`` pairs
117117
:arg initial_columns: force ``initial_columns`` to be computed first
118118
:arg flatten: if true, flatten output list
119-
:returns: list of names in dependency order. If not flattened, result will be list of lists
119+
:returns: list of names in dependency order separated into build phases
120+
121+
.. note::
122+
The algorith will give preference to retaining order of inbound sequence
123+
over modifying order to produce a lower number of build phases.
124+
125+
Overall the effect is that the input build order should be retained unless there are forward references
120126
"""
121127
# generate a copy so that we can modify in place
122128
pending = [(name, set(deps)) for name, deps in sources]
@@ -127,27 +133,36 @@ def topologicalSort(sources, initial_columns=None, flatten=True):
127133
next_pending = []
128134
gen = []
129135
value_emitted = False
136+
defer_emitted = False
130137
gen_provided = []
131138
for entry in pending:
132139
name, deps = entry
133140
deps.difference_update(provided)
134141
if deps:
135142
next_pending.append((name, set(deps)))
143+
144+
# if dependencies will be satisfied by item emitted in this round, defer output
145+
if not deps.difference(gen_provided):
146+
defer_emitted = True
147+
elif defer_emitted:
148+
next_pending.append((name, set(deps)))
136149
elif name in provided:
137-
value_emitted |= True
150+
value_emitted = True
138151
else:
139152
gen.append(name)
140153
gen_provided.append(name)
141-
value_emitted |= True
154+
value_emitted = True
142155
provided.extend(gen_provided)
143156
build_orders.append(gen)
157+
144158
if not value_emitted:
145159
raise ValueError(f"cyclic or missing dependency detected [{next_pending}]")
146160

147161
pending = next_pending
148162

149163
if flatten:
150-
return [item for sublist in build_orders for item in sublist]
164+
flattened_list = [item for sublist in build_orders for item in sublist]
165+
return flattened_list
151166
else:
152167
return build_orders
153168

tests/test_build_planning.py

Lines changed: 123 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ def sampleDataSpec(self):
168168
.withColumnSpecs(patterns=".*_ID", match_types=StringType(), format="%010d",
169169
minValue=1, maxValue=123,
170170
step=1)
171-
.withColumnSpecs(patterns=".*_IDS", match_types=StringType(), format="%010d", minValue=1,
171+
.withColumnSpecs(patterns=".*_IDS", match_types="string", format="%010d", minValue=1,
172172
maxValue=100, step=1)
173173
# .withColumnSpec("R3D3_CLUSTER_IDS", minValue=1, maxValue=100, step=1)
174174
.withColumnSpec("XYYZ_IDS", minValue=1, maxValue=123, step=1,
@@ -294,6 +294,7 @@ def test_build_ordering_explicit_dependency(self):
294294
baseColumn="city_id")
295295

296296
build_order = gen1.build_order
297+
logging.info(f"Build order {build_order}")
297298

298299
assert self.builtBefore("city_id", "city_name", build_order)
299300
assert self.builtBefore("city", "city2", build_order)
@@ -308,8 +309,6 @@ def test_build_ordering_explicit_dependency(self):
308309
assert self.builtInSeparatePhase("city", "city_id", build_order)
309310
assert self.builtInSeparatePhase("city", "city_pop", build_order)
310311

311-
print(gen1.build_order)
312-
313312
def test_build_ordering_explicit_dependency2(self):
314313
gen1 = dg.DataGenerator(sparkSession=spark, name="nested_schema", rows=1000, partitions=4,
315314
seedColumnName="_id") \
@@ -327,6 +326,7 @@ def test_build_ordering_explicit_dependency2(self):
327326
baseColumn="city_id")
328327

329328
build_order = gen1.build_order
329+
logging.info(f"Build order {build_order}")
330330

331331
assert self.builtBefore("city", "city_name", build_order)
332332
assert self.builtBefore("city", "city_id", build_order)
@@ -335,8 +335,6 @@ def test_build_ordering_explicit_dependency2(self):
335335
assert self.builtInSeparatePhase("city", "city_id", build_order)
336336
assert self.builtInSeparatePhase("city", "city_pop", build_order)
337337

338-
print(gen1.build_order)
339-
340338
def test_build_ordering_implicit_dependency(self):
341339
gen1 = dg.DataGenerator(sparkSession=spark, name="nested_schema", rows=1000, partitions=4,
342340
seedColumnName="_id") \
@@ -348,7 +346,7 @@ def test_build_ordering_implicit_dependency(self):
348346
expr="named_struct('name', city_name, 'id', city_id, 'population', city_pop)")
349347

350348
build_order = gen1.build_order
351-
print(gen1.build_order)
349+
logging.info(f"Build order {build_order}")
352350

353351
assert self.builtBefore("city", "city_name", build_order)
354352
assert self.builtBefore("city", "city_id", build_order)
@@ -357,6 +355,116 @@ def test_build_ordering_implicit_dependency(self):
357355
assert self.builtInSeparatePhase("city", "city_id", build_order), "fields should be built in separate phase"
358356
assert self.builtInSeparatePhase("city", "city_pop", build_order), "fields should be built in separate phase"
359357

358+
# TODO: build ordering should initially try and build in the order supplied but separate into phases
359+
360+
def test_build_ordering_implicit_dependency2(self):
361+
DEVICE_STATES = ['RUNNING', 'IDLE', 'DOWN']
362+
DEVICE_WEIGHTS = [10, 5, 1]
363+
SITES = ['alpha', 'beta', 'gamma', 'delta', 'phi', 'mu', 'lambda']
364+
AREAS = ['area 1', 'area 2', 'area 3', 'area 4', 'area 5']
365+
LINES = ['line 1', 'line 2', 'line 3', 'line 4', 'line 5', 'line 6']
366+
TAGS = ['statusCode', 'another notification 1', 'another notification 2', 'another notification 3']
367+
NUM_LOCAL_DEVICES = 20
368+
369+
STARTING_DATETIME = "2022-06-01 01:00:00"
370+
END_DATETIME = "2022-09-01 23:59:00"
371+
EVENT_INTERVAL = "10 seconds"
372+
373+
gen1 = (
374+
dg.DataGenerator(spark, rows=1000, partitions=4)
375+
# can combine internal site id computation with value lookup but clearer to use a separate internal column
376+
.withColumn("site", "string", values=SITES, random=True)
377+
.withColumn("area", "string", values=AREAS, random=True, omit=True)
378+
.withColumn("line", "string", values=LINES, random=True, omit=True)
379+
.withColumn("local_device_id", "int", maxValue=NUM_LOCAL_DEVICES - 1, omit=True, random=True)
380+
381+
.withColumn("local_device", "string", prefix="device", baseColumn="local_device_id")
382+
383+
.withColumn("device_key", "string",
384+
expr="concat('/', site, '/', area, '/', line, '/', local_device)")
385+
386+
# used to compute the device id
387+
.withColumn("internal_device_key", "long", expr="hash(site, area, line, local_device)",
388+
omit=True)
389+
390+
.withColumn("deviceId", "string", format="0x%013x",
391+
baseColumn="internal_device_key")
392+
393+
# tag name is name of device signal
394+
.withColumn("tagName", "string", values=TAGS, random=True)
395+
396+
# tag value is state
397+
.withColumn("tagValue", "string",
398+
values=DEVICE_STATES, weights=DEVICE_WEIGHTS,
399+
random=True)
400+
401+
.withColumn("tag_ts", "timestamp",
402+
begin=STARTING_DATETIME,
403+
end=END_DATETIME,
404+
interval=EVENT_INTERVAL,
405+
random=True)
406+
407+
.withColumn("event_date", "date", expr="to_date(tag_ts)")
408+
)
409+
410+
build_order = gen1.build_order
411+
logging.info(f"Build order {build_order}")
412+
413+
assert self.builtBefore("event_date", "tag_ts", build_order)
414+
assert self.builtBefore("device_key", "site", build_order)
415+
assert self.builtBefore("device_key", "area", build_order)
416+
assert self.builtBefore("device_key", "line", build_order)
417+
assert self.builtBefore("device_key", "local_device", build_order)
418+
assert self.builtBefore("internal_device_key", "site", build_order)
419+
assert self.builtBefore("internal_device_key", "area", build_order)
420+
assert self.builtBefore("internal_device_key", "line", build_order)
421+
assert self.builtBefore("internal_device_key", "local_device", build_order)
422+
assert self.builtBefore("device_key", "site", build_order)
423+
assert self.builtBefore("device_key", "area", build_order)
424+
assert self.builtBefore("device_key", "line", build_order)
425+
assert self.builtBefore("device_key", "local_device", build_order)
426+
427+
assert self.builtInSeparatePhase("tag_ts", "event_date", build_order)
428+
assert self.builtInSeparatePhase("site", "device_key", build_order)
429+
assert self.builtInSeparatePhase("area", "device_key", build_order)
430+
assert self.builtInSeparatePhase("line", "device_key", build_order)
431+
assert self.builtInSeparatePhase("local_device", "device_key", build_order)
432+
assert self.builtInSeparatePhase("site", "internal_device_key", build_order)
433+
assert self.builtInSeparatePhase("area", "internal_device_key", build_order)
434+
assert self.builtInSeparatePhase("line", "internal_device_key", build_order)
435+
assert self.builtInSeparatePhase("local_device", "internal_device_key", build_order)
436+
assert self.builtInSeparatePhase("site", "device_key", build_order)
437+
assert self.builtInSeparatePhase("area", "device_key", build_order)
438+
assert self.builtInSeparatePhase("line", "device_key", build_order)
439+
assert self.builtInSeparatePhase("local_device", "device_key", build_order)
440+
441+
def test_implicit_dependency3(self):
442+
dataspec = (
443+
dg.DataGenerator(spark, rows=1000, partitions=4)
444+
.withColumn("name", percentNulls=0.01, template=r'\\w \\w|\\w a. \\w')
445+
.withColumn("payment_instrument_type", values=['cash', 'cc', 'app'],
446+
random=True)
447+
.withColumn("int_payment_instrument", "int", minValue=0000, maxValue=9999,
448+
baseColumn="name",
449+
baseColumnType="hash", omit=True)
450+
.withColumn("payment_instrument",
451+
expr="format_number(int_payment_instrument, '**** ****** *####')")
452+
.withColumn("email", template=r'\\w.\\w@\\w.com')
453+
.withColumn("md5_payment_instrument",
454+
expr="md5(concat(payment_instrument_type, ':', payment_instrument))")
455+
)
456+
457+
build_order = dataspec.build_order
458+
logging.info(f"Build order {build_order}")
459+
460+
assert self.builtBefore("payment_instrument", "int_payment_instrument", build_order)
461+
assert self.builtBefore("md5_payment_instrument", "payment_instrument", build_order)
462+
assert self.builtBefore("md5_payment_instrument", "payment_instrument_type", build_order)
463+
464+
assert self.builtInSeparatePhase("int_payment_instrument", "payment_instrument", build_order)
465+
assert self.builtInSeparatePhase("md5_payment_instrument", "payment_instrument", build_order)
466+
assert self.builtInSeparatePhase("md5_payment_instrument", "payment_instrument_type", build_order)
467+
360468
def test_expr_attribute(self):
361469
sql_expr = "named_struct('name', city_name, 'id', city_id, 'population', city_pop)"
362470
gen1 = dg.DataGenerator(sparkSession=spark, name="nested_schema", rows=1000, partitions=4,
@@ -393,24 +501,19 @@ def test_build_ordering_duplicate_names1(self):
393501
.withColumn("id", "long", minValue=1000000, uniqueValues=10000, random=True) \
394502
.withColumn("city_name", "long", minValue=1000000, uniqueValues=10000, random=True) \
395503
.withColumn("city_name", "string", template=r"\w", random=True, omit=True) \
504+
.withColumn("extra_field", "long", minValue=1000000, uniqueValues=10000, random=True) \
505+
.withColumn("extra_field", "string", template=r"\w", random=True) \
396506
.withColumn("city_id", "long", minValue=1000000, uniqueValues=10000, random=True, omit=True) \
397507
.withColumn("city_pop", "long", minValue=1000000, uniqueValues=10000, random=True, omit=True) \
398508
.withColumn("city", "struct<name:string, id:long, population:long>",
399509
expr="named_struct('name', city_name, 'id', city_id, 'population', city_pop)")
400510

401-
build_order = gen1.build_order
402-
print(gen1.build_order)
511+
logging.info(f"Build order {gen1.build_order}")
403512

404513
df = gen1.build()
405514

406-
df.show()
407-
408-
# assert self.builtBefore("city", "city_name", build_order)
409-
# assert self.builtBefore("city", "city_id", build_order)
410-
# assert self.builtBefore("city", "city_pop", build_order)
411-
# assert self.builtInSeparatePhase("city", "city_name", build_order), "fields should be built in separate phase"
412-
# assert self.builtInSeparatePhase("city", "city_id", build_order), "fields should be built in separate phase"
413-
# assert self.builtInSeparatePhase("city", "city_pop", build_order), "fields should be built in separate phase"
515+
count = df.count()
516+
assert count == 1000
414517

415518
def test_build_ordering_forward_ref(self, caplog):
416519
# caplog fixture captures log content
@@ -425,8 +528,7 @@ def test_build_ordering_forward_ref(self, caplog):
425528
expr="named_struct('name', city_name, 'id', city_id, 'population', city_pop)") \
426529
.withColumn("city_id", "long", minValue=1000000, uniqueValues=10000, random=True, omit=True)
427530

428-
build_order = gen1.build_order
429-
print(gen1.build_order)
531+
logging.info(f"Build order {gen1.build_order}")
430532

431533
seed_column_warnings_and_errors = self.get_log_capture_warngings_and_errors(caplog, "forward references")
432534
assert seed_column_warnings_and_errors >= 1, "Should not have error messages about forward references"
@@ -443,16 +545,9 @@ def test_build_ordering_duplicate_names2(self):
443545
expr="named_struct('name', city_name, 'id', city_id, 'population', city_pop)",
444546
baseColumns=["city_name", "city_id", "city_pop"])
445547

446-
build_order = gen1.build_order
447-
print(gen1.build_order)
548+
logging.info(f"Build order {gen1.build_order}")
448549

449550
df = gen1.build()
450551

451-
df.show()
452-
453-
# assert self.builtBefore("city", "city_name", build_order)
454-
# assert self.builtBefore("city", "city_id", build_order)
455-
# assert self.builtBefore("city", "city_pop", build_order)
456-
# assert self.builtInSeparatePhase("city", "city_name", build_order), "fields should be built in separate phase"
457-
# assert self.builtInSeparatePhase("city", "city_id", build_order), "fields should be built in separate phase"
458-
# assert self.builtInSeparatePhase("city", "city_pop", build_order), "fields should be built in separate phase"
552+
count = df.count()
553+
assert count == 1000

0 commit comments

Comments
 (0)