1+ import time
2+ import argparse
3+ import tensorflow as tf
4+ import os
5+ import sys
6+ import math
7+ import collections
8+ from tensorflow .python .client import timeline
9+ import json
10+
11+
12+
# Criteo display-advertising column layout: 13 continuous features
# ("I1".."I13"), 26 categorical features ("C1".."C26"), and the binary
# click label.
CONTINUOUS_COLUMNS = [f"I{i}" for i in range(1, 14)]    # I1..I13 inclusive
CATEGORICAL_COLUMNS = [f"C{i}" for i in range(1, 27)]   # C1..C26 inclusive
LABEL_COLUMN = ["clicked"]
TRAIN_DATA_COLUMNS = LABEL_COLUMN + CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS
FEATURE_COLUMNS = CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS

# Per-column hash-bucket sizes for the categorical features; sized roughly
# to each column's observed cardinality in the Criteo dataset.
HASH_BUCKET_SIZES = {
    'C1': 2500,
    'C2': 2000,
    'C3': 5000000,
    'C4': 1500000,
    'C5': 1000,
    'C6': 100,
    'C7': 20000,
    'C8': 4000,
    'C9': 20,
    'C10': 100000,
    'C11': 10000,
    'C12': 5000000,
    'C13': 40000,
    'C14': 100,
    'C15': 100,
    'C16': 3000000,
    'C17': 50,
    'C18': 10000,
    'C19': 4000,
    'C20': 20,
    'C21': 4000000,
    'C22': 100,
    'C23': 100,
    'C24': 250000,
    'C25': 400,
    'C26': 100000,
}
46+
47+
def add_layer_summary(value, tag):
    """Attach TensorBoard summaries for one layer's activations.

    Records the fraction of zero activations as a scalar and the full
    activation distribution as a histogram, both namespaced under *tag*.
    """
    zero_fraction = tf.nn.zero_fraction(value)
    tf.summary.scalar('%s/fraction_of_zero_values' % tag, zero_fraction)
    tf.summary.histogram('%s/activation' % tag, value)
52+
53+
54+
55+
def build_feature_cols():
    """Build the feature-column lists for the three DeepFM branches.

    Categorical columns become shared 16-dim embedding columns and are fed
    to all three branches (wide, FM, deep); numeric columns go only to the
    wide and deep branches.

    Returns:
        (wide_column, fm_column, deep_column) lists of feature columns.
    """
    wide_column, fm_column, deep_column = [], [], []
    for column_name in FEATURE_COLUMNS:
        if column_name not in CATEGORICAL_COLUMNS:
            # Continuous feature: plain scalar numeric column.
            numeric = tf.feature_column.numeric_column(column_name,
                                                       shape=(1, ))
            wide_column.append(numeric)
            deep_column.append(numeric)
            continue
        # Categorical feature: DeepRec embedding-backed categorical column
        # wrapped in a mean-combined 16-dim embedding, shared by all parts.
        embedding = tf.feature_column.embedding_column(
            tf.feature_column.categorical_column_with_embedding(
                column_name, dtype=tf.string),
            dimension=16,
            combiner='mean')
        wide_column.append(embedding)
        deep_column.append(embedding)
        fm_column.append(embedding)

    return wide_column, fm_column, deep_column
78+
79+
class DeepFM():
    """DeepFM forward-graph builder (inference path only in this file).

    Combines three branches built from shared feature columns:
      * a linear / first-order term over the wide columns,
      * an FM second-order interaction term over the embedding columns,
      * a multi-layer DNN over the deep columns,
    then concatenates the branch outputs and applies a final dense stack
    ending in a single sigmoid unit (click probability).

    NOTE(review): `optimizer_type` and `learning_rate` are stored but never
    used here — this class only builds the forward graph for export.
    """
    def __init__(self,
                 wide_column=None,
                 fm_column=None,
                 deep_column=None,
                 dnn_hidden_units=[1024, 256, 32],  # NOTE(review): mutable defaults, shared across instances
                 final_hidden_units=[128, 64],
                 optimizer_type='adam',
                 learning_rate=0.001,
                 inputs=None,
                 use_bn=True,
                 bf16=False,
                 input_layer_partitioner=None,
                 dense_layer_partitioner=None):
        # `inputs` is the feature dict (name -> placeholder/tensor); required.
        if not inputs:
            raise ValueError('Dataset is not defined.')
        self.wide_column = wide_column
        self.deep_column = deep_column
        self.fm_column = fm_column
        if not wide_column or not fm_column or not deep_column:
            raise ValueError(
                'Wide column, FM column or Deep column is not defined.')
        self.dnn_hidden_units = dnn_hidden_units
        self.final_hidden_units = final_hidden_units
        self.optimizer_type = optimizer_type
        self.learning_rate = learning_rate
        self.input_layer_partitioner = input_layer_partitioner
        self.dense_layer_partitioner = dense_layer_partitioner

        self.feature = inputs
        self.bf16 = bf16

        self.use_bn = use_bn

        # Build the whole forward graph eagerly at construction time.
        self.predict = self.prediction()

    def dnn(self, dnn_input, dnn_hidden_units=None, layer_name=''):
        """Run `dnn_input` through a stack of ReLU dense layers.

        One variable scope per layer (`<layer_name>_<i>`), optionally
        followed by batch normalization, with activation summaries attached.
        Returns the output of the last layer.
        """
        for layer_id, num_hidden_units in enumerate(dnn_hidden_units):
            with tf.variable_scope(layer_name + "_%d" % layer_id,
                                   partitioner=self.dense_layer_partitioner,
                                   reuse=tf.AUTO_REUSE) as dnn_layer_scope:
                # Passing the scope object as `name` keeps the layer's
                # variables inside this scope.
                dnn_input = tf.layers.dense(dnn_input,
                                            units=num_hidden_units,
                                            activation=tf.nn.relu,
                                            name=dnn_layer_scope)
                if self.use_bn:
                    # NOTE(review): no `training` flag is passed, so this
                    # defaults to inference mode (moving statistics).
                    dnn_input = tf.layers.batch_normalization(
                        dnn_input)
                add_layer_summary(dnn_input, dnn_layer_scope.name)

        return dnn_input

    def prediction(self):
        """Build the DeepFM forward graph and return the sigmoid output."""
        # input features
        with tf.variable_scope('input_layer',
                               partitioner=self.input_layer_partitioner,
                               reuse=tf.AUTO_REUSE):

            # `cols_to_output_tensors` captures each column's tensor so the
            # FM branch can stack the per-column embeddings along axis 1.
            fm_cols = {}
            wide_input = tf.feature_column.input_layer(
                self.feature, self.wide_column, cols_to_output_tensors=fm_cols)
            fm_input = tf.stack([fm_cols[cols] for cols in self.fm_column], 1)
            dnn_input = tf.feature_column.input_layer(self.feature,
                                                      self.deep_column)

        if self.bf16:
            wide_input = tf.cast(wide_input, dtype=tf.bfloat16)
            fm_input = tf.cast(fm_input, dtype=tf.bfloat16)
            dnn_input = tf.cast(dnn_input, dtype=tf.bfloat16)

        # DNN part
        if self.bf16:
            # NOTE(review): `keep_weights()` is a DeepRec variable-scope
            # extension (keeps variables in full precision under bf16
            # compute) — confirm against the DeepRec version in use.
            with tf.variable_scope('dnn').keep_weights():
                dnn_output = self.dnn(dnn_input, self.dnn_hidden_units,
                                      'dnn_layer')
        else:
            with tf.variable_scope('dnn'):
                dnn_output = self.dnn(dnn_input, self.dnn_hidden_units,
                                      'dnn_layer')

        # linear / first order part
        with tf.variable_scope('linear', reuse=tf.AUTO_REUSE) as linear:
            linear_output = tf.reduce_sum(wide_input, axis=1, keepdims=True)

        # FM second order part: 0.5 * ((sum v)^2 - sum v^2) over the
        # stacked per-feature embeddings.
        with tf.variable_scope('fm', reuse=tf.AUTO_REUSE) as fm:
            sum_square = tf.square(tf.reduce_sum(fm_input, axis=1))
            square_sum = tf.reduce_sum(tf.square(fm_input), axis=1)
            fm_output = 0.5 * tf.subtract(sum_square, square_sum)

        # Final dnn layer over the concatenated branch outputs.
        all_input = tf.concat([dnn_output, linear_output, fm_output], 1)
        if self.bf16:
            with tf.variable_scope('final_dnn').keep_weights():
                net = self.dnn(all_input, self.final_hidden_units, 'final_dnn')
            # Cast back to fp32 before the output head.
            net = tf.cast(net, dtype=tf.float32)
        else:
            with tf.variable_scope('final_dnn'):
                net = self.dnn(all_input, self.final_hidden_units, 'final_dnn')

        # Single-logit head + sigmoid -> click probability.
        net = tf.layers.dense(net, units=1)
        net = tf.math.sigmoid(net)
        self.output = net
        return net
186+
187+
188+
189+
def get_arg_parser():
    """Build the command-line parser for the SavedModel export script.

    Returns:
        argparse.ArgumentParser with data/checkpoint paths, batch size,
        precision, optimizer, and learning-rate options.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--data_location',
                        default='./data',
                        required=False,
                        help='Full path of train data')
    parser.add_argument('--batch_size',
                        type=int,
                        default=512,
                        help='Batch size to train. Default is 512')
    parser.add_argument('--checkpoint',
                        required=False,
                        help='Full path to checkpoints input/output directory')
    parser.add_argument('--bf16',
                        action='store_true',
                        help='enable DeepRec BF16 in deep model. Default FP32')
    parser.add_argument("--optimizer",
                        default="adam",
                        choices=["adam", "adagrad", "adamasync"],
                        type=str)
    parser.add_argument('--learning_rate',
                        type=float,
                        default=0.001,
                        help='Learning rate for model')
    return parser
215+
216+
def main(tf_config=None, server=None):
    """Rebuild the DeepFM graph, restore a checkpoint, export a SavedModel.

    The model is exported to ``./savedmodels/<unix-timestamp>`` with the
    feature placeholders as inputs and the sigmoid output under the key
    ``"Sigmoid"``.

    Args:
        tf_config: Unused; kept for interface compatibility.
        server: Unused; kept for interface compatibility.

    Relies on the module-level ``args`` namespace populated in ``__main__``
    (``args.checkpoint``, ``args.optimizer``, ``args.learning_rate``,
    ``args.bf16``).
    """
    with tf.Session() as sess1:
        # Fixed random seed so the rebuilt graph matches training.
        tf.set_random_seed(2021)

        # Build the feature columns shared by the three DeepFM branches.
        wide_column, fm_column, deep_column = build_feature_cols()

        # Serving inputs: one placeholder per Criteo feature column
        # (13 float continuous "I*", 26 string categorical "C*").
        final_input = {}
        for i in range(1, 14):
            final_input["I" + str(i)] = tf.placeholder(
                tf.float32, [None], name='I' + str(i))
        for j in range(1, 27):
            final_input["C" + str(j)] = tf.placeholder(
                tf.string, [None], name='C' + str(j))

        # Create the model; constructing DeepFM builds the forward graph.
        model = DeepFM(wide_column=wide_column,
                       fm_column=fm_column,
                       deep_column=deep_column,
                       optimizer_type=args.optimizer,
                       learning_rate=args.learning_rate,
                       bf16=args.bf16,
                       inputs=final_input,
                       input_layer_partitioner=None,
                       dense_layer_partitioner=None)

        # Initialize variables, then restore the latest checkpoint weights.
        saver = tf.train.Saver()
        sess1.run(tf.global_variables_initializer())
        sess1.run(tf.local_variables_initializer())
        saver.restore(sess1, tf.train.latest_checkpoint(args.checkpoint))

        # Export directory: ./savedmodels/<timestamp>. makedirs creates the
        # parent as needed and raises if the timestamped dir already exists
        # (same contract as the previous makedirs + mkdir pair).
        saved_path = os.path.join("./savedmodels", str(int(time.time())))
        os.makedirs(saved_path)

        tf.saved_model.simple_save(
            sess1,
            saved_path,
            inputs=model.feature,
            outputs={"Sigmoid": model.output}
        )
277+
278+
if __name__ == "__main__":
    # Parse CLI flags into the module-level namespace consumed by main().
    args = get_arg_parser().parse_args()
    main()
285+