Skip to content

Commit df7a9a1

Browse files
committed
implement more end points
1 parent 8321819 commit df7a9a1

File tree

10 files changed

+337
-8
lines changed

10 files changed

+337
-8
lines changed

src/main/java/org/sourcelab/kafka/connect/apiclient/ApiClient.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@
66
import org.sourcelab.kafka.connect.apiclient.request.Request;
77
import org.sourcelab.kafka.connect.apiclient.request.RequestErrorResponse;
88
import org.sourcelab.kafka.connect.apiclient.request.delete.DeleteConnector;
9+
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPlugin;
10+
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPluginConfigDefinition;
911
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorStatus;
1012
import org.sourcelab.kafka.connect.apiclient.request.dto.NewConnectorDefinition;
1113
import org.sourcelab.kafka.connect.apiclient.request.dto.Task;
1214
import org.sourcelab.kafka.connect.apiclient.request.dto.TaskStatus;
1315
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnector;
1416
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorConfig;
17+
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorPlugins;
1518
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorStatus;
1619
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorTaskStatus;
1720
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorTasks;
@@ -22,6 +25,7 @@
2225
import org.sourcelab.kafka.connect.apiclient.request.post.PostConnectorTaskRestart;
2326
import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorConfig;
2427
import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorPause;
28+
import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorPluginConfigValidate;
2529
import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorResume;
2630
import org.sourcelab.kafka.connect.apiclient.rest.HttpClientRestClient;
2731
import org.sourcelab.kafka.connect.apiclient.rest.InvalidRequestException;
@@ -231,6 +235,30 @@ public Boolean restartConnectorTask(final String connectorName, final int taskId
231235
return submitRequest(new PostConnectorTaskRestart(connectorName, taskId));
232236
}
233237

238+
/**
239+
* Return a list of connector plugins installed in the Kafka Connect cluster.
240+
* https://docs.confluent.io/current/connect/restapi.html#get--connector-plugins-
241+
*
242+
* @return Collection of available connector plugins.
243+
*/
244+
public Collection<ConnectorPlugin> getConnectorPlugins() {
245+
return submitRequest(new GetConnectorPlugins());
246+
}
247+
248+
/**
249+
* Validate the provided configuration values against the configuration definition. This API performs per config
250+
* validation, returns suggested values and error messages during validation.
251+
* https://docs.confluent.io/current/connect/restapi.html#put--connector-plugins-(string-name)-config-validate
252+
*
253+
* @param configDefinition Defines the configuration to validate.
254+
* @return Results of the validation.
255+
*/
256+
public String validateConnectorPluginConfig(final ConnectorPluginConfigDefinition configDefinition) {
257+
return submitRequest(
258+
new PutConnectorPluginConfigValidate(configDefinition.getName(), configDefinition.getConfig())
259+
);
260+
}
261+
234262
/**
235263
* package protected for access in tests.
236264
* @return Rest Client.

src/main/java/org/sourcelab/kafka/connect/apiclient/request/dto/ConnectorPlugin.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,27 @@
22

33
import com.fasterxml.jackson.annotation.JsonAlias;
44

5+
/**
6+
* Represents details about a Connector Plugin.
7+
*/
58
public class ConnectorPlugin {
69
@JsonAlias("class")
710
private String className;
11+
private String type;
12+
private String version;
813

9-
private String getClassName() {
14+
public String getClassName() {
1015
return className;
1116
}
1217

18+
public String getType() {
19+
return type;
20+
}
21+
22+
public String getVersion() {
23+
return version;
24+
}
25+
1326
@Override
1427
public String toString() {
1528
return "ConnectorPlugin{"
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package org.sourcelab.kafka.connect.apiclient.request.dto;
2+
3+
import java.util.Collections;
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
7+
/**
8+
* Defines a Connector Plugin Configuration.
9+
*/
10+
public class ConnectorPluginConfigDefinition {
11+
private final String name;
12+
private final Map<String, String> config;
13+
14+
/**
15+
* Constructor
16+
* @param connectorPluginName Name of Connector Plugin.
17+
* @param config Configuration values for connector.
18+
*/
19+
public ConnectorPluginConfigDefinition(final String connectorPluginName, final Map<String, String> config) {
20+
this.name = connectorPluginName;
21+
this.config = Collections.unmodifiableMap(new HashMap<>(config));
22+
}
23+
24+
public String getName() {
25+
return name;
26+
}
27+
28+
public Map<String, String> getConfig() {
29+
return config;
30+
}
31+
32+
public static ConnectorPluginConfigDefinition.Builder newBuilder() {
33+
return new ConnectorPluginConfigDefinition.Builder();
34+
}
35+
36+
@Override
37+
public String toString() {
38+
return "ConnectorPluginConfigDefinition{"
39+
+ "name='" + name + '\''
40+
+ ", config=" + config
41+
+ '}';
42+
}
43+
44+
/**
45+
* Builder for ConnectorPluginConfigDefinition.
46+
*/
47+
public static final class Builder {
48+
private String name;
49+
private Map<String, String> config = new HashMap<>();
50+
51+
private Builder() {
52+
}
53+
54+
public ConnectorPluginConfigDefinition.Builder withName(final String name) {
55+
this.name = name;
56+
return this;
57+
}
58+
59+
public ConnectorPluginConfigDefinition.Builder withConfig(final Map<String, String> config) {
60+
this.config = new HashMap<>(config);
61+
return this;
62+
}
63+
64+
public ConnectorPluginConfigDefinition.Builder withConfig(final String key, final String value) {
65+
this.config.put(key, value);
66+
return this;
67+
}
68+
69+
public ConnectorPluginConfigDefinition.Builder withConfig(final String key, final Object value) {
70+
this.config.put(key, value.toString());
71+
return this;
72+
}
73+
74+
public ConnectorPluginConfigDefinition build() {
75+
return new ConnectorPluginConfigDefinition(name, config);
76+
}
77+
}
78+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package org.sourcelab.kafka.connect.apiclient.request.dto;
2+
3+
/**
4+
* Represents the results of validating a Connector Plugin configuration.
5+
*/
6+
public class ConnectorPluginConfigValidation {
7+
8+
}

src/main/java/org/sourcelab/kafka/connect/apiclient/request/put/PutConnectorConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public class PutConnectorConfig implements PutRequest<ConnectorDefinition> {
1616

1717
public PutConnectorConfig(final String name, final Map<String, String> config) {
1818
this.name = name;
19-
this.config = Collections.unmodifiableMap(new HashMap(config));
19+
this.config = Collections.unmodifiableMap(new HashMap<>(config));
2020
}
2121

2222
@Override
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package org.sourcelab.kafka.connect.apiclient.request.put;
2+
3+
import com.sun.xml.internal.rngom.util.Uri;
4+
import org.sourcelab.kafka.connect.apiclient.request.JacksonFactory;
5+
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorDefinition;
6+
7+
import java.io.IOException;
8+
import java.util.Collections;
9+
import java.util.HashMap;
10+
import java.util.Map;
11+
12+
public class PutConnectorPluginConfigValidate implements PutRequest<String> {
13+
private final String connectorPluginName;
14+
private final Map<String, String> config;
15+
16+
public PutConnectorPluginConfigValidate(final String connectorPluginName, final Map<String, String> config) {
17+
if (connectorPluginName == null) {
18+
throw new NullPointerException("ConnectorPluginName parameter may not be null!");
19+
}
20+
if (config == null) {
21+
throw new NullPointerException("config parameter may not be null!");
22+
}
23+
this.connectorPluginName = connectorPluginName;
24+
this.config = Collections.unmodifiableMap(new HashMap<>(config));
25+
}
26+
27+
@Override
28+
public String getApiEndpoint() {
29+
return "/connector-plugins/" + Uri.escapeDisallowedChars(connectorPluginName) + "/config/validate";
30+
}
31+
32+
@Override
33+
public Object getRequestBody() {
34+
return config;
35+
}
36+
37+
@Override
38+
public String parseResponse(final String responseStr) throws IOException {
39+
//return JacksonFactory.newInstance().readValue(responseStr, ConnectorDefinition.class);
40+
return responseStr;
41+
}
42+
}

src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -273,12 +273,8 @@ private <T> T submitPutRequest(final String url, final Object requestBody, final
273273
// Conditionally add content-type header?
274274
put.addHeader(new BasicHeader("Content-Type", "application/json"));
275275

276-
// Define required auth params
277-
final List<NameValuePair> params = new ArrayList<>();
278-
279-
// Convert to Json
276+
// Convert to Json and submit as payload.
280277
final String jsonPayloadStr = JacksonFactory.newInstance().writeValueAsString(requestBody);
281-
282278
put.setEntity(new StringEntity(jsonPayloadStr));
283279

284280
logger.info("Executing request {} with {}", put.getRequestLine(), jsonPayloadStr);

src/test/java/org/sourcelab/kafka/connect/apiclient/ApiClientTest.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.slf4j.Logger;
66
import org.slf4j.LoggerFactory;
77
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorDefinition;
8+
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPluginConfigDefinition;
89
import org.sourcelab.kafka.connect.apiclient.request.dto.NewConnectorDefinition;
910

1011
import java.util.HashMap;
@@ -147,10 +148,32 @@ public void testGetConnectorTaskStatus() {
147148
}
148149

149150
/**
150-
* Test retrieving status about a specific task for a running connector.
151+
* Test restarting a specific task for a running connector.
151152
*/
152153
@Test
153154
public void testRestartConnectorTask() {
154155
logger.info("Result: {}", apiClient.restartConnectorTask(connectorName, 0));
155156
}
157+
158+
/**
159+
* Test retrieving available connector plugins.
160+
*/
161+
@Test
162+
public void testGetConnectorPlugins() {
163+
logger.info("Result: {}", apiClient.getConnectorPlugins());
164+
}
165+
166+
/**
167+
* Test retrieving available connector plugins.
168+
*/
169+
@Test
170+
public void testValidateConnectorPluginConfig() {
171+
logger.info("Result: {}", apiClient.validateConnectorPluginConfig(ConnectorPluginConfigDefinition.newBuilder()
172+
.withName("VerifiableSourceConnector")
173+
.withConfig("connector.class", "org.apache.kafka.connect.tools.VerifiableSourceConnector")
174+
.withConfig("tasks.max", 3)
175+
.withConfig("topics", "test-topic")
176+
.build()
177+
));
178+
}
156179
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
[{
2+
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
3+
"type": "sink",
4+
"version": "1.0.0-cp1"
5+
}, {
6+
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
7+
"type": "source",
8+
"version": "1.0.0-cp1"
9+
}]

0 commit comments

Comments
 (0)