|
6 | 6 | import org.sourcelab.kafka.connect.apiclient.request.Request; |
7 | 7 | import org.sourcelab.kafka.connect.apiclient.request.RequestErrorResponse; |
8 | 8 | import org.sourcelab.kafka.connect.apiclient.request.delete.DeleteConnector; |
| 9 | +import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorDefinition; |
9 | 10 | import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPlugin; |
10 | 11 | import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPluginConfigDefinition; |
11 | 12 | import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPluginConfigValidationResults; |
|
20 | 21 | import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorTaskStatus; |
21 | 22 | import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorTasks; |
22 | 23 | import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectors; |
23 | | -import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorDefinition; |
24 | 24 | import org.sourcelab.kafka.connect.apiclient.request.post.PostConnector; |
25 | 25 | import org.sourcelab.kafka.connect.apiclient.request.post.PostConnectorRestart; |
26 | 26 | import org.sourcelab.kafka.connect.apiclient.request.post.PostConnectorTaskRestart; |
|
37 | 37 | import java.util.Collection; |
38 | 38 | import java.util.Map; |
39 | 39 |
|
| 40 | +/** |
| 41 | + * API Client for interacting with the Kafka-Connect Rest Endpoint. |
| 42 | + * Official Rest Endpoint documentation can be found here: |
| 43 | + * https://docs.confluent.io/current/connect/restapi.html |
| 44 | + */ |
40 | 45 | public class ApiClient { |
41 | 46 | private static final Logger logger = LoggerFactory.getLogger(ApiClient.class); |
42 | 47 |
|
@@ -76,49 +81,32 @@ public ApiClient(final Configuration configuration, final RestClient restClient) |
76 | 81 | this.restClient = restClient; |
77 | 82 | } |
78 | 83 |
|
79 | | - private <T> T submitRequest(final Request<T> request) { |
80 | | - // Submit request |
81 | | - final RestResponse restResponse = getRestClient().submitRequest(request); |
82 | | - final int responseCode = restResponse.getHttpCode(); |
83 | | - String responseStr = restResponse.getResponseStr(); |
84 | | - |
85 | | - // If we have a valid response |
86 | | - logger.info("Response: {}", restResponse); |
87 | | - |
88 | | - // Check for invalid http status codes |
89 | | - if (responseCode >= 200 && responseCode < 300) { |
90 | | - // These response codes have no values |
91 | | - if ((responseCode == 204 || responseCode == 205) && responseStr == null) { |
92 | | - // Avoid NPE |
93 | | - responseStr = ""; |
94 | | - } |
95 | | - |
96 | | - try { |
97 | | - return request.parseResponse(responseStr); |
98 | | - } catch (final IOException exception) { |
99 | | - throw new RuntimeException(exception.getMessage(), exception); |
100 | | - } |
101 | | - } |
102 | | - |
103 | | - // Attempt to parse error response |
104 | | - try { |
105 | | - final RequestErrorResponse errorResponse = JacksonFactory.newInstance().readValue(responseStr, RequestErrorResponse.class); |
106 | | - throw new InvalidRequestException(errorResponse.getMessage(), errorResponse.getErrorCode()); |
107 | | - } catch (IOException e) { |
108 | | - // swallow |
109 | | - } |
110 | | - throw new InvalidRequestException("Invalid response from server: " + responseStr, restResponse.getHttpCode()); |
111 | | - |
112 | | - } |
113 | | - |
| 84 | + /** |
| 85 | + * Get a list of active connectors. |
| 86 | + * https://docs.confluent.io/current/connect/restapi.html#get--connectors |
| 87 | + * |
| 88 | + * @return Collection of connector names currently deployed. |
| 89 | + */ |
114 | 90 | public Collection<String> getConnectors() { |
115 | 91 | return submitRequest(new GetConnectors()); |
116 | 92 | } |
117 | 93 |
|
| 94 | + /** |
| 95 | + * Get information about the connector. |
| 96 | + * https://docs.confluent.io/current/connect/restapi.html#get--connectors-(string-name) |
| 97 | + * @param connectorName Name of connector. |
| 98 | + * @return Connector details. |
| 99 | + */ |
118 | 100 | public ConnectorDefinition getConnector(final String connectorName) { |
119 | 101 | return submitRequest(new GetConnector(connectorName)); |
120 | 102 | } |
121 | 103 |
|
| 104 | + /** |
| 105 | + * Get the configuration for the connector. |
| 106 | + * https://docs.confluent.io/current/connect/restapi.html#get--connectors-(string-name)-config |
| 107 | + * @param connectorName Name of connector. |
| 108 | + * @return Configuration for connector. |
| 109 | + */ |
122 | 110 | public Map<String, String> getConnectorConfig(final String connectorName) { |
123 | 111 | return submitRequest(new GetConnectorConfig(connectorName)); |
124 | 112 | } |
@@ -260,10 +248,40 @@ public ConnectorPluginConfigValidationResults validateConnectorPluginConfig(fina |
260 | 248 | ); |
261 | 249 | } |
262 | 250 |
|
263 | | - /** |
264 | | - * package protected for access in tests. |
265 | | - * @return Rest Client. |
266 | | - */ |
| 251 | + private <T> T submitRequest(final Request<T> request) { |
| 252 | + // Submit request |
| 253 | + final RestResponse restResponse = getRestClient().submitRequest(request); |
| 254 | + final int responseCode = restResponse.getHttpCode(); |
| 255 | + String responseStr = restResponse.getResponseStr(); |
| 256 | + |
| 257 | + // If we have a valid response |
| 258 | + logger.info("Response: {}", restResponse); |
| 259 | + |
| 260 | + // Check for invalid http status codes |
| 261 | + if (responseCode >= 200 && responseCode < 300) { |
| 262 | + // These response codes have no values |
| 263 | + if ((responseCode == 204 || responseCode == 205) && responseStr == null) { |
| 264 | + // Avoid NPE |
| 265 | + responseStr = ""; |
| 266 | + } |
| 267 | + |
| 268 | + try { |
| 269 | + return request.parseResponse(responseStr); |
| 270 | + } catch (final IOException exception) { |
| 271 | + throw new RuntimeException(exception.getMessage(), exception); |
| 272 | + } |
| 273 | + } |
| 274 | + |
| 275 | + // Attempt to parse error response |
| 276 | + try { |
| 277 | + final RequestErrorResponse errorResponse = JacksonFactory.newInstance().readValue(responseStr, RequestErrorResponse.class); |
| 278 | + throw new InvalidRequestException(errorResponse.getMessage(), errorResponse.getErrorCode()); |
| 279 | + } catch (IOException e) { |
| 280 | + // swallow |
| 281 | + } |
| 282 | + throw new InvalidRequestException("Invalid response from server: " + responseStr, restResponse.getHttpCode()); |
| 283 | + } |
| 284 | + |
267 | 285 | private RestClient getRestClient() { |
268 | 286 | // If we haven't initialized. |
269 | 287 | if (!isInitialized) { |
|
0 commit comments