Skip to content

Commit 69ee044

Browse files
AkshatJindal1 authored and castorm committed
Added implementation to parse substring as timestamp field
1 parent ad45ebf commit 69ee044

File tree

5 files changed

+261
-1
lines changed

5 files changed

+261
-1
lines changed

README.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,9 @@ Uses [Jackson](https://github.com/FasterXML/jackson) to look for the records in
456456
Implementation based on a `DateTimeFormatter`
457457
> * `com.github.castorm.kafka.connect.http.response.timestamp.NattyTimestampParser`
458458
Implementation based on [Natty](http://natty.joestelmach.com/) parser
459-
>
459+
> * `com.github.castorm.kafka.connect.http.response.timestamp.RegexTimestampParser`
460+
Implementation that extracts a substring from the timestamp field using a regex and parses it with a delegate parser
461+
460462
> ##### `http.response.record.timestamp.parser.pattern`
461463
> When using `DateTimeFormatterTimestampParser`, a custom pattern can be specified
462464
> * Type: `String`
@@ -467,6 +469,16 @@ Uses [Jackson](https://github.com/FasterXML/jackson) to look for the records in
467469
identifiers
468470
> * Type: `String`
469471
> * Default: `UTC`
472+
>
473+
> ##### `http.response.record.timestamp.parser.regex`
474+
> When using `RegexTimestampParser`, a custom regex pattern can be specified; the text captured by its first capturing group is passed to the delegate parser
475+
> * Type: `String`
476+
> * Default: `.*`
477+
>
478+
> ##### `http.response.record.timestamp.parser.regex.delegate`
479+
> When using `RegexTimestampParser`, the delegate class used to parse the extracted timestamp
480+
> * Type: `Class`
481+
> * Default: `DateTimeFormatterTimestampParser`
470482
471483
---
472484
<a name="mapper"/>
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package com.github.castorm.kafka.connect.http.response.timestamp;
2+
3+
/*-
4+
* #%L
5+
* Kafka Connect HTTP Plugin
6+
* %%
7+
* Copyright (C) 2020 CastorM
8+
* %%
9+
* Licensed under the Apache License, Version 2.0 (the "License");
10+
* you may not use this file except in compliance with the License.
11+
* You may obtain a copy of the License at
12+
*
13+
* http://www.apache.org/licenses/LICENSE-2.0
14+
*
15+
* Unless required by applicable law or agreed to in writing, software
16+
* distributed under the License is distributed on an "AS IS" BASIS,
17+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
* See the License for the specific language governing permissions and
19+
* limitations under the License.
20+
* #L%
21+
*/
22+
23+
import com.github.castorm.kafka.connect.http.response.timestamp.spi.TimestampParser;
24+
import lombok.RequiredArgsConstructor;
25+
import lombok.extern.slf4j.Slf4j;
26+
27+
import java.time.Instant;
28+
import java.util.Map;
29+
import java.util.function.Function;
30+
import java.util.regex.Matcher;
31+
import java.util.regex.Pattern;
32+
33+
@RequiredArgsConstructor
34+
@Slf4j
35+
public class RegexTimestampParser implements TimestampParser {
36+
37+
private final Function<Map<String, ?>, RegexTimestampParserConfig> configFactory;
38+
private Pattern pattern;
39+
private TimestampParser delegate;
40+
41+
@Override
42+
public Instant parse(String timestamp) {
43+
Matcher matcher = pattern.matcher(timestamp);
44+
String extractedTimestamp;
45+
matcher.find();
46+
extractedTimestamp = matcher.group(1);
47+
return delegate.parse(extractedTimestamp);
48+
}
49+
50+
@Override
51+
public void configure(Map<String, ?> settings) {
52+
RegexTimestampParserConfig config = configFactory.apply(settings);
53+
pattern = Pattern.compile(config.getTimestampRegex());
54+
delegate = config.getDelegateParser();
55+
}
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package com.github.castorm.kafka.connect.http.response.timestamp;
2+
3+
/*-
4+
* #%L
5+
* kafka-connect-http
6+
* %%
7+
* Copyright (C) 2020 CastorM
8+
* %%
9+
* Licensed under the Apache License, Version 2.0 (the "License");
10+
* you may not use this file except in compliance with the License.
11+
* You may obtain a copy of the License at
12+
*
13+
* http://www.apache.org/licenses/LICENSE-2.0
14+
*
15+
* Unless required by applicable law or agreed to in writing, software
16+
* distributed under the License is distributed on an "AS IS" BASIS,
17+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
* See the License for the specific language governing permissions and
19+
* limitations under the License.
20+
* #L%
21+
*/
22+
23+
import com.github.castorm.kafka.connect.http.response.timestamp.spi.TimestampParser;
24+
import lombok.Getter;
25+
import org.apache.kafka.common.config.AbstractConfig;
26+
import org.apache.kafka.common.config.ConfigDef;
27+
28+
import java.util.Map;
29+
30+
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
31+
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
32+
import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
33+
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
34+
35+
@Getter
36+
public class RegexTimestampParserConfig extends AbstractConfig {
37+
private static final String ITEM_TIMESTAMP_REGEX = "http.response.record.timestamp.parser.regex";
38+
private static final String PARSER_DELEGATE = "http.response.record.timestamp.parser.regex.delegate";
39+
40+
private final String timestampRegex;
41+
private final TimestampParser delegateParser;
42+
43+
RegexTimestampParserConfig(Map<String, ?> originals) {
44+
super(config(), originals);
45+
timestampRegex = getString(ITEM_TIMESTAMP_REGEX);
46+
delegateParser = getConfiguredInstance(PARSER_DELEGATE, TimestampParser.class);
47+
}
48+
49+
public static ConfigDef config() {
50+
return new ConfigDef()
51+
.define(ITEM_TIMESTAMP_REGEX, STRING, ".*", LOW, "Timestamp regex pattern in case the timestamp value is wrapped around some other text which is not in well defined format")
52+
.define(PARSER_DELEGATE, CLASS, DateTimeFormatterTimestampParser.class, HIGH, "Timestamp Parser Delegate Class");
53+
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.github.castorm.kafka.connect.http.response.timestamp;
2+
3+
/*-
4+
* #%L
5+
* Kafka Connect HTTP
6+
* %%
7+
* Copyright (C) 2020 Cástor Rodríguez
8+
* %%
9+
* Licensed under the Apache License, Version 2.0 (the "License");
10+
* you may not use this file except in compliance with the License.
11+
* You may obtain a copy of the License at
12+
*
13+
* http://www.apache.org/licenses/LICENSE-2.0
14+
*
15+
* Unless required by applicable law or agreed to in writing, software
16+
* distributed under the License is distributed on an "AS IS" BASIS,
17+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
* See the License for the specific language governing permissions and
19+
* limitations under the License.
20+
* #L%
21+
*/
22+
23+
import com.google.common.collect.ImmutableMap;
24+
import org.junit.jupiter.api.Test;
25+
26+
import java.util.Map;
27+
28+
import static com.github.castorm.kafka.connect.http.response.timestamp.RegexTimestampParserConfigTest.Fixture.config;
29+
import static org.assertj.core.api.Assertions.assertThat;
30+
31+
public class RegexTimestampParserConfigTest {
32+
33+
@Test
34+
void whenItemTimestampParserDelegateClassConfigured_thenInitialized() {
35+
assertThat(config(ImmutableMap.of("http.response.record.timestamp.parser.regex.delegate", "com.github.castorm.kafka.connect.http.response.timestamp.EpochMillisTimestampParser")).getDelegateParser())
36+
.isInstanceOf(EpochMillisTimestampParser.class);
37+
assertThat(config(ImmutableMap.of("http.response.record.timestamp.parser.regex.delegate", "com.github.castorm.kafka.connect.http.response.timestamp.EpochMillisTimestampParser")).getTimestampRegex())
38+
.isEqualTo(".*");
39+
}
40+
41+
@Test
42+
void whenItemTimestampParserRegexConfigured_thenInitialized() {
43+
assertThat(config(ImmutableMap.of("http.response.record.timestamp.parser.regex", "(?:\\/Date\\()(.*?)(?:\\+0000\\)\\/)")).getDelegateParser())
44+
.isInstanceOf(DateTimeFormatterTimestampParser.class);
45+
assertThat(config(ImmutableMap.of("http.response.record.timestamp.parser.regex", "(?:\\/Date\\()(.*?)(?:\\+0000\\)\\/)")).getTimestampRegex())
46+
.isEqualTo("(?:\\/Date\\()(.*?)(?:\\+0000\\)\\/)");
47+
}
48+
49+
50+
interface Fixture {
51+
static RegexTimestampParserConfig config(Map<String, String> settings) {
52+
return new RegexTimestampParserConfig(settings);
53+
}
54+
}
55+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package com.github.castorm.kafka.connect.http.response.timestamp;
2+
3+
/*-
4+
* #%L
5+
* Kafka Connect HTTP
6+
* %%
7+
* Copyright (C) 2020 Cástor Rodríguez
8+
* %%
9+
* Licensed under the Apache License, Version 2.0 (the "License");
10+
* you may not use this file except in compliance with the License.
11+
* You may obtain a copy of the License at
12+
*
13+
* http://www.apache.org/licenses/LICENSE-2.0
14+
*
15+
* Unless required by applicable law or agreed to in writing, software
16+
* distributed under the License is distributed on an "AS IS" BASIS,
17+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
* See the License for the specific language governing permissions and
19+
* limitations under the License.
20+
* #L%
21+
*/
22+
23+
import com.github.castorm.kafka.connect.http.response.timestamp.spi.TimestampParser;
24+
import org.junit.jupiter.api.BeforeEach;
25+
import org.junit.jupiter.api.Test;
26+
import org.junit.jupiter.api.extension.ExtendWith;
27+
import org.mockito.Mock;
28+
import org.mockito.junit.jupiter.MockitoExtension;
29+
30+
import static com.github.castorm.kafka.connect.http.response.timestamp.RegexTimestampParserTest.Fixture.regex;
31+
import static java.time.Instant.ofEpochMilli;
32+
import static java.util.Collections.emptyMap;
33+
import static org.assertj.core.api.Assertions.assertThat;
34+
import static org.mockito.BDDMockito.given;
35+
import static org.mockito.BDDMockito.then;
36+
37+
@ExtendWith(MockitoExtension.class)
38+
public class RegexTimestampParserTest {
39+
40+
RegexTimestampParser parser;
41+
42+
@Mock
43+
TimestampParser delegate;
44+
45+
@Mock
46+
RegexTimestampParserConfig config;
47+
48+
@BeforeEach
49+
void setUp() {
50+
51+
parser = new RegexTimestampParser(__ -> config);
52+
given(config.getDelegateParser()).willReturn(delegate);
53+
given(config.getTimestampRegex()).willReturn(regex);
54+
parser.configure(emptyMap());
55+
}
56+
57+
@Test
58+
void givenLongFormatter_whenParse_thenDelegated() {
59+
60+
given(delegate.parse("123456789")).willReturn(ofEpochMilli(123456789L));
61+
62+
assertThat(parser.parse("Date123456789")).isEqualTo(ofEpochMilli(123456789L));
63+
}
64+
65+
@Test
66+
void givenFormatter_whenParse_thenDelegated() {
67+
68+
parser.parse("Date2011-12-03T10:15:30+01");
69+
then(delegate).should().parse("2011-12-03T10:15:30+01");
70+
}
71+
72+
@Test
73+
void givenNotNumber_whenParse_thenReturnedFromDelegate() {
74+
75+
given(delegate.parse("2011-12-03T10:15:30+01")).willReturn(ofEpochMilli(123));
76+
77+
assertThat(parser.parse("Date2011-12-03T10:15:30+01")).isEqualTo(ofEpochMilli(123));
78+
}
79+
80+
interface Fixture {
81+
String regex = "(?:Date)(.*)";
82+
}
83+
}

0 commit comments

Comments
 (0)