@@ -1,5 +1,4 @@
 from glob import glob
-import json
 import os
 
 import time
@@ -454,12 +453,11 @@ def _close_consumer(self):
 class FromKafkaBatched(Stream):
     """Base class for both local and cluster-based batched kafka processing"""
     def __init__(self, topic, consumer_params, poll_interval='1s',
-                 npartitions=1, checkpointing=None, **kwargs):
+                 npartitions=1, **kwargs):
         self.consumer_params = consumer_params
         self.topic = topic
         self.npartitions = npartitions
         self.positions = [0] * npartitions
-        self.checkpointing = checkpointing
         self.poll_interval = convert_interval(poll_interval)
         self.stopped = True
 
@@ -472,40 +470,18 @@ def poll_kafka(self):
         try:
             while not self.stopped:
                 out = []
-
-                latest_checkpoint = {}
-                if self.checkpointing is not None:
-                    if not os.path.exists(self.checkpointing):
-                        os.makedirs(self.checkpointing)
-                    topic_path = self.checkpointing + '/' + self.topic
-                    if not os.path.exists(topic_path):
-                        os.makedirs(topic_path)
-                    checkpoints_list = os.listdir(topic_path)
-                    if len(checkpoints_list) > 0:
-                        previous_checkpoint = max(checkpoints_list)
-                        with open(topic_path + '/' + previous_checkpoint, 'r') as fr:
-                            latest_checkpoint = json.loads(fr.readlines()[-1])
-                            fr.close()
-
                 for partition in range(self.npartitions):
                     tp = ck.TopicPartition(self.topic, partition, 0)
                     try:
                         low, high = self.consumer.get_watermark_offsets(
                             tp, timeout=0.1)
                     except (RuntimeError, ck.KafkaException):
                         continue
-
                     current_position = self.positions[partition]
-                    group = self.consumer_params['group.id']
-
-                    if group in latest_checkpoint.keys():
-                        if str(partition) in latest_checkpoint[group].keys():
-                            current_position = latest_checkpoint[group][str(partition)]
-
                     lowest = max(current_position, low)
                     if high > lowest:
                         out.append((self.consumer_params, self.topic, partition,
-                                    lowest, high - 1, self.checkpointing))
+                                    lowest, high - 1))
                         self.positions[partition] = high
 
                 for part in out:
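With the checkpoint lookup gone, each batch boundary comes straight from the broker's watermark offsets. Below is a minimal sketch of that query with confluent-kafka; the broker address, topic, and group id are assumptions, not values from this patch.

    import confluent_kafka as ck

    # Assumed local broker and demo topic/group; adjust for your cluster.
    consumer = ck.Consumer({'bootstrap.servers': 'localhost:9092',
                            'group.id': 'demo-group'})
    tp = ck.TopicPartition('my-topic', 0, 0)

    # (low, high) are the oldest available and next-to-be-written offsets;
    # poll_kafka turns them into the batch [max(position, low), high - 1]
    # and then advances its cached position to high.
    low, high = consumer.get_watermark_offsets(tp, timeout=0.1)
    print(low, high)
    consumer.close()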
@@ -531,8 +507,7 @@ def start(self):
 
 @Stream.register_api(staticmethod)
 def from_kafka_batched(topic, consumer_params, poll_interval='1s',
-                       npartitions=1, start=False, dask=False,
-                       checkpointing=None, **kwargs):
+                       npartitions=1, start=False, dask=False, **kwargs):
     """ Get messages from Kafka in batches
 
     Uses the confluent-kafka library,
@@ -574,8 +549,7 @@ def from_kafka_batched(topic, consumer_params, poll_interval='1s',
         kwargs['loop'] = default_client().loop
     source = FromKafkaBatched(topic, consumer_params,
                               poll_interval=poll_interval,
-                              npartitions=npartitions,
-                              checkpointing=checkpointing, **kwargs)
+                              npartitions=npartitions, **kwargs)
     if dask:
         source = source.scatter()
 
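After this change the public entry point takes only the core parameters. A minimal usage sketch under the same assumed broker, topic, and group id:

    from streamz import Stream

    # Assumed broker address, topic, and group id; adjust as needed.
    batches = Stream.from_kafka_batched(
        'my-topic',
        {'bootstrap.servers': 'localhost:9092',
         'group.id': 'demo-group'},
        poll_interval='1s', npartitions=1)

    # Each emitted element is one list of raw message values covering a
    # [lowest, high - 1] offset range produced by poll_kafka above.
    batches.map(len).sink(print)

Passing start=True (or starting the source once the pipeline is wired) begins polling; with dask=True each per-partition batch is scattered to Dask workers before get_message_batch runs, per the scatter() call above.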
@@ -585,41 +559,7 @@ def from_kafka_batched(topic, consumer_params, poll_interval='1s',
     return source.starmap(get_message_batch)
 
 
-def add_checkpoint(group, checkpoint, path):
-    topic = checkpoint.topic
-    partition = checkpoint.partition
-    offset = checkpoint.offset
-    latest_checkpoint = {}
-    previous_checkpoint = None
-    if not os.path.exists(path):
-        os.makedirs(path)
-    path = path + '/' + topic
-    if not os.path.exists(path):
-        os.makedirs(path)
-    checkpoints_list = os.listdir(path)
-    if len(checkpoints_list) > 0:
-        previous_checkpoint = max(checkpoints_list)
-        with open(path + '/' + previous_checkpoint, 'r') as fr:
-            latest_checkpoint = json.loads(fr.readlines()[0])
-            fr.close()
-        #Only maintain the last 5 checkpoints
-        if len(checkpoints_list) == 5:
-            os.system('rm -rf ' + path + '/' + min(checkpoints_list))
-    if group not in latest_checkpoint.keys():
-        latest_checkpoint[group] = {}
-    latest_checkpoint[group][partition] = offset
-    print(latest_checkpoint)
-    if previous_checkpoint is None:
-        new_checkpoint = '1.txt'
-    else:
-        previous_batch = int(previous_checkpoint.split('.')[0])
-        new_checkpoint = str(previous_batch + 1) + '.txt'
-    with open(path + '/' + new_checkpoint, 'a+') as fw:
-        fw.write(json.dumps(latest_checkpoint) + '\n')
-        fw.close()
-
-
-def get_message_batch(kafka_params, topic, partition, low, high, checkpointing, timeout=None):
+def get_message_batch(kafka_params, topic, partition, low, high, timeout=None):
     """Fetch a batch of kafka messages in given topic/partition
 
     This will block until messages are available, or timeout is reached.
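For reference, the removed add_checkpoint persisted one JSON object per line in rotating files (1.txt, 2.txt, ..., keeping at most five) under <path>/<topic>/, mapping a consumer group to per-partition offsets. A sketch of one such record; the group name and offsets are illustrative:

    import json

    # json.dumps turns the integer partition key into a string, which is
    # why the removed poll_kafka code read it back with str(partition).
    record = {'demo-group': {0: 42}}
    print(json.dumps(record))  # -> {"demo-group": {"0": 42}}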
@@ -643,8 +583,5 @@ def get_message_batch(kafka_params, topic, partition, low, high, checkpointing,
                 if timeout is not None and time.time() - t0 > timeout:
                     break
     finally:
-        if checkpointing is not None:
-            checkpoint = consumer.commit(asynchronous=False)
-            add_checkpoint(kafka_params['group.id'], checkpoint[0], checkpointing)
         consumer.close()
     return out
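The trimmed get_message_batch now simply assigns the partition at the low offset, consumes up to high, and closes the consumer. Below is a standalone sketch reconstructed from the visible context lines and the confluent-kafka API; the function name and the poll/sleep cadence are assumptions:

    import time

    import confluent_kafka as ck

    def fetch_offset_range(kafka_params, topic, partition, low, high,
                           timeout=None):
        # Hypothetical equivalent of the trimmed get_message_batch:
        # read offsets low..high from one partition, giving up once
        # `timeout` seconds pass without reaching the high offset.
        t0 = time.time()
        consumer = ck.Consumer(kafka_params)
        consumer.assign([ck.TopicPartition(topic, partition, low)])
        out = []
        try:
            while True:
                msg = consumer.poll(0)
                if msg is not None and msg.error() is None:
                    out.append(msg.value())
                    if msg.offset() >= high:
                        break
                else:
                    time.sleep(0.05)
                    if timeout is not None and time.time() - t0 > timeout:
                        break
        finally:
            consumer.close()
        return out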