@@ -1,7 +1,7 @@
 .. Test Data Generator documentation master file, created by
-   sphinx-quickstart on Sun Jun 21 10:54:30 2020.
-   You can adapt this file completely to your liking, but it should at least
-   contain the root `toctree` directive.
+   sphinx-quickstart on Sun Jun 21 10:54:30 2020.
+   You can adapt this file completely to your liking, but it should at least
+   contain the root `toctree` directive.

 Generating and Using Data with Multiple Tables
 ==============================================
@@ -73,7 +73,9 @@ Here we use a simple sequence for our plan ids.
     import dbldatagen as dg
     import pyspark.sql.functions as F

-    spark.catalog.clearCache()  # clear cache so that if we run multiple times to check performance, we're not relying on cache
+    # clear cache so that if we run multiple times to check performance,
+    # we're not relying on cache
+    spark.catalog.clearCache()

     UNIQUE_PLANS = 20
     PLAN_MIN_VALUE = 100
@@ -87,36 +89,35 @@ Here we use a simple sequence for our plan ids.
     spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000)


-    plan_dataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested)
-        .withColumn("plan_id", "int", minValue=PLAN_MIN_VALUE, uniqueValues=UNIQUE_PLANS)
-        # use plan_id as root value
-        .withColumn("plan_name", prefix="plan", baseColumn="plan_id")
-
-        # note: the default step is 1, so you must specify a step for small number ranges
-        .withColumn("cost_per_mb", "decimal(5,3)", minValue=0.005, maxValue=0.050,
-                    step=0.005, random=True)
-        .withColumn("cost_per_message", "decimal(5,3)", minValue=0.001, maxValue=0.02,
-                    step=0.001, random=True)
-        .withColumn("cost_per_minute", "decimal(5,3)", minValue=0.001, maxValue=0.01,
-                    step=0.001, random=True)
-
-        # we're modelling long distance and international prices simplistically -
-        # each is a multiplier that's applied to the base rate
-        .withColumn("ld_multiplier", "decimal(5,3)", minValue=1.5, maxValue=3, step=0.05,
-                    random=True, distribution="normal", omit=True)
-        .withColumn("ld_cost_per_minute", "decimal(5,3)",
-                    expr="cost_per_minute * ld_multiplier",
-                    baseColumns=['cost_per_minute', 'ld_multiplier'])
-        .withColumn("intl_multiplier", "decimal(5,3)", minValue=2, maxValue=4, step=0.05,
-                    random=True, distribution="normal", omit=True)
-        .withColumn("intl_cost_per_minute", "decimal(5,3)",
-                    expr="cost_per_minute * intl_multiplier",
-                    baseColumns=['cost_per_minute', 'intl_multiplier'])
+    plan_dataspec = (
+        dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested)
+        .withColumn("plan_id", "int", minValue=PLAN_MIN_VALUE, uniqueValues=UNIQUE_PLANS)
+        # use plan_id as root value
+        .withColumn("plan_name", prefix="plan", baseColumn="plan_id")
+
+        # note: the default step is 1, so you must specify a step for small number ranges
+        .withColumn("cost_per_mb", "decimal(5,3)", minValue=0.005, maxValue=0.050,
+                    step=0.005, random=True)
+        .withColumn("cost_per_message", "decimal(5,3)", minValue=0.001, maxValue=0.02,
+                    step=0.001, random=True)
+        .withColumn("cost_per_minute", "decimal(5,3)", minValue=0.001, maxValue=0.01,
+                    step=0.001, random=True)
+
+        # we're modelling long distance and international prices simplistically -
+        # each is a multiplier that's applied to the base rate
+        .withColumn("ld_multiplier", "decimal(5,3)", minValue=1.5, maxValue=3, step=0.05,
+                    random=True, distribution="normal", omit=True)
+        .withColumn("ld_cost_per_minute", "decimal(5,3)",
+                    expr="cost_per_minute * ld_multiplier",
+                    baseColumns=['cost_per_minute', 'ld_multiplier'])
+        .withColumn("intl_multiplier", "decimal(5,3)", minValue=2, maxValue=4, step=0.05,
+                    random=True, distribution="normal", omit=True)
+        .withColumn("intl_cost_per_minute", "decimal(5,3)",
+                    expr="cost_per_minute * intl_multiplier",
+                    baseColumns=['cost_per_minute', 'intl_multiplier'])
     )

-    df_plans = (plan_dataspec.build()
-                .cache()
-                )
+    df_plans = plan_dataspec.build().cache()

     display(df_plans)

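As a quick aside (not part of the commit): because ``ld_multiplier`` and ``intl_multiplier`` are declared with ``omit=True``, they are only used to derive the cost columns and should not appear in ``df_plans``. A minimal sanity check along those lines, using only standard Spark DataFrame APIs, could look like this:

.. code-block:: python

    # the omitted multiplier columns should not be present in the generated output
    assert "ld_multiplier" not in df_plans.columns
    assert "intl_multiplier" not in df_plans.columns

    # inspect how much the long-distance and international rates exceed the base rate
    df_plans.selectExpr(
        "min(ld_cost_per_minute - cost_per_minute) as min_ld_uplift",
        "min(intl_cost_per_minute - cost_per_minute) as min_intl_uplift",
    ).show()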
@@ -195,10 +196,11 @@ when using hashed values, the range of the hashes produced can be large.

     effective_customers = df_customers.count()

-    print(stripMargin(f"""revised customers: {df_customers.count()},
-          | unique customers: {df_customers.select(F.countDistinct('customer_id')).take(1)[0][0]},
-          | unique device ids: {df_customers.select(F.countDistinct('device_id')).take(1)[0][0]},
-          | unique phone numbers: {df_customers.select(F.countDistinct('phone_number')).take(1)[0][0]}""")
+    print(stripMargin(
+        f"""revised customers: {df_customers.count()},
+          | unique customers: {df_customers.select(F.countDistinct('customer_id')).take(1)[0][0]},
+          | unique device ids: {df_customers.select(F.countDistinct('device_id')).take(1)[0][0]},
+          | unique phone numbers: {df_customers.select(F.countDistinct('phone_number')).take(1)[0][0]}""")
     )

     display(df_customers)
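Note in passing: ``stripMargin`` is not a Python built-in; the documentation presumably defines it earlier as a small helper that mimics Scala's margin stripping for multi-line strings. A rough sketch of such a helper (an assumption, not the code from this commit) could be:

.. code-block:: python

    import re

    def stripMargin(s):
        # drop leading whitespace and an optional '|' margin marker from each line,
        # mimicking Scala's stripMargin for multi-line strings
        return "\n".join(re.sub(r"^\s*\|", "", line) for line in s.split("\n"))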
@@ -247,7 +249,8 @@ A simple approach is simply to multiply the
     # use random seed method of 'hash_fieldname' for better spread - default in later builds
     events_dataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested,
                                         randomSeed=42, randomSeedMethod="hash_fieldname")
-        # use same logic as per the customers dataset to ensure matching keys - but make them random
+        # use same logic as per the customers dataset to ensure matching keys
+        # but make them random
         .withColumn("device_id_base", "decimal(10)", minValue=CUSTOMER_MIN_VALUE,
                     uniqueValues=UNIQUE_CUSTOMERS,
                     random=True, omit=True)
@@ -260,12 +263,14 @@ A simple approach is simply to multiply the
                     weights=[50, 50, 20, 10, 5], random=True)

         # use Gamma distribution for skew towards short calls
-        .withColumn("base_minutes", "decimal(7,2)", minValue=1.0, maxValue=100.0, step=0.1,
+        .withColumn("base_minutes", "decimal(7,2)",
+                    minValue=1.0, maxValue=100.0, step=0.1,
                     distribution=dg.distributions.Gamma(shape=1.5, scale=2.0),
                     random=True, omit=True)

         # use Gamma distribution for skew towards short transfers
-        .withColumn("base_bytes_transferred", "decimal(12)", minValue=K_1, maxValue=MB_100,
+        .withColumn("base_bytes_transferred", "decimal(12)",
+                    minValue=K_1, maxValue=MB_100,
                     distribution=dg.distributions.Gamma(shape=0.75, scale=2.0),
                     random=True, omit=True)

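For intuition about the Gamma parameters used above: small shape values concentrate most of the probability mass near zero, which is what skews the generated values towards short calls and small transfers. A standalone numpy sketch (not part of the commit, and not how dbldatagen itself draws the values) illustrates the shape of these distributions:

.. code-block:: python

    import numpy as np

    rng = np.random.default_rng(42)
    short_calls = rng.gamma(shape=1.5, scale=2.0, size=100_000)
    short_xfers = rng.gamma(shape=0.75, scale=2.0, size=100_000)

    # medians fall well below the means, i.e. both distributions are right-skewed
    print(np.median(short_calls), np.mean(short_calls))
    print(np.median(short_xfers), np.mean(short_xfers))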
@@ -308,8 +313,7 @@ Let's compute the customers and associated plans
     import pyspark.sql.functions as F
     import pyspark.sql.types as T

-    df_customer_pricing = df_customers.join(df_plans,
-                                            df_plans.plan_id == df_customers.plan)
+    df_customer_pricing = df_customers.join(df_plans, df_plans.plan_id == df_customers.plan)

     display(df_customer_pricing)

@@ -365,8 +369,9 @@ now let's compute the invoices

 .. code-block:: python

-    df_customer_summary = (df_customer_pricing.join(df_summary,
-                               df_customer_pricing.device_id == df_summary.device_id)
+    df_customer_summary = (
+        df_customer_pricing.join(df_summary,
+                                 df_customer_pricing.device_id == df_summary.device_id)
         .createOrReplaceTempView("customer_summary"))

     df_invoices = spark.sql("""