44import com .getindata .connectors .http .sink .httpclient .JavaNetSinkHttpClient ;
55import com .github .tomakehurst .wiremock .WireMockServer ;
66import com .github .tomakehurst .wiremock .http .Fault ;
7+ import org .apache .flink .configuration .ConfigConstants ;
8+ import org .apache .flink .configuration .Configuration ;
9+ import org .apache .flink .metrics .Counter ;
10+ import org .apache .flink .metrics .Metric ;
11+ import org .apache .flink .metrics .MetricConfig ;
12+ import org .apache .flink .metrics .MetricGroup ;
13+ import org .apache .flink .metrics .reporter .MetricReporter ;
714import org .apache .flink .streaming .api .environment .StreamExecutionEnvironment ;
815import org .junit .jupiter .api .AfterEach ;
916import org .junit .jupiter .api .BeforeEach ;
@@ -26,10 +33,54 @@ public class HttpSinkConnectionTest {
2633 private static final List <String > messages =
2734 messageIds .stream ().map (i -> "{\" http-sink-id\" :" + i + "}" ).collect (Collectors .toList ());
2835
36+ private StreamExecutionEnvironment env ;
2937 private WireMockServer wireMockServer ;
3038
39+ // must be public because of the reflection
40+ public static class SendErrorsTestReporter implements MetricReporter {
41+ static volatile List <Counter > numRecordsSendErrors = null ;
42+
43+ public static long getCount () {
44+ return numRecordsSendErrors .stream ().map (Counter ::getCount ).reduce (0L , Long ::sum );
45+ }
46+
47+ public static void reset () {
48+ numRecordsSendErrors = new ArrayList <>();
49+ }
50+
51+ @ Override
52+ public void open (MetricConfig metricConfig ) {
53+ }
54+
55+ @ Override
56+ public void close () {
57+ }
58+
59+ @ Override
60+ public void notifyOfAddedMetric (
61+ Metric metric , String s , MetricGroup metricGroup
62+ ) {
63+ if ("numRecordsSendErrors" .equals (s )) {
64+ numRecordsSendErrors .add ((Counter ) metric );
65+ }
66+ }
67+
68+ @ Override
69+ public void notifyOfRemovedMetric (Metric metric , String s , MetricGroup metricGroup ) {
70+ }
71+ }
72+
3173 @ BeforeEach
3274 public void setUp () {
75+ SendErrorsTestReporter .reset ();
76+
77+ env = StreamExecutionEnvironment .getExecutionEnvironment (new Configuration () {
78+ {
79+ this .setString (ConfigConstants .METRICS_REPORTER_PREFIX + "sendErrorsTestReporter." +
80+ ConfigConstants .METRICS_REPORTER_CLASS_SUFFIX , SendErrorsTestReporter .class .getName ());
81+ }
82+ });
83+
3384 wireMockServer = new WireMockServer (SERVER_PORT );
3485 wireMockServer .start ();
3586 }
@@ -48,7 +99,6 @@ public void testConnection() throws Exception {
4899 .withStatus (200 )
49100 .withBody ("{}" )));
50101
51- var env = StreamExecutionEnvironment .getExecutionEnvironment ();
52102 var source = env .fromCollection (messages );
53103 var httpSink = HttpSink .<String >builder ()
54104 .setEndpointUrl ("http://localhost:" + SERVER_PORT + "/myendpoint" )
@@ -97,7 +147,6 @@ public void testServerErrorConnection() throws Exception {
97147 .willReturn (aResponse ().withStatus (200 ))
98148 .willSetStateTo ("Cause Success" ));
99149
100- var env = StreamExecutionEnvironment .getExecutionEnvironment ();
101150 var source = env .fromCollection (List .of (messages .get (0 )));
102151 var httpSink = HttpSink .<String >builder ()
103152 .setEndpointUrl ("http://localhost:" + SERVER_PORT + "/myendpoint" )
@@ -109,9 +158,11 @@ public void testServerErrorConnection() throws Exception {
109158 source .sinkTo (httpSink );
110159 env .execute ("Http Sink test failed connection" );
111160
112- var postedRequests = wireMockServer .findAll (postRequestedFor (urlPathEqualTo ("/myendpoint" )));
113- assertEquals (2 , postedRequests .size ());
114- assertEquals (postedRequests .get (0 ).getBodyAsString (), postedRequests .get (1 ).getBodyAsString ());
161+ assertEquals (1 , SendErrorsTestReporter .getCount ());
162+ // TODO: reintroduce along with the retries
163+ // var postedRequests = wireMockServer.findAll(postRequestedFor(urlPathEqualTo("/myendpoint")));
164+ // assertEquals(2, postedRequests.size());
165+ // assertEquals(postedRequests.get(0).getBodyAsString(), postedRequests.get(1).getBodyAsString());
115166 }
116167
117168 @ Test
@@ -120,7 +171,7 @@ public void testFailedConnection() throws Exception {
120171 .withHeader ("Content-Type" , equalTo ("application/json" ))
121172 .inScenario ("Retry Scenario" )
122173 .whenScenarioStateIs (STARTED )
123- .willReturn (serverError ().withFault (Fault .EMPTY_RESPONSE ))
174+ .willReturn (aResponse ().withFault (Fault .EMPTY_RESPONSE ))
124175 .willSetStateTo ("Cause Success" ));
125176 wireMockServer .stubFor (any (urlPathEqualTo ("/myendpoint" ))
126177 .withHeader ("Content-Type" , equalTo ("application/json" ))
@@ -129,7 +180,6 @@ public void testFailedConnection() throws Exception {
129180 .willReturn (aResponse ().withStatus (200 ))
130181 .willSetStateTo ("Cause Success" ));
131182
132- var env = StreamExecutionEnvironment .getExecutionEnvironment ();
133183 var source = env .fromCollection (List .of (messages .get (0 )));
134184 var httpSink = HttpSink .<String >builder ()
135185 .setEndpointUrl ("http://localhost:" + SERVER_PORT + "/myendpoint" )
@@ -141,8 +191,9 @@ public void testFailedConnection() throws Exception {
141191 source .sinkTo (httpSink );
142192 env .execute ("Http Sink test failed connection" );
143193
144- var postedRequests = wireMockServer .findAll (postRequestedFor (urlPathEqualTo ("/myendpoint" )));
145- assertEquals (2 , postedRequests .size ());
146- assertEquals (postedRequests .get (0 ).getBodyAsString (), postedRequests .get (1 ).getBodyAsString ());
194+ assertEquals (1 , SendErrorsTestReporter .getCount ());
195+ // var postedRequests = wireMockServer.findAll(postRequestedFor(urlPathEqualTo("/myendpoint")));
196+ // assertEquals(2, postedRequests.size());
197+ // assertEquals(postedRequests.get(0).getBodyAsString(), postedRequests.get(1).getBodyAsString());
147198 }
148199}
0 commit comments