Skip to content

Commit 8178d14

Browse files
authored
MINOR: Fix AuthorizationErrorTest (#1304)
Fix AuthorizationErrorTest failures due to the changes in https://github.com/apache/kafka/pull/17224/files
1 parent 0e7c937 commit 8178d14

File tree

3 files changed

+74
-81
lines changed

3 files changed

+74
-81
lines changed

kafka-rest/src/test/java/io/confluent/kafkarest/integration/AuthorizationErrorTest.java

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,13 @@
3939
import javax.ws.rs.core.Response;
4040
import kafka.security.authorizer.AclAuthorizer;
4141
import org.apache.kafka.clients.admin.AdminClientConfig;
42-
import org.apache.kafka.common.security.auth.KafkaPrincipal;
42+
import org.apache.kafka.common.acl.AccessControlEntry;
43+
import org.apache.kafka.common.acl.AclBinding;
44+
import org.apache.kafka.common.acl.AclOperation;
45+
import org.apache.kafka.common.acl.AclPermissionType;
46+
import org.apache.kafka.common.resource.PatternType;
47+
import org.apache.kafka.common.resource.ResourcePattern;
48+
import org.apache.kafka.common.resource.ResourceType;
4349
import org.apache.kafka.common.security.auth.SecurityProtocol;
4450
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
4551
import org.junit.jupiter.api.AfterEach;
@@ -151,18 +157,36 @@ public void testConsumerRequest(String quorum) {
151157
// test without acls
152158
verifySubscribeToTopic(true);
153159
// add acls
154-
SecureTestUtils.setConsumerAcls(zkConnect, TOPIC_NAME, USERNAME, CONSUMER_GROUP);
160+
setConsumerAcls();
155161
verifySubscribeToTopic(false);
156162
}
157163

164+
private void setConsumerAcls() {
165+
AclBinding topicAcl =
166+
new AclBinding(
167+
new ResourcePattern(ResourceType.TOPIC, TOPIC_NAME, PatternType.LITERAL),
168+
new AccessControlEntry(
169+
"User:" + USERNAME, "*", AclOperation.READ, AclPermissionType.ALLOW));
170+
AclBinding groupAcl =
171+
new AclBinding(
172+
new ResourcePattern(ResourceType.GROUP, CONSUMER_GROUP, PatternType.LITERAL),
173+
new AccessControlEntry(
174+
"User:" + USERNAME, "*", AclOperation.READ, AclPermissionType.ALLOW));
175+
try {
176+
createAcls(Arrays.asList(topicAcl, groupAcl), adminProperties());
177+
} catch (Exception e) {
178+
throw new RuntimeException(e);
179+
}
180+
}
181+
158182
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
159183
@ValueSource(strings = {"zk"})
160184
public void testProducerAuthorization(String quorum) {
161185
BinaryTopicProduceRequest request = BinaryTopicProduceRequest.create(topicRecords);
162186
// test without any acls
163187
testProduceToAuthorizationError(TOPIC_NAME, request);
164188
// add acls
165-
SecureTestUtils.setProduceAcls(zkConnect, TOPIC_NAME, USERNAME);
189+
setProduceAcls();
166190
testProduceToTopic(
167191
TOPIC_NAME,
168192
request,
@@ -173,6 +197,19 @@ public void testProducerAuthorization(String quorum) {
173197
request.toProduceRequest().getRecords());
174198
}
175199

200+
private void setProduceAcls() {
201+
AclBinding topicAcl =
202+
new AclBinding(
203+
new ResourcePattern(ResourceType.TOPIC, TOPIC_NAME, PatternType.LITERAL),
204+
new AccessControlEntry(
205+
"User:" + USERNAME, "*", AclOperation.WRITE, AclPermissionType.ALLOW));
206+
try {
207+
createAcls(Arrays.asList(topicAcl), adminProperties());
208+
} catch (Exception e) {
209+
throw new RuntimeException(e);
210+
}
211+
}
212+
176213
private void verifySubscribeToTopic(boolean expectFailure) {
177214
Response createResponse = createConsumerInstance(CONSUMER_GROUP);
178215
assertOKResponse(createResponse, Versions.KAFKA_V2_JSON);
@@ -224,7 +261,30 @@ protected SecurityProtocol getBrokerSecurityProtocol() {
224261
@Override
225262
protected void setupAcls() {
226263
// to allow plaintext consumer
227-
SecureTestUtils.setConsumerAcls(zkConnect, TOPIC_NAME, KafkaPrincipal.ANONYMOUS.getName(), "*");
264+
AclBinding topicAcl =
265+
new AclBinding(
266+
new ResourcePattern(ResourceType.TOPIC, TOPIC_NAME, PatternType.LITERAL),
267+
new AccessControlEntry(
268+
"User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW));
269+
AclBinding groupAcl =
270+
new AclBinding(
271+
new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL),
272+
new AccessControlEntry(
273+
"User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW));
274+
try {
275+
createAcls(Arrays.asList(topicAcl, groupAcl), adminProperties());
276+
} catch (Exception e) {
277+
throw new RuntimeException(e);
278+
}
279+
}
280+
281+
private Properties adminProperties() {
282+
Properties adminProperties = new Properties();
283+
adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
284+
adminProperties.put("security.protocol", "SASL_PLAINTEXT");
285+
adminProperties.setProperty("sasl.mechanism", "PLAIN");
286+
adminProperties.put("sasl.jaas.config", createPlainLoginModule("admin", "admin-secret"));
287+
return adminProperties;
228288
}
229289

230290
@AfterEach

kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.net.URISyntaxException;
4040
import java.time.Duration;
4141
import java.util.ArrayList;
42+
import java.util.Collection;
4243
import java.util.Collections;
4344
import java.util.HashMap;
4445
import java.util.Iterator;
@@ -50,6 +51,7 @@
5051
import java.util.Set;
5152
import java.util.concurrent.CompletableFuture;
5253
import java.util.concurrent.ExecutionException;
54+
import java.util.concurrent.TimeUnit;
5355
import java.util.function.Supplier;
5456
import java.util.stream.Collectors;
5557
import java.util.stream.IntStream;
@@ -77,6 +79,7 @@
7779
import org.apache.kafka.clients.producer.RecordMetadata;
7880
import org.apache.kafka.common.Node;
7981
import org.apache.kafka.common.TopicPartition;
82+
import org.apache.kafka.common.acl.AclBinding;
8083
import org.apache.kafka.common.config.ConfigResource;
8184
import org.apache.kafka.common.security.auth.SecurityProtocol;
8285
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -622,6 +625,13 @@ protected final void createTopic(
622625
}
623626
}
624627

628+
protected final void createAcls(Collection<AclBinding> acls, Properties adminProperties)
629+
throws Exception {
630+
try (AdminClient admin = AdminClient.create(adminProperties)) {
631+
admin.createAcls(acls).all().get(60, TimeUnit.SECONDS);
632+
}
633+
}
634+
625635
@SuppressWarnings({"unchecked"})
626636
private static Map<Object, Seq<Object>> convertReplicasAssignmentToScalaCompatibleType(
627637
Optional<Map<Integer, List<Integer>>> replicasAssignments) {

kafka-rest/src/test/java/io/confluent/kafkarest/integration/SecureTestUtils.java

Lines changed: 0 additions & 77 deletions
This file was deleted.

0 commit comments

Comments
 (0)