88
99class KafkaClient (Service ):
1010 """Kafka client wrapper - assumes external Kafka is running"""
11-
11+
1212 def __init__ (self , context , bootstrap_servers = "localhost:9092" ):
1313 # Use num_nodes=0 since we're not managing any nodes
1414 super (KafkaClient , self ).__init__ (context , num_nodes = 0 )
1515 self .bootstrap_servers_str = bootstrap_servers
16-
16+
1717 def start_node (self , node ):
1818 """No-op since we assume Kafka is already running"""
1919 self .logger .info ("Assuming Kafka is already running on %s" , self .bootstrap_servers_str )
20-
20+
2121 def stop_node (self , node ):
2222 """No-op since we don't manage the Kafka service"""
2323 self .logger .info ("Not stopping external Kafka service" )
24-
24+
2525 def clean_node (self , node ):
2626 """No-op since we don't manage any files"""
2727 self .logger .info ("No cleanup needed for external Kafka service" )
28-
28+
2929 def bootstrap_servers (self ):
3030 """Get bootstrap servers string for clients"""
3131 return self .bootstrap_servers_str
32-
32+
3333 def verify_connection (self ):
3434 """Verify that Kafka is accessible"""
3535 try :
3636 from confluent_kafka .admin import AdminClient
3737 admin_client = AdminClient ({'bootstrap.servers' : self .bootstrap_servers_str })
38-
38+
3939 # Try to get cluster metadata to verify connection
4040 metadata = admin_client .list_topics (timeout = 10 )
41- self .logger .info ("Successfully connected to Kafka. Available topics: %s" ,
42- list (metadata .topics .keys ()))
41+ self .logger .info ("Successfully connected to Kafka. Available topics: %s" ,
42+ list (metadata .topics .keys ()))
4343 return True
4444 except Exception as e :
4545 self .logger .error ("Failed to connect to Kafka at %s: %s" , self .bootstrap_servers_str , e )
4646 return False
47-
47+
4848 def create_topic (self , topic , partitions = 1 , replication_factor = 1 ):
4949 """Create a topic using Kafka admin client"""
5050 try :
5151 from confluent_kafka .admin import AdminClient , NewTopic
52-
52+
5353 admin_client = AdminClient ({'bootstrap.servers' : self .bootstrap_servers_str })
54-
54+
5555 topic_config = NewTopic (
5656 topic = topic ,
5757 num_partitions = partitions ,
5858 replication_factor = replication_factor
5959 )
60-
61- self .logger .info ("Creating topic: %s (partitions=%d, replication=%d)" ,
62- topic , partitions , replication_factor )
63-
60+
61+ self .logger .info ("Creating topic: %s (partitions=%d, replication=%d)" ,
62+ topic , partitions , replication_factor )
63+
6464 # Create topic
6565 fs = admin_client .create_topics ([topic_config ])
66-
66+
6767 # Wait for topic creation to complete
6868 for topic_name , f in fs .items ():
6969 try :
@@ -74,53 +74,53 @@ def create_topic(self, topic, partitions=1, replication_factor=1):
7474 self .logger .info ("Topic %s already exists" , topic_name )
7575 else :
7676 self .logger .warning ("Failed to create topic %s: %s" , topic_name , e )
77-
77+
7878 except ImportError :
7979 self .logger .error ("confluent_kafka not available for topic creation" )
8080 except Exception as e :
8181 self .logger .error ("Failed to create topic %s: %s" , topic , e )
82-
82+
8383 def list_topics (self ):
8484 """List all topics using admin client"""
8585 try :
8686 from confluent_kafka .admin import AdminClient
87-
87+
8888 admin_client = AdminClient ({'bootstrap.servers' : self .bootstrap_servers_str })
8989 metadata = admin_client .list_topics (timeout = 10 )
90-
90+
9191 topics = list (metadata .topics .keys ())
9292 self .logger .debug ("Available topics: %s" , topics )
9393 return topics
94-
94+
9595 except ImportError :
9696 self .logger .error ("confluent_kafka not available for listing topics" )
9797 return []
9898 except Exception as e :
9999 self .logger .error ("Failed to list topics: %s" , e )
100100 return []
101-
101+
102102 def wait_for_topic (self , topic_name , max_wait_time = 30 , initial_wait = 0.1 ):
103103 """
104104 Wait for topic to be created with exponential backoff retry logic.
105105 """
106106 wait_time = initial_wait
107107 total_wait = 0
108-
108+
109109 self .logger .info ("Waiting for topic '%s' to be available..." , topic_name )
110-
110+
111111 while total_wait < max_wait_time :
112112 topics = self .list_topics ()
113113 if topic_name in topics :
114114 self .logger .info ("Topic '%s' is ready after %.2fs" , topic_name , total_wait )
115115 return True
116-
117- self .logger .debug ("Topic '%s' not found, waiting %.2fs (total: %.2fs)" ,
118- topic_name , wait_time , total_wait )
116+
117+ self .logger .debug ("Topic '%s' not found, waiting %.2fs (total: %.2fs)" ,
118+ topic_name , wait_time , total_wait )
119119 time .sleep (wait_time )
120120 total_wait += wait_time
121-
121+
122122 # Exponential backoff with max cap of 2 seconds
123123 wait_time = min (wait_time * 2 , 2.0 )
124-
124+
125125 self .logger .error ("Timeout waiting for topic '%s' after %ds" , topic_name , max_wait_time )
126- return False
126+ return False
0 commit comments