77import org .apache .flink .connector .datagen .source .GeneratorFunction ;
88import org .apache .flink .connector .kafka .sink .KafkaRecordSerializationSchema ;
99import org .apache .flink .connector .kafka .sink .KafkaSink ;
10- import org .junit .Test ;
11- import static org .junit .Assert .*;
10+ import org .junit .jupiter . api . Test ;
11+ import static org .junit .jupiter . api . Assertions .*;
1212import java .util .Properties ;
1313import java .util .HashMap ;
1414import java .util .Map ;
@@ -33,20 +33,20 @@ public void testCreateDataGeneratorSource() throws Exception {
3333 DataGeneratorSource <StockPrice > source = (DataGeneratorSource <StockPrice >) createDataGeneratorSourceMethod .invoke (
3434 null , dataGenProps , generatorFunction , typeInfo );
3535
36- assertNotNull ("DataGeneratorSource should not be null" , source );
36+ assertNotNull (source , "DataGeneratorSource should not be null" );
3737
3838 // Test with null properties (should use default rate)
3939 source = (DataGeneratorSource <StockPrice >) createDataGeneratorSourceMethod .invoke (
4040 null , null , generatorFunction , typeInfo );
4141
42- assertNotNull ("DataGeneratorSource should not be null with null properties" , source );
42+ assertNotNull (source , "DataGeneratorSource should not be null with null properties" );
4343
4444 // Test with empty properties (should use default rate)
4545 Properties emptyProps = new Properties ();
4646 source = (DataGeneratorSource <StockPrice >) createDataGeneratorSourceMethod .invoke (
4747 null , emptyProps , generatorFunction , typeInfo );
4848
49- assertNotNull ("DataGeneratorSource should not be null with empty properties" , source );
49+ assertNotNull (source , "DataGeneratorSource should not be null with empty properties" );
5050 }
5151
5252 @ Test
@@ -72,7 +72,7 @@ public void testCreateKafkaSink() throws Exception {
7272 KafkaSink <StockPrice > kafkaSink = (KafkaSink <StockPrice >) createKafkaSinkMethod .invoke (
7373 null , kafkaProps , recordSerializationSchema );
7474
75- assertNotNull ("KafkaSink should not be null" , kafkaSink );
75+ assertNotNull (kafkaSink , "KafkaSink should not be null" );
7676 }
7777
7878 @ Test
@@ -85,20 +85,18 @@ public void testKafkaPartitioningKey() {
8585 byte [] key1 = stock1 .getTicker ().getBytes ();
8686 byte [] key2 = stock2 .getTicker ().getBytes ();
8787
88- assertNotNull ("Kafka key should not be null" , key1 );
89- assertNotNull ("Kafka key should not be null" , key2 );
90- assertTrue ("Kafka key should not be empty" , key1 . length > 0 );
91- assertTrue ("Kafka key should not be empty" , key2 . length > 0 );
88+ assertNotNull (key1 , "Kafka key should not be null" );
89+ assertNotNull (key2 , "Kafka key should not be null" );
90+ assertTrue (key1 . length > 0 , "Kafka key should not be empty" );
91+ assertTrue (key2 . length > 0 , "Kafka key should not be empty" );
9292
9393 // Test that different tickers produce different keys
94- assertFalse ("Different tickers should produce different keys" ,
95- java .util .Arrays .equals (key1 , key2 ));
94+ assertFalse (java .util .Arrays .equals (key1 , key2 ), "Different tickers should produce different keys" );
9695
9796 // Test that same ticker produces same key
9897 StockPrice stock3 = new StockPrice ("2024-01-15T10:30:47" , "AAPL" , 175.50f );
9998 byte [] key3 = stock3 .getTicker ().getBytes ();
100- assertTrue ("Same ticker should produce same key" ,
101- java .util .Arrays .equals (key1 , key3 ));
99+ assertTrue (java .util .Arrays .equals (key1 , key3 ), "Same ticker should produce same key" );
102100 }
103101
104102 @ Test
@@ -109,9 +107,9 @@ public void testConditionalSinkValidation() {
109107 // Test with no sinks configured - should be invalid
110108 boolean hasKinesis = appProperties .get ("KinesisSink" ) != null ;
111109 boolean hasKafka = appProperties .get ("KafkaSink" ) != null ;
112- assertFalse ("Should not have Kinesis sink when not configured" , hasKinesis );
113- assertFalse ("Should not have Kafka sink when not configured" , hasKafka );
114- assertTrue ("Should require at least one sink" , ! hasKinesis && ! hasKafka );
110+ assertFalse (hasKinesis , "Should not have Kinesis sink when not configured" );
111+ assertFalse (hasKafka , "Should not have Kafka sink when not configured" );
112+ assertTrue (! hasKinesis && ! hasKafka , "Should require at least one sink" );
115113
116114 // Test with only Kinesis configured - should be valid
117115 Properties kinesisProps = new Properties ();
@@ -121,9 +119,9 @@ public void testConditionalSinkValidation() {
121119
122120 hasKinesis = appProperties .get ("KinesisSink" ) != null ;
123121 hasKafka = appProperties .get ("KafkaSink" ) != null ;
124- assertTrue ("Should have Kinesis sink when configured" , hasKinesis );
125- assertFalse ("Should not have Kafka sink when not configured" , hasKafka );
126- assertTrue ("Should be valid with one sink" , hasKinesis || hasKafka );
122+ assertTrue (hasKinesis , "Should have Kinesis sink when configured" );
123+ assertFalse (hasKafka , "Should not have Kafka sink when not configured" );
124+ assertTrue (hasKinesis || hasKafka , "Should be valid with one sink" );
127125
128126 // Test with only Kafka configured - should be valid
129127 appProperties .clear ();
@@ -134,17 +132,17 @@ public void testConditionalSinkValidation() {
134132
135133 hasKinesis = appProperties .get ("KinesisSink" ) != null ;
136134 hasKafka = appProperties .get ("KafkaSink" ) != null ;
137- assertFalse ("Should not have Kinesis sink when not configured" , hasKinesis );
138- assertTrue ("Should have Kafka sink when configured" , hasKafka );
139- assertTrue ("Should be valid with one sink" , hasKinesis || hasKafka );
135+ assertFalse (hasKinesis , "Should not have Kinesis sink when not configured" );
136+ assertTrue (hasKafka , "Should have Kafka sink when configured" );
137+ assertTrue (hasKinesis || hasKafka , "Should be valid with one sink" );
140138
141139 // Test with both configured - should be valid
142140 appProperties .put ("KinesisSink" , kinesisProps );
143141
144142 hasKinesis = appProperties .get ("KinesisSink" ) != null ;
145143 hasKafka = appProperties .get ("KafkaSink" ) != null ;
146- assertTrue ("Should have Kinesis sink when configured" , hasKinesis );
147- assertTrue ("Should have Kafka sink when configured" , hasKafka );
148- assertTrue ("Should be valid with both sinks" , hasKinesis || hasKafka );
144+ assertTrue (hasKinesis , "Should have Kinesis sink when configured" );
145+ assertTrue (hasKafka , "Should have Kafka sink when configured" );
146+ assertTrue (hasKinesis || hasKafka , "Should be valid with both sinks" );
149147 }
150148}
0 commit comments