3030import org .apache .kafka .connect .errors .DataException ;
3131import org .apache .kafka .connect .transforms .Transformation ;
3232
33- import org .slf4j .Logger ;
34- import org .slf4j .LoggerFactory ;
35-
3633public abstract class ExtractTopic <R extends ConnectRecord <R >> implements Transformation <R > {
37- private static final Logger log = LoggerFactory .getLogger (ExtractTopic .class );
3834
39- private static final List <Schema .Type > SUPPORTED_TYPES_TO_CONVERT_FROM = Arrays .asList (
35+ private static final List <Schema .Type > SUPPORTED_SCHEMA_TYPES_TO_CONVERT_FROM = Arrays .asList (
4036 Schema .Type .INT8 ,
4137 Schema .Type .INT16 ,
4238 Schema .Type .INT32 ,
@@ -47,6 +43,17 @@ public abstract class ExtractTopic<R extends ConnectRecord<R>> implements Transf
4743 Schema .Type .STRING
4844 );
4945
46+ private static final List <Class <?>> SUPPORTED_VALUE_CLASS_TO_CONVERT_FROM = Arrays .asList (
47+ Byte .class ,
48+ Short .class ,
49+ Integer .class ,
50+ Long .class ,
51+ Double .class ,
52+ Float .class ,
53+ Boolean .class ,
54+ String .class
55+ );
56+
5057 private ExtractTopicConfig config ;
5158
5259 @ Override
@@ -62,17 +69,25 @@ public void configure(final Map<String, ?> settings) {
6269 @ Override
6370 public R apply (final R record ) {
6471 final SchemaAndValue schemaAndValue = getSchemaAndValue (record );
65- if (schemaAndValue .schema () == null ) {
66- throw new DataException (dataPlace () + " schema can't be null: " + record );
67- }
6872
6973 final Optional <String > newTopic ;
70- if (config .fieldName ().isPresent ()) {
71- newTopic = getNewTopicForNamedField (
72- record .toString (), schemaAndValue .schema (), schemaAndValue .value (), config .fieldName ().get ());
73- } else {
74- newTopic = getNewTopicWithoutFieldName (
75- record .toString (), schemaAndValue .schema (), schemaAndValue .value ());
74+
75+ if (schemaAndValue .schema () == null ) { // schemaless values (Map)
76+ if (config .fieldName ().isPresent ()) {
77+ newTopic = topicNameFromNamedFieldSchemaless (
78+ record .toString (), schemaAndValue .value (), config .fieldName ().get ());
79+ } else {
80+ newTopic = topicNameWithoutFieldNameSchemaless (
81+ record .toString (), schemaAndValue .value ());
82+ }
83+ } else { // schema-based values (Struct)
84+ if (config .fieldName ().isPresent ()) {
85+ newTopic = topicNameFromNamedFieldWithSchema (
86+ record .toString (), schemaAndValue .schema (), schemaAndValue .value (), config .fieldName ().get ());
87+ } else {
88+ newTopic = topicNameWithoutFieldNameWithSchema (
89+ record .toString (), schemaAndValue .schema (), schemaAndValue .value ());
90+ }
7691 }
7792
7893 if (newTopic .isPresent ()) {
@@ -95,10 +110,66 @@ public R apply(final R record) {
95110
96111 protected abstract SchemaAndValue getSchemaAndValue (final R record );
97112
98- private Optional <String > getNewTopicForNamedField (final String recordStr ,
99- final Schema schema ,
100- final Object value ,
101- final String fieldName ) {
113+ private Optional <String > topicNameFromNamedFieldSchemaless (final String recordStr ,
114+ final Object value ,
115+ final String fieldName ) {
116+ if (value == null ) {
117+ throw new DataException (dataPlace () + " can't be null if field name is specified: " + recordStr );
118+ }
119+
120+ if (!(value instanceof Map )) {
121+ throw new DataException (dataPlace () + " type must be Map if field name is specified: " + recordStr );
122+ }
123+
124+ @ SuppressWarnings ("unchecked" ) final Map <String , Object > valueMap = (Map <String , Object >) value ;
125+
126+ final Optional <String > result = Optional .ofNullable (valueMap .get (fieldName ))
127+ .map (field -> {
128+ if (!SUPPORTED_VALUE_CLASS_TO_CONVERT_FROM .contains (field .getClass ())) {
129+ throw new DataException (fieldName + " type in " + dataPlace ()
130+ + " " + value
131+ + " must be " + SUPPORTED_VALUE_CLASS_TO_CONVERT_FROM
132+ + ": " + recordStr );
133+ }
134+ return field ;
135+ })
136+ .map (Object ::toString );
137+
138+ if (result .isPresent () && !result .get ().isBlank ()) {
139+ return result ;
140+ } else {
141+ if (config .skipMissingOrNull ()) {
142+ return Optional .empty ();
143+ } else {
144+ throw new DataException (fieldName + " in " + dataPlace () + " can't be null or empty: " + recordStr );
145+ }
146+ }
147+ }
148+
149+ private Optional <String > topicNameWithoutFieldNameSchemaless (final String recordStr ,
150+ final Object value ) {
151+ if (value == null || value .toString ().isBlank ()) {
152+ if (config .skipMissingOrNull ()) {
153+ return Optional .empty ();
154+ } else {
155+ throw new DataException (dataPlace () + " can't be null or empty: " + recordStr );
156+ }
157+ }
158+
159+ if (!SUPPORTED_VALUE_CLASS_TO_CONVERT_FROM .contains (value .getClass ())) {
160+ throw new DataException ("type in " + dataPlace ()
161+ + " " + value
162+ + " must be " + SUPPORTED_VALUE_CLASS_TO_CONVERT_FROM
163+ + ": " + recordStr );
164+ }
165+
166+ return Optional .of (value .toString ());
167+ }
168+
169+ private Optional <String > topicNameFromNamedFieldWithSchema (final String recordStr ,
170+ final Schema schema ,
171+ final Object value ,
172+ final String fieldName ) {
102173 if (Schema .Type .STRUCT != schema .type ()) {
103174 throw new DataException (dataPlace () + " schema type must be STRUCT if field name is specified: "
104175 + recordStr );
@@ -117,9 +188,9 @@ private Optional<String> getNewTopicForNamedField(final String recordStr,
117188 }
118189 }
119190
120- if (!SUPPORTED_TYPES_TO_CONVERT_FROM .contains (field .schema ().type ())) {
191+ if (!SUPPORTED_SCHEMA_TYPES_TO_CONVERT_FROM .contains (field .schema ().type ())) {
121192 throw new DataException (fieldName + " schema type in " + dataPlace ()
122- + " must be " + SUPPORTED_TYPES_TO_CONVERT_FROM
193+ + " must be " + SUPPORTED_SCHEMA_TYPES_TO_CONVERT_FROM
123194 + ": " + recordStr );
124195 }
125196
@@ -138,12 +209,12 @@ private Optional<String> getNewTopicForNamedField(final String recordStr,
138209 }
139210 }
140211
141- private Optional <String > getNewTopicWithoutFieldName (final String recordStr ,
142- final Schema schema ,
143- final Object value ) {
144- if (!SUPPORTED_TYPES_TO_CONVERT_FROM .contains (schema .type ())) {
212+ private Optional <String > topicNameWithoutFieldNameWithSchema (final String recordStr ,
213+ final Schema schema ,
214+ final Object value ) {
215+ if (!SUPPORTED_SCHEMA_TYPES_TO_CONVERT_FROM .contains (schema .type ())) {
145216 throw new DataException (dataPlace () + " schema type must be "
146- + SUPPORTED_TYPES_TO_CONVERT_FROM
217+ + SUPPORTED_SCHEMA_TYPES_TO_CONVERT_FROM
147218 + " if field name is not specified: "
148219 + recordStr );
149220 }
0 commit comments