|
33 | 33 | import org.apache.flink.util.TestLogger; |
34 | 34 |
|
35 | 35 | import com.google.common.base.MoreObjects; |
36 | | -import org.apache.kafka.clients.consumer.ConsumerRecord; |
37 | 36 | import org.apache.kafka.clients.producer.Callback; |
38 | 37 | import org.apache.kafka.clients.producer.KafkaProducer; |
39 | 38 | import org.apache.kafka.clients.producer.ProducerConfig; |
|
48 | 47 |
|
49 | 48 | import javax.annotation.Nullable; |
50 | 49 |
|
51 | | -import java.nio.ByteBuffer; |
52 | 50 | import java.util.ArrayList; |
53 | 51 | import java.util.Collection; |
54 | | -import java.util.HashSet; |
55 | 52 | import java.util.List; |
56 | 53 | import java.util.Properties; |
57 | | -import java.util.Set; |
58 | 54 | import java.util.concurrent.atomic.AtomicReference; |
59 | 55 |
|
60 | | -import static org.assertj.core.api.Assertions.fail; |
61 | | - |
62 | 56 | /** |
63 | 57 | * The base for the Kafka tests. It brings up: |
64 | 58 | * |
@@ -278,96 +272,6 @@ public static <K, V> void produceToKafka( |
278 | 272 | } |
279 | 273 | } |
280 | 274 |
|
281 | | - /** |
282 | | - * We manually handle the timeout instead of using JUnit's timeout to return failure instead of |
283 | | - * timeout error. After timeout we assume that there are missing records and there is a bug, not |
284 | | - * that the test has run out of time. |
285 | | - */ |
286 | | - public void assertAtLeastOnceForTopic( |
287 | | - Properties properties, |
288 | | - String topic, |
289 | | - int partition, |
290 | | - Set<Integer> expectedElements, |
291 | | - long timeoutMillis) |
292 | | - throws Exception { |
293 | | - |
294 | | - long startMillis = System.currentTimeMillis(); |
295 | | - Set<Integer> actualElements = new HashSet<>(); |
296 | | - |
297 | | - // until we timeout... |
298 | | - while (System.currentTimeMillis() < startMillis + timeoutMillis) { |
299 | | - properties.put( |
300 | | - "key.deserializer", |
301 | | - "org.apache.kafka.common.serialization.IntegerDeserializer"); |
302 | | - properties.put( |
303 | | - "value.deserializer", |
304 | | - "org.apache.kafka.common.serialization.IntegerDeserializer"); |
305 | | - // We need to set these two properties so that they are lower than request.timeout.ms. |
306 | | - // This is |
307 | | - // required for some old KafkaConsumer versions. |
308 | | - properties.put("session.timeout.ms", "2000"); |
309 | | - properties.put("heartbeat.interval.ms", "500"); |
310 | | - |
311 | | - // query kafka for new records ... |
312 | | - Collection<ConsumerRecord<Integer, Integer>> records = |
313 | | - kafkaServer.getAllRecordsFromTopic(properties, topic); |
314 | | - |
315 | | - for (ConsumerRecord<Integer, Integer> record : records) { |
316 | | - actualElements.add(record.value()); |
317 | | - } |
318 | | - |
319 | | - // succeed if we got all expectedElements |
320 | | - if (actualElements.containsAll(expectedElements)) { |
321 | | - return; |
322 | | - } |
323 | | - } |
324 | | - |
325 | | - fail( |
326 | | - String.format( |
327 | | - "Expected to contain all of: <%s>, but was: <%s>", |
328 | | - expectedElements, actualElements)); |
329 | | - } |
330 | | - |
331 | | - public void assertExactlyOnceForTopic( |
332 | | - Properties properties, String topic, List<Integer> expectedElements) { |
333 | | - |
334 | | - List<Integer> actualElements = new ArrayList<>(); |
335 | | - |
336 | | - Properties consumerProperties = new Properties(); |
337 | | - consumerProperties.putAll(properties); |
338 | | - consumerProperties.put( |
339 | | - "key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); |
340 | | - consumerProperties.put( |
341 | | - "value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); |
342 | | - consumerProperties.put("isolation.level", "read_committed"); |
343 | | - |
344 | | - // query kafka for new records ... |
345 | | - Collection<ConsumerRecord<byte[], byte[]>> records = |
346 | | - kafkaServer.getAllRecordsFromTopic(consumerProperties, topic); |
347 | | - |
348 | | - for (ConsumerRecord<byte[], byte[]> record : records) { |
349 | | - actualElements.add(ByteBuffer.wrap(record.value()).getInt()); |
350 | | - } |
351 | | - |
352 | | - // succeed if we got all expectedElements |
353 | | - if (actualElements.equals(expectedElements)) { |
354 | | - return; |
355 | | - } |
356 | | - |
357 | | - fail( |
358 | | - String.format( |
359 | | - "Expected %s, but was: %s", |
360 | | - formatElements(expectedElements), formatElements(actualElements))); |
361 | | - } |
362 | | - |
363 | | - private String formatElements(List<Integer> elements) { |
364 | | - if (elements.size() > 50) { |
365 | | - return String.format("number of elements: <%s>", elements.size()); |
366 | | - } else { |
367 | | - return String.format("elements: <%s>", elements); |
368 | | - } |
369 | | - } |
370 | | - |
371 | 275 | public static void setNumKafkaClusters(int size) { |
372 | 276 | numKafkaClusters = size; |
373 | 277 | } |
|
0 commit comments