3333import io .confluent .kafkarest .KafkaRestApplication ;
3434import io .confluent .kafkarest .KafkaRestConfig ;
3535import io .confluent .kafkarest .common .CompletableFutures ;
36+ import io .confluent .rest .RestConfig ;
3637import java .io .IOException ;
3738import java .net .ServerSocket ;
3839import java .net .URI ;
7879import org .apache .kafka .common .Node ;
7980import org .apache .kafka .common .TopicPartition ;
8081import org .apache .kafka .common .config .ConfigResource ;
82+ import org .apache .kafka .common .network .ListenerName ;
8183import org .apache .kafka .common .security .auth .SecurityProtocol ;
8284import org .apache .kafka .common .serialization .ByteArraySerializer ;
8385import org .apache .kafka .common .serialization .Deserializer ;
@@ -222,49 +224,57 @@ private void setupMethod() throws Exception {
222224
223225 setupAcls ();
224226 if (withSchemaRegistry ) {
225- int schemaRegPort = choosePort ();
226- schemaRegProperties .put (
227- SchemaRegistryConfig .LISTENERS_CONFIG ,
228- String .format ("http://127.0.0.1:%d" , schemaRegPort ));
229- schemaRegProperties .put (
230- SchemaRegistryConfig .KAFKASTORE_TOPIC_CONFIG ,
231- SchemaRegistryConfig .DEFAULT_KAFKASTORE_TOPIC );
232- schemaRegProperties .put (
233- SchemaRegistryConfig .SCHEMA_COMPATIBILITY_CONFIG , schemaRegCompatibility );
234- String broker =
235- SecurityProtocol .PLAINTEXT .name
236- + "://"
237- + TestUtils .getBrokerListStrFromServers (
238- JavaConverters .asScalaBuffer (servers ), SecurityProtocol .PLAINTEXT );
239- schemaRegProperties .put (SchemaRegistryConfig .KAFKASTORE_BOOTSTRAP_SERVERS_CONFIG , broker );
240- schemaRegConnect = String .format ("http://localhost:%d" , schemaRegPort );
241-
242- schemaRegProperties = overrideSchemaRegistryProps (schemaRegProperties );
243-
244- schemaRegApp =
245- new SchemaRegistryRestApplication (new SchemaRegistryConfig (schemaRegProperties ));
246- schemaRegServer = schemaRegApp .createServer ();
247- schemaRegServer .start ();
248- schemaRegApp .postServerStart ();
227+ doStartSchemaRegistry ();
249228 }
250-
251229 if (manageRest ) {
252- startRest (null , null );
230+ startRest (brokerList , null , null );
253231 }
254232 }
255233
234+ private void doStartSchemaRegistry () throws Exception {
235+ int schemaRegPort = choosePort ();
236+ schemaRegProperties .put (
237+ SchemaRegistryConfig .LISTENERS_CONFIG , String .format ("http://127.0.0.1:%d" , schemaRegPort ));
238+ schemaRegProperties .put (
239+ SchemaRegistryConfig .KAFKASTORE_TOPIC_CONFIG ,
240+ SchemaRegistryConfig .DEFAULT_KAFKASTORE_TOPIC );
241+ schemaRegProperties .put (
242+ SchemaRegistryConfig .SCHEMA_COMPATIBILITY_CONFIG , schemaRegCompatibility );
243+ String broker =
244+ SecurityProtocol .PLAINTEXT .name
245+ + "://"
246+ + TestUtils .getBrokerListStrFromServers (
247+ JavaConverters .asScalaBuffer (servers ), SecurityProtocol .PLAINTEXT );
248+ schemaRegProperties .put (SchemaRegistryConfig .KAFKASTORE_BOOTSTRAP_SERVERS_CONFIG , broker );
249+ schemaRegConnect = String .format ("http://localhost:%d" , schemaRegPort );
250+
251+ schemaRegProperties = overrideSchemaRegistryProps (schemaRegProperties );
252+
253+ schemaRegApp = new SchemaRegistryRestApplication (new SchemaRegistryConfig (schemaRegProperties ));
254+ schemaRegServer = schemaRegApp .createServer ();
255+ schemaRegServer .start ();
256+ schemaRegApp .postServerStart ();
257+ }
258+
256259 protected void startRest (RequestLog .Writer requestLogWriter , String requestLogFormat )
257260 throws Exception {
261+ startRest (brokerList , requestLogWriter , requestLogFormat );
262+ }
263+
264+ protected void startRest (
265+ String bootstrapServers , RequestLog .Writer requestLogWriter , String requestLogFormat )
266+ throws Exception {
267+ if (restServer != null && restServer .isRunning ()) {
268+ log .warn ("Rest server already started, skipping start" );
269+ return ;
270+ }
258271 log .info ("Setting up REST." );
259- int restPort = choosePort ();
260- restProperties .put (KafkaRestConfig .PORT_CONFIG , ((Integer ) restPort ).toString ());
261- restProperties .put (KafkaRestConfig .BOOTSTRAP_SERVERS_CONFIG , brokerList );
272+ restProperties .put (KafkaRestConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers );
262273 overrideKafkaRestConfigs (restProperties );
263- if (withSchemaRegistry ) {
274+ if (withSchemaRegistry && schemaRegConnect != null ) {
264275 restProperties .put (KafkaRestConfig .SCHEMA_REGISTRY_URL_CONFIG , schemaRegConnect );
265276 }
266- restConnect = getRestConnectString (restPort );
267- restProperties .put ("listeners" , restConnect );
277+ restProperties .put (RestConfig .LISTENERS_CONFIG , getRestConnectString (0 ));
268278
269279 // Reduce the metadata fetch timeout so requests for topics that don't exist timeout much
270280 // faster than the default
@@ -273,22 +283,49 @@ protected void startRest(RequestLog.Writer requestLogWriter, String requestLogFo
273283 "producer." + ProducerConfig .MAX_REQUEST_SIZE_CONFIG , String .valueOf ((2 << 20 ) * 10 ));
274284
275285 restConfig = new KafkaRestConfig (restProperties );
276- restApp = new KafkaRestApplication (restConfig , "" , null , requestLogWriter , requestLogFormat );
277286
278287 try {
279- restServer = restApp .createServer ();
280- restServer .start ();
281- } catch (IOException e1 ) { // sometimes we get an address already in use exception
282- log .warn ("IOException when attempting to start rest, trying again" , e1 );
288+ doStartRest (requestLogWriter , requestLogFormat );
289+ } catch (IOException e ) { // sometimes we get an address already in use exception
290+ log .warn ("IOException when attempting to start rest, trying again" , e );
283291 stopRest ();
284292 Thread .sleep (Duration .ofSeconds (1 ).toMillis ());
285293 try {
286- startRest ( null , null );
294+ doStartRest ( requestLogWriter , requestLogFormat );
287295 } catch (IOException e2 ) {
288296 log .error ("Restart of rest server failed" , e2 );
289297 throw e2 ;
290298 }
291299 }
300+ restConnect = getRestConnectString (restServer .getURI ().getPort ());
301+ }
302+
303+ /**
304+ * Return the bootstrap servers string for the given security protocol.
305+ *
306+ * @param securityProtocol security protocol
307+ * @return bootstrap servers string
308+ */
309+ public String getBootstrapServers (SecurityProtocol securityProtocol ) {
310+ return TestUtils .getBrokerListStrFromServers (
311+ JavaConverters .asScalaBuffer (servers ), securityProtocol );
312+ }
313+
314+ /**
315+ * Return the bootstrap servers string for the given listener name.
316+ *
317+ * @param listenerName listener name
318+ * @return bootstrap servers string
319+ */
320+ public String getBootstrapServers (ListenerName listenerName ) {
321+ return TestUtils .bootstrapServers (JavaConverters .asScalaBuffer (servers ), listenerName );
322+ }
323+
324+ private void doStartRest (RequestLog .Writer requestLogWriter , String requestLogFormat )
325+ throws Exception {
326+ restApp = new KafkaRestApplication (restConfig , "" , null , requestLogWriter , requestLogFormat );
327+ restServer = restApp .createServer ();
328+ restServer .start ();
292329 }
293330
294331 protected void stopRest () throws Exception {
0 commit comments