Skip to content

Commit 3fbe073

Browse files
author
Stephen Powis
committed
[ISSUE-24] Initial support for KIP-465
1 parent 7932d0e commit 3fbe073

25 files changed

+756
-56
lines changed

CHANGELOG.md

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,57 @@
22
The format is based on [Keep a Changelog](http://keepachangelog.com/)
33
and this project adheres to [Semantic Versioning](http://semver.org/).
44

5+
## 2.1.0 (06/26/2019)
6+
7+
### New Features
8+
- Added support to retrieve information about the Kafka-Connect server being queried.
9+
10+
```java
11+
/**
12+
* Retrieve details about the Kafka-Connect service itself.
13+
* @return ConnectServerVersion
14+
*/
15+
public ConnectServerVersion getConnectServerVersion()
16+
```
17+
18+
- Added support for [Expanded Connectors API Endpoint KIP-465](https://cwiki.apache.org/confluence/display/KAFKA/KIP-465%3A+Add+Consolidated+Connector+Endpoint+to+Connect+REST+API).
19+
20+
Only supported by Kafka-Connect servers running version 2.3.0+ the following methods were added:
21+
22+
```java
23+
/**
24+
* Get a list of deployed connectors, including the status for each connector.
25+
* https://docs.confluent.io/current/connect/references/restapi.html#get--connectors
26+
*
27+
* Requires Kafka-Connect 2.3.0+
28+
*
29+
* @return All deployed connectors, and their respective statuses.
30+
*/
31+
public ConnectorsWithExpandedStatus getConnectorsWithExpandedStatus()
32+
33+
/**
34+
* Get a list of deployed connectors, including the definition for each connector.
35+
* https://docs.confluent.io/current/connect/references/restapi.html#get--connectors
36+
*
37+
* Requires Kafka-Connect 2.3.0+
38+
*
39+
* @return All deployed connectors, and their respective definition.
40+
*/
41+
public ConnectorsWithExpandedInfo getConnectorsWithExpandedInfo()
42+
43+
/**
44+
* Get a list of deployed connectors, including all metadata available.
45+
* Currently includes both 'info' {@see getConnectorsWithExpandedInfo} and 'status' {@see getConnectorsWithExpandedStatus}
46+
* metadata.
47+
* https://docs.confluent.io/current/connect/references/restapi.html#get--connectors
48+
*
49+
* Requires Kafka-Connect 2.3.0+
50+
*
51+
* @return All deployed connectors, and their respective metadata.
52+
*/
53+
public ConnectorsWithExpandedMetadata getConnectorsWithAllExpandedMetadata()
54+
```
55+
556
## 2.0.2 (06/06/2019)
657

758
### Internal Dependency Updates

src/main/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClient.java

Lines changed: 72 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,29 @@
2424
import org.sourcelab.kafka.connect.apiclient.request.Request;
2525
import org.sourcelab.kafka.connect.apiclient.request.RequestErrorResponse;
2626
import org.sourcelab.kafka.connect.apiclient.request.delete.DeleteConnector;
27+
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectServerVersion;
2728
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorDefinition;
2829
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPlugin;
2930
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPluginConfigDefinition;
3031
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPluginConfigValidationResults;
3132
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorStatus;
33+
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorsWithExpandedMetadata;
34+
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorsWithExpandedInfo;
35+
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorsWithExpandedStatus;
3236
import org.sourcelab.kafka.connect.apiclient.request.dto.NewConnectorDefinition;
3337
import org.sourcelab.kafka.connect.apiclient.request.dto.Task;
3438
import org.sourcelab.kafka.connect.apiclient.request.dto.TaskStatus;
39+
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectServerVersion;
3540
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnector;
3641
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorConfig;
3742
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorPlugins;
3843
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorStatus;
3944
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorTaskStatus;
4045
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorTasks;
4146
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectors;
47+
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorsExpandAllDetails;
48+
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorsExpandInfo;
49+
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorsExpandStatus;
4250
import org.sourcelab.kafka.connect.apiclient.request.post.PostConnector;
4351
import org.sourcelab.kafka.connect.apiclient.request.post.PostConnectorRestart;
4452
import org.sourcelab.kafka.connect.apiclient.request.post.PostConnectorTaskRestart;
@@ -59,7 +67,7 @@
5967
/**
6068
* API Client for interacting with the Kafka-Connect Rest Endpoint.
6169
* Official Rest Endpoint documentation can be found here:
62-
* https://docs.confluent.io/current/connect/restapi.html
70+
* https://docs.confluent.io/current/connect/references/restapi.html
6371
*/
6472
public class KafkaConnectClient {
6573
private static final Logger logger = LoggerFactory.getLogger(KafkaConnectClient.class);
@@ -92,7 +100,7 @@ public KafkaConnectClient(final Configuration configuration) {
92100
/**
93101
* Constructor for injecting a RestClient implementation.
94102
* Typically only used in testing.
95-
* @param configuration Pardot Api Configuration.
103+
* @param configuration Api Client Configuration.
96104
* @param restClient RestClient implementation to use.
97105
*/
98106
public KafkaConnectClient(final Configuration configuration, final RestClient restClient) {
@@ -101,18 +109,64 @@ public KafkaConnectClient(final Configuration configuration, final RestClient re
101109
}
102110

103111
/**
104-
* Get a list of active connectors.
105-
* https://docs.confluent.io/current/connect/restapi.html#get--connectors
112+
* Retrieve details about the Kafka-Connect service itself.
113+
* @return ConnectServerVersion
114+
*/
115+
public ConnectServerVersion getConnectServerVersion() {
116+
return submitRequest(new GetConnectServerVersion());
117+
}
118+
119+
/**
120+
* Get a list of deployed connectors.
121+
* https://docs.confluent.io/current/connect/references/restapi.html#get--connectors
106122
*
107123
* @return Collection of connector names currently deployed.
108124
*/
109125
public Collection<String> getConnectors() {
110126
return submitRequest(new GetConnectors());
111127
}
112128

129+
/**
130+
* Get a list of deployed connectors, including the status for each connector.
131+
* https://docs.confluent.io/current/connect/references/restapi.html#get--connectors
132+
*
133+
* Requires Kafka-Connect 2.3.0+
134+
*
135+
* @return All deployed connectors, and their respective statuses.
136+
*/
137+
public ConnectorsWithExpandedStatus getConnectorsWithExpandedStatus() {
138+
return submitRequest(new GetConnectorsExpandStatus());
139+
}
140+
141+
/**
142+
* Get a list of deployed connectors, including the definition for each connector.
143+
* https://docs.confluent.io/current/connect/references/restapi.html#get--connectors
144+
*
145+
* Requires Kafka-Connect 2.3.0+
146+
*
147+
* @return All deployed connectors, and their respective definition.
148+
*/
149+
public ConnectorsWithExpandedInfo getConnectorsWithExpandedInfo() {
150+
return submitRequest(new GetConnectorsExpandInfo());
151+
}
152+
153+
/**
154+
* Get a list of deployed connectors, including all metadata available.
155+
* Currently includes both 'info' {@see getConnectorsWithExpandedInfo} and 'status' {@see getConnectorsWithExpandedStatus}
156+
* metadata.
157+
* https://docs.confluent.io/current/connect/references/restapi.html#get--connectors
158+
*
159+
* Requires Kafka-Connect 2.3.0+
160+
*
161+
* @return All deployed connectors, and their respective metadata.
162+
*/
163+
public ConnectorsWithExpandedMetadata getConnectorsWithAllExpandedMetadata() {
164+
return submitRequest(new GetConnectorsExpandAllDetails());
165+
}
166+
113167
/**
114168
* Get information about the connector.
115-
* https://docs.confluent.io/current/connect/restapi.html#get--connectors-(string-name)
169+
* https://docs.confluent.io/current/connect/references/restapi.html#get--connectors-(string-name)
116170
* @param connectorName Name of connector.
117171
* @return Connector details.
118172
*/
@@ -122,7 +176,7 @@ public ConnectorDefinition getConnector(final String connectorName) {
122176

123177
/**
124178
* Get the configuration for the connector.
125-
* https://docs.confluent.io/current/connect/restapi.html#get--connectors-(string-name)-config
179+
* https://docs.confluent.io/current/connect/references/restapi.html#get--connectors-(string-name)-config
126180
* @param connectorName Name of connector.
127181
* @return Configuration for connector.
128182
*/
@@ -132,7 +186,7 @@ public Map<String, String> getConnectorConfig(final String connectorName) {
132186

133187
/**
134188
* Get the status of specified connector by name.
135-
* https://docs.confluent.io/current/connect/restapi.html#get--connectors-(string-name)-config
189+
* https://docs.confluent.io/current/connect/references/restapi.html#get--connectors-(string-name)-config
136190
*
137191
* @param connectorName Name of connector.
138192
* @return Status details of the connector.
@@ -143,7 +197,7 @@ public ConnectorStatus getConnectorStatus(final String connectorName) {
143197

144198
/**
145199
* Create a new connector, returning the current connector info if successful.
146-
* https://docs.confluent.io/current/connect/restapi.html#post--connectors
200+
* https://docs.confluent.io/current/connect/references/restapi.html#post--connectors
147201
*
148202
* @param connectorDefinition Defines the new connector to deploy
149203
* @return connector info.
@@ -154,7 +208,7 @@ public ConnectorDefinition addConnector(final NewConnectorDefinition connectorDe
154208

155209
/**
156210
* Update a connector's configuration.
157-
* https://docs.confluent.io/current/connect/restapi.html#put--connectors-(string-name)-config
211+
* https://docs.confluent.io/current/connect/references/restapi.html#put--connectors-(string-name)-config
158212
*
159213
* @param connectorName Name of connector to update.
160214
* @param config Configuration values to set.
@@ -166,7 +220,7 @@ public ConnectorDefinition updateConnectorConfig(final String connectorName, fin
166220

167221
/**
168222
* Restart a connector.
169-
* https://docs.confluent.io/current/connect/restapi.html#post--connectors-(string-name)-restart
223+
* https://docs.confluent.io/current/connect/references/restapi.html#post--connectors-(string-name)-restart
170224
*
171225
* @param connectorName Name of connector to restart.
172226
* @return Boolean true if success.
@@ -177,7 +231,7 @@ public Boolean restartConnector(final String connectorName) {
177231

178232
/**
179233
* Pause a connector.
180-
* https://docs.confluent.io/current/connect/restapi.html#put--connectors-(string-name)-pause
234+
* https://docs.confluent.io/current/connect/references/restapi.html#put--connectors-(string-name)-pause
181235
*
182236
* @param connectorName Name of connector to pause.
183237
* @return Boolean true if success.
@@ -188,7 +242,7 @@ public Boolean pauseConnector(final String connectorName) {
188242

189243
/**
190244
* Resume a connector.
191-
* https://docs.confluent.io/current/connect/restapi.html#put--connectors-(string-name)-resume
245+
* https://docs.confluent.io/current/connect/references/restapi.html#put--connectors-(string-name)-resume
192246
*
193247
* @param connectorName Name of connector to resume.
194248
* @return Boolean true if success.
@@ -199,7 +253,7 @@ public Boolean resumeConnector(final String connectorName) {
199253

200254
/**
201255
* Resume a connector.
202-
* https://docs.confluent.io/current/connect/restapi.html#put--connectors-(string-name)-resume
256+
* https://docs.confluent.io/current/connect/references/restapi.html#put--connectors-(string-name)-resume
203257
*
204258
* @param connectorName Name of connector to resume.
205259
* @return Boolean true if success.
@@ -210,7 +264,7 @@ public Boolean deleteConnector(final String connectorName) {
210264

211265
/**
212266
* Get a list of tasks currently running for the connector.
213-
* https://docs.confluent.io/current/connect/restapi.html#get--connectors-(string-name)-tasks
267+
* https://docs.confluent.io/current/connect/references/restapi.html#get--connectors-(string-name)-tasks
214268
*
215269
* @param connectorName Name of connector to retrieve tasks for.
216270
* @return Collection of details about each task.
@@ -221,7 +275,7 @@ public Collection<Task> getConnectorTasks(final String connectorName) {
221275

222276
/**
223277
* Get a task’s status.
224-
* https://docs.confluent.io/current/connect/restapi.html#get--connectors-(string-name)-tasks-(int-taskid)-status
278+
* https://docs.confluent.io/current/connect/references/restapi.html#get--connectors-(string-name)-tasks-(int-taskid)-status
225279
*
226280
* @param connectorName Name of connector to retrieve tasks for.
227281
* @param taskId Id of task to get status for.
@@ -233,7 +287,7 @@ public TaskStatus getConnectorTaskStatus(final String connectorName, final int t
233287

234288
/**
235289
* Restart an individual task.
236-
* https://docs.confluent.io/current/connect/restapi.html#post--connectors-(string-name)-tasks-(int-taskid)-restart
290+
* https://docs.confluent.io/current/connect/references/restapi.html#post--connectors-(string-name)-tasks-(int-taskid)-restart
237291
*
238292
* @param connectorName Name of connector to restart tasks for.
239293
* @param taskId Id of task to restart
@@ -245,7 +299,7 @@ public Boolean restartConnectorTask(final String connectorName, final int taskId
245299

246300
/**
247301
* Return a list of connector plugins installed in the Kafka Connect cluster.
248-
* https://docs.confluent.io/current/connect/restapi.html#get--connector-plugins-
302+
* https://docs.confluent.io/current/connect/references/restapi.html#get--connector-plugins-
249303
*
250304
* @return Collection of available connector plugins.
251305
*/
@@ -256,7 +310,7 @@ public Collection<ConnectorPlugin> getConnectorPlugins() {
256310
/**
257311
* Validate the provided configuration values against the configuration definition. This API performs per config
258312
* validation, returns suggested values and error messages during validation.
259-
* https://docs.confluent.io/current/connect/restapi.html#put--connector-plugins-(string-name)-config-validate
313+
* https://docs.confluent.io/current/connect/references/restapi.html#put--connector-plugins-(string-name)-config-validate
260314
*
261315
* @param configDefinition Defines the configuration to validate.
262316
* @return Results of the validation.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package org.sourcelab.kafka.connect.apiclient.request.dto;
2+
3+
/**
4+
* Represents details about the kafka-connect server.
5+
*/
6+
public class ConnectServerVersion {
7+
private String version;
8+
private String commit;
9+
private String kafkaClusterId;
10+
11+
public String getVersion() {
12+
return version;
13+
}
14+
15+
public String getCommit() {
16+
return commit;
17+
}
18+
19+
public String getKafkaClusterId() {
20+
return kafkaClusterId;
21+
}
22+
23+
@Override
24+
public String toString() {
25+
return "ConnectServerVersion{" +
26+
"version='" + version + '\'' +
27+
", commit='" + commit + '\'' +
28+
", kafkaClusterId='" + kafkaClusterId + '\'' +
29+
'}';
30+
}
31+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package org.sourcelab.kafka.connect.apiclient.request.dto;
2+
3+
import com.fasterxml.jackson.annotation.JsonAnySetter;
4+
import com.fasterxml.jackson.annotation.JsonProperty;
5+
6+
import java.util.Collection;
7+
import java.util.Collections;
8+
import java.util.HashMap;
9+
import java.util.Map;
10+
import java.util.stream.Collectors;
11+
12+
public class ConnectorsWithExpandedInfo {
13+
@JsonAnySetter
14+
private Map<String, ConnectorsWithExpandedInfo.ConnectorWithExpandedInfo> results = new HashMap<>();
15+
16+
public Collection<String> getConnectorNames() {
17+
return results.keySet();
18+
}
19+
20+
public ConnectorDefinition getDefinitionForConnector(final String connectorName) {
21+
if (!results.containsKey(connectorName)) {
22+
throw new IllegalArgumentException("Results do not contain connector: " + connectorName);
23+
}
24+
return results.get(connectorName).getInfo();
25+
}
26+
27+
public Collection<ConnectorDefinition> getAllStatuses() {
28+
return results
29+
.values()
30+
.stream()
31+
.map(ConnectorsWithExpandedInfo.ConnectorWithExpandedInfo::getInfo)
32+
.collect(Collectors.toList());
33+
}
34+
35+
public Map<String, ConnectorDefinition> getMappedStatuses() {
36+
return Collections.unmodifiableMap(
37+
results
38+
.entrySet()
39+
.stream()
40+
.collect(Collectors.toMap(Map.Entry::getKey, (entry) -> entry.getValue().getInfo()))
41+
);
42+
}
43+
44+
@Override
45+
public String toString() {
46+
return "ConnectorsWithExpandedInfo{"
47+
+ results
48+
+ '}';
49+
}
50+
51+
public static class ConnectorWithExpandedInfo {
52+
53+
@JsonProperty("info")
54+
private ConnectorDefinition info;
55+
56+
public ConnectorDefinition getInfo() {
57+
return info;
58+
}
59+
60+
@Override
61+
public String toString() {
62+
return "Connector{" +
63+
"info=" + info +
64+
'}';
65+
}
66+
}
67+
}

0 commit comments

Comments
 (0)