3333import io .confluent .kafkarest .KafkaRestApplication ;
3434import io .confluent .kafkarest .KafkaRestConfig ;
3535import io .confluent .kafkarest .common .CompletableFutures ;
36+ import io .confluent .rest .RestConfig ;
3637import java .io .IOException ;
3738import java .net .ServerSocket ;
3839import java .net .URI ;
8182import org .apache .kafka .common .TopicPartition ;
8283import org .apache .kafka .common .acl .AclBinding ;
8384import org .apache .kafka .common .config .ConfigResource ;
85+ import org .apache .kafka .common .network .ListenerName ;
8486import org .apache .kafka .common .security .auth .SecurityProtocol ;
8587import org .apache .kafka .common .serialization .ByteArraySerializer ;
8688import org .apache .kafka .common .serialization .Deserializer ;
@@ -225,49 +227,57 @@ private void setupMethod() throws Exception {
225227
226228 setupAcls ();
227229 if (withSchemaRegistry ) {
228- int schemaRegPort = choosePort ();
229- schemaRegProperties .put (
230- SchemaRegistryConfig .LISTENERS_CONFIG ,
231- String .format ("http://127.0.0.1:%d" , schemaRegPort ));
232- schemaRegProperties .put (
233- SchemaRegistryConfig .KAFKASTORE_TOPIC_CONFIG ,
234- SchemaRegistryConfig .DEFAULT_KAFKASTORE_TOPIC );
235- schemaRegProperties .put (
236- SchemaRegistryConfig .SCHEMA_COMPATIBILITY_CONFIG , schemaRegCompatibility );
237- String broker =
238- SecurityProtocol .PLAINTEXT .name
239- + "://"
240- + TestUtils .getBrokerListStrFromServers (
241- JavaConverters .asScalaBuffer (servers ), SecurityProtocol .PLAINTEXT );
242- schemaRegProperties .put (SchemaRegistryConfig .KAFKASTORE_BOOTSTRAP_SERVERS_CONFIG , broker );
243- schemaRegConnect = String .format ("http://localhost:%d" , schemaRegPort );
244-
245- schemaRegProperties = overrideSchemaRegistryProps (schemaRegProperties );
246-
247- schemaRegApp =
248- new SchemaRegistryRestApplication (new SchemaRegistryConfig (schemaRegProperties ));
249- schemaRegServer = schemaRegApp .createServer ();
250- schemaRegServer .start ();
251- schemaRegApp .postServerStart ();
230+ doStartSchemaRegistry ();
252231 }
253-
254232 if (manageRest ) {
255- startRest (null , null );
233+ startRest (brokerList , null , null );
256234 }
257235 }
258236
237+ private void doStartSchemaRegistry () throws Exception {
238+ int schemaRegPort = choosePort ();
239+ schemaRegProperties .put (
240+ SchemaRegistryConfig .LISTENERS_CONFIG , String .format ("http://127.0.0.1:%d" , schemaRegPort ));
241+ schemaRegProperties .put (
242+ SchemaRegistryConfig .KAFKASTORE_TOPIC_CONFIG ,
243+ SchemaRegistryConfig .DEFAULT_KAFKASTORE_TOPIC );
244+ schemaRegProperties .put (
245+ SchemaRegistryConfig .SCHEMA_COMPATIBILITY_CONFIG , schemaRegCompatibility );
246+ String broker =
247+ SecurityProtocol .PLAINTEXT .name
248+ + "://"
249+ + TestUtils .getBrokerListStrFromServers (
250+ JavaConverters .asScalaBuffer (servers ), SecurityProtocol .PLAINTEXT );
251+ schemaRegProperties .put (SchemaRegistryConfig .KAFKASTORE_BOOTSTRAP_SERVERS_CONFIG , broker );
252+ schemaRegConnect = String .format ("http://localhost:%d" , schemaRegPort );
253+
254+ schemaRegProperties = overrideSchemaRegistryProps (schemaRegProperties );
255+
256+ schemaRegApp = new SchemaRegistryRestApplication (new SchemaRegistryConfig (schemaRegProperties ));
257+ schemaRegServer = schemaRegApp .createServer ();
258+ schemaRegServer .start ();
259+ schemaRegApp .postServerStart ();
260+ }
261+
259262 protected void startRest (RequestLog .Writer requestLogWriter , String requestLogFormat )
260263 throws Exception {
264+ startRest (brokerList , requestLogWriter , requestLogFormat );
265+ }
266+
267+ protected void startRest (
268+ String bootstrapServers , RequestLog .Writer requestLogWriter , String requestLogFormat )
269+ throws Exception {
270+ if (restServer != null && restServer .isRunning ()) {
271+ log .warn ("Rest server already started, skipping start" );
272+ return ;
273+ }
261274 log .info ("Setting up REST." );
262- int restPort = choosePort ();
263- restProperties .put (KafkaRestConfig .PORT_CONFIG , ((Integer ) restPort ).toString ());
264- restProperties .put (KafkaRestConfig .BOOTSTRAP_SERVERS_CONFIG , brokerList );
275+ restProperties .put (KafkaRestConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers );
265276 overrideKafkaRestConfigs (restProperties );
266- if (withSchemaRegistry ) {
277+ if (withSchemaRegistry && schemaRegConnect != null ) {
267278 restProperties .put (KafkaRestConfig .SCHEMA_REGISTRY_URL_CONFIG , schemaRegConnect );
268279 }
269- restConnect = getRestConnectString (restPort );
270- restProperties .put ("listeners" , restConnect );
280+ restProperties .put (RestConfig .LISTENERS_CONFIG , getRestConnectString (0 ));
271281
272282 // Reduce the metadata fetch timeout so requests for topics that don't exist timeout much
273283 // faster than the default
@@ -276,22 +286,49 @@ protected void startRest(RequestLog.Writer requestLogWriter, String requestLogFo
276286 "producer." + ProducerConfig .MAX_REQUEST_SIZE_CONFIG , String .valueOf ((2 << 20 ) * 10 ));
277287
278288 restConfig = new KafkaRestConfig (restProperties );
279- restApp = new KafkaRestApplication (restConfig , "" , null , requestLogWriter , requestLogFormat );
280289
281290 try {
282- restServer = restApp .createServer ();
283- restServer .start ();
284- } catch (IOException e1 ) { // sometimes we get an address already in use exception
285- log .warn ("IOException when attempting to start rest, trying again" , e1 );
291+ doStartRest (requestLogWriter , requestLogFormat );
292+ } catch (IOException e ) { // sometimes we get an address already in use exception
293+ log .warn ("IOException when attempting to start rest, trying again" , e );
286294 stopRest ();
287295 Thread .sleep (Duration .ofSeconds (1 ).toMillis ());
288296 try {
289- startRest ( null , null );
297+ doStartRest ( requestLogWriter , requestLogFormat );
290298 } catch (IOException e2 ) {
291299 log .error ("Restart of rest server failed" , e2 );
292300 throw e2 ;
293301 }
294302 }
303+ restConnect = getRestConnectString (restServer .getURI ().getPort ());
304+ }
305+
306+ /**
307+ * Return the bootstrap servers string for the given security protocol.
308+ *
309+ * @param securityProtocol security protocol
310+ * @return bootstrap servers string
311+ */
312+ public String getBootstrapServers (SecurityProtocol securityProtocol ) {
313+ return TestUtils .getBrokerListStrFromServers (
314+ JavaConverters .asScalaBuffer (servers ), securityProtocol );
315+ }
316+
317+ /**
318+ * Return the bootstrap servers string for the given listener name.
319+ *
320+ * @param listenerName listener name
321+ * @return bootstrap servers string
322+ */
323+ public String getBootstrapServers (ListenerName listenerName ) {
324+ return TestUtils .bootstrapServers (JavaConverters .asScalaBuffer (servers ), listenerName );
325+ }
326+
327+ private void doStartRest (RequestLog .Writer requestLogWriter , String requestLogFormat )
328+ throws Exception {
329+ restApp = new KafkaRestApplication (restConfig , "" , null , requestLogWriter , requestLogFormat );
330+ restServer = restApp .createServer ();
331+ restServer .start ();
295332 }
296333
297334 protected void stopRest () throws Exception {
0 commit comments