 https://github.com/toddwschneider/nyc-taxi-data
 """
 import concurrent.futures
+import gzip
 import io
 import multiprocessing
 from collections import OrderedDict
@@ -92,10 +93,10 @@ def parse_row(row: OrderedDict):

     return Point("taxi-trip-data") \
         .tag("dispatching_base_num", row['dispatching_base_num']) \
-        .tag("PULocationID", row['PULocationID']) \
-        .tag("DOLocationID", row['DOLocationID']) \
+        .tag("PULocationID", row['PUlocationID']) \
+        .tag("DOLocationID", row['DOlocationID']) \
         .tag("SR_Flag", row['SR_Flag']) \
-        .field("dropoff_datetime", row['dropoff_datetime']) \
+        .field("dropoff_datetime", row['dropOff_datetime']) \
         .time(row['pickup_datetime']) \
         .to_line_protocol()

@@ -113,7 +114,7 @@ def parse_rows(rows, total_size):
     counter_.value += len(_parsed_rows)
     if counter_.value % 10_000 == 0:
         print('{0:8}{1}'.format(counter_.value, ' - {0:.2f} %'
-                                .format(100 * float(progress_.value) / float(int(total_size))) if total_size else ""))
+                                .format(float(progress_.value) / float(int(total_size))) if total_size else ""))
         pass

     queue_.put(_parsed_rows)
@@ -141,80 +142,80 @@ def init_counter(counter, progress, queue):
     progress_ = Value('i', 0)
     startTime = datetime.now()

-    url = "https://s3.amazonaws.com/nyc-tlc/trip+data/fhv_tripdata_2019-01.csv"
-    # url = "file:///Users/bednar/Developer/influxdata/influxdb-client-python/examples/fhv_tripdata_2019-01.csv"
+    url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-01.csv.gz"

     """
     Open URL and for stream data
     """
     response = urlopen(url)
-    if response.headers:
-        content_length = response.headers['Content-length']
-    io_wrapper = ProgressTextIOWrapper(response)
-    io_wrapper.progress = progress_
+    # we can't get content length from response because the gzip stream content length is unknown
+    # so we set it to this value, just for progress display
+    content_length = 23143223

     """
-    Start writer as a new process
+    Open GZIP stream
     """
-    writer = InfluxDBWriter(queue_)
-    writer.start()
+    with gzip.open(response, 'rb') as stream:
+        io_wrapper = ProgressTextIOWrapper(stream, encoding='utf-8')
+        io_wrapper.progress = progress_

-    """
-    Create process pool for parallel encoding into LineProtocol
-    """
-    cpu_count = multiprocessing.cpu_count()
-    with concurrent.futures.ProcessPoolExecutor(cpu_count, initializer=init_counter,
-                                                initargs=(counter_, progress_, queue_)) as executor:
         """
-        Converts incoming HTTP stream into sequence of LineProtocol
+        Start writer as a new process
         """
-        data = rx \
-            .from_iterable(DictReader(io_wrapper)) \
-            .pipe(ops.buffer_with_count(10_000),
-                  # Parse 10_000 rows into LineProtocol on subprocess
-                  ops.flat_map(lambda rows: executor.submit(parse_rows, rows, content_length)))
+        writer = InfluxDBWriter(queue_)
+        writer.start()

         """
-        Write data into InfluxDB
+        Create process pool for parallel encoding into LineProtocol
         """
-        data.subscribe(on_next=lambda x: None, on_error=lambda ex: print(f'Unexpected error: {ex}'))
-
-    """
-    Terminate Writer
-    """
-    queue_.put(None)
-    queue_.join()
+        cpu_count = multiprocessing.cpu_count()
+        with concurrent.futures.ProcessPoolExecutor(cpu_count, initializer=init_counter,
+                                                    initargs=(counter_, progress_, queue_)) as executor:
+            """
+            Converts incoming HTTP stream into sequence of LineProtocol
+            """
+            data = rx \
+                .from_iterable(DictReader(io_wrapper)) \
+                .pipe(ops.buffer_with_count(10_000),
+                      # Parse 10_000 rows into LineProtocol on subprocess
+                      ops.map(lambda rows: executor.submit(parse_rows, rows, content_length)))
+
+            """
+            Write data into InfluxDB
+            """
+            data.subscribe(on_next=lambda x: None, on_error=lambda ex: print(f'Unexpected error: {ex}'))

-    print()
-    print(f'Import finished in: {datetime.now() - startTime}')
-    print()
-
-    """
-    Querying 10 pickups from dispatching 'B00008'
-    """
-    query = 'from(bucket:"my-bucket")' \
-            '|> range(start: 2019-01-01T00:00:00Z, stop: now()) ' \
-            '|> filter(fn: (r) => r._measurement == "taxi-trip-data")' \
-            '|> filter(fn: (r) => r.dispatching_base_num == "B00008")' \
-            '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \
-            '|> rename(columns: {_time: "pickup_datetime"})' \
-            '|> drop(columns: ["_start", "_stop"])|> limit(n:10, offset: 0)'
-
-    client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False)
-    result = client.query_api().query(query=query)
+        """
+        Terminate Writer
+        """
+        queue_.put(None)
+        queue_.join()

-    """
-    Processing results
-    """
-    print()
-    print("=== Querying 10 pickups from dispatching 'B00008' ===")
-    print()
-    for table in result:
-        for record in table.records:
-            print(
-                f'Dispatching: {record["dispatching_base_num"]} pickup: {record["pickup_datetime"]} dropoff: {record["dropoff_datetime"]}')
+        print()
+        print(f'Import finished in: {datetime.now() - startTime}')
+        print()

-    """
-    Close client
-    """
-    client.close()
+        """
+        Querying 10 pickups from dispatching 'B00008'
+        """
+        query = 'from(bucket:"my-bucket")' \
+                '|> range(start: 2019-01-01T00:00:00Z, stop: now()) ' \
+                '|> filter(fn: (r) => r._measurement == "taxi-trip-data")' \
+                '|> filter(fn: (r) => r.dispatching_base_num == "B00008")' \
+                '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \
+                '|> rename(columns: {_time: "pickup_datetime"})' \
+                '|> drop(columns: ["_start", "_stop"])|> limit(n:10, offset: 0)'
+
+        with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False) as client:
+            result = client.query_api().query(query=query)
+
+            """
+            Processing results
+            """
+            print()
+            print("=== Querying 10 pickups from dispatching 'B00008' ===")
+            print()
+            for table in result:
+                for record in table.records:
+                    print(
+                        f'Dispatching: {record["dispatching_base_num"]} pickup: {record["pickup_datetime"]} dropoff: {record["dropoff_datetime"]}')
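
Note on the helpers referenced above: both hunks use a ProgressTextIOWrapper that is defined elsewhere in this example file and is not part of the diff. A minimal sketch of its assumed shape, an io.TextIOWrapper subclass that adds each line it reads to the shared progress counter which parse_rows() turns into the percentage printout, could look like this (details may differ from the real class):

import io


class ProgressTextIOWrapper(io.TextIOWrapper):
    """Sketch only: a text stream that records how much has been read.

    `progress` is assumed to be the shared multiprocessing.Value('i', 0)
    created in the __main__ block above.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.progress = None

    def readline(self, *args, **kwargs) -> str:
        line = super().readline(*args, **kwargs)
        if self.progress is not None:
            # csv.DictReader pulls the stream line by line, so counting
            # characters here is enough to drive the progress display.
            self.progress.value += len(line)
        return line

Because gzip.open(response, 'rb') yields a binary stream, wrapping it with encoding='utf-8' is what lets DictReader consume it as text.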
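Likewise, InfluxDBWriter is a separate writer process defined earlier in the file; the queue is assumed to carry batches of line-protocol strings, with None acting as a poison pill (which is why the code ends the import with queue_.put(None) and queue_.join()). A rough sketch under those assumptions:

import multiprocessing

from influxdb_client import InfluxDBClient, WriteOptions


class InfluxDBWriter(multiprocessing.Process):
    """Sketch only: consumes parsed batches from the queue and writes them.

    Connection settings mirror the ones used in the query section above;
    the real class in the example may differ in details.
    """

    def __init__(self, queue):
        super().__init__()
        self.queue = queue
        self.client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False)
        self.write_api = self.client.write_api(
            write_options=WriteOptions(batch_size=50_000, flush_interval=10_000))

    def run(self):
        while True:
            next_record = self.queue.get()
            if next_record is None:
                # Poison pill: acknowledge, flush pending batches, and stop.
                self.queue.task_done()
                self.client.close()
                break
            self.write_api.write(bucket="my-bucket", record=next_record)
            self.queue.task_done()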