|
18 | 18 |
|
19 | 19 | package org.apache.flink.connector.kafka.source.reader.deserializer; |
20 | 20 |
|
| 21 | +import org.apache.flink.api.common.typeinfo.TypeHint; |
| 22 | +import org.apache.flink.api.common.typeinfo.TypeInformation; |
21 | 23 | import org.apache.flink.connector.kafka.testutils.SimpleCollector; |
22 | 24 | import org.apache.flink.connector.kafka.util.JacksonMapperFactory; |
23 | 25 | import org.apache.flink.connector.testutils.formats.DummyInitializationContext; |
@@ -79,25 +81,17 @@ public void testKafkaDeserializationSchemaWrapper() throws Exception { |
79 | 81 | @Test |
80 | 82 | public void testKafkaValueDeserializationSchemaWrapper() throws Exception { |
81 | 83 | final ConsumerRecord<byte[], byte[]> consumerRecord = getConsumerRecord(); |
82 | | - KafkaRecordDeserializationSchema< |
83 | | - org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node |
84 | | - .ObjectNode> |
85 | | - schema = |
86 | | - KafkaRecordDeserializationSchema.valueOnly( |
87 | | - new JsonDeserializationSchema<>( |
88 | | - org.apache.flink.shaded.jackson2.com.fasterxml.jackson |
89 | | - .databind.node.ObjectNode.class)); |
| 84 | + KafkaRecordDeserializationSchema<Map<String, Object>> schema = |
| 85 | + KafkaRecordDeserializationSchema.valueOnly( |
| 86 | + new JsonDeserializationSchema<>( |
| 87 | + TypeInformation.of(new TypeHint<Map<String, Object>>() {}))); |
90 | 88 | schema.open(new DummyInitializationContext()); |
91 | | - SimpleCollector< |
92 | | - org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node |
93 | | - .ObjectNode> |
94 | | - collector = new SimpleCollector<>(); |
| 89 | + SimpleCollector<Map<String, Object>> collector = new SimpleCollector<>(); |
95 | 90 | schema.deserialize(consumerRecord, collector); |
96 | 91 |
|
97 | 92 | assertThat(collector.getList()).hasSize(1); |
98 | | - org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode |
99 | | - deserializedValue = collector.getList().get(0); |
100 | | - assertThat(deserializedValue.get("word").asText()).isEqualTo("world"); |
| 93 | + Map<String, Object> deserializedValue = collector.getList().get(0); |
| 94 | + assertThat(deserializedValue.get("word")).isEqualTo("world"); |
101 | 95 | assertThat(deserializedValue.get("key")).isNull(); |
102 | 96 | assertThat(deserializedValue.get("metadata")).isNull(); |
103 | 97 | } |
|
0 commit comments