Skip to content

Commit f5f6ce9

Browse files
authored
KREST-10243 Add custom-request-logging to kafka-rest, and log error-codes for when various rate-limiters are triggered (#1168)
See PR for details
1 parent 118fa87 commit f5f6ce9

19 files changed

+1218
-65
lines changed

checkstyle/import_control.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@
115115
<allow class="javax.xml.bind.DatatypeConverter" />
116116
<allow class="org.everit.json.schema.ValidationException" />
117117
<allow class="org.hibernate.validator.constraints.URL" />
118+
<allow class="javax.servlet.http.HttpServletRequest" />
118119

119120
<!-- Static field imports -->
120121
<allow class="io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS" />

checkstyle/suppressions.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,10 @@
4141
<suppress checks="MethodTypeParameterName" files="AbstractConsumerTest" />
4242

4343
<!-- The tests below are fairly big and complex, so they go over the NCSS threshold. -->
44-
<suppress checks="JavaNCSS" files="ProtobufConverterTest|ProduceActionTest" />
44+
<suppress checks="JavaNCSS" files="ProtobufConverterTest|ProduceActionTest|CustomLogIntegrationTest" />
4545

4646
<!-- TestUtils#encodeComparable contains a complex, switch-like if-else structure. -->
47-
<suppress checks="CyclomaticComplexity" files="TestUtils" />
47+
<suppress checks="CyclomaticComplexity" files="TestUtils|CustomLogIntegrationTest" />
4848

4949
<suppress
5050
checks="ClassDataAbstractionCoupling"

kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestApplication.java

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,24 +32,36 @@
3232
import io.confluent.kafkarest.extension.ResourceAccesslistFeature;
3333
import io.confluent.kafkarest.extension.RestResourceExtension;
3434
import io.confluent.kafkarest.ratelimit.RateLimitFeature;
35+
import io.confluent.kafkarest.requestlog.CustomLog;
36+
import io.confluent.kafkarest.requestlog.CustomLogRequestAttributes;
37+
import io.confluent.kafkarest.requestlog.GlobalDosFilterListener;
38+
import io.confluent.kafkarest.requestlog.PerConnectionDosFilterListener;
3539
import io.confluent.kafkarest.resources.ResourcesFeature;
3640
import io.confluent.kafkarest.response.JsonStreamMessageBodyReader;
3741
import io.confluent.kafkarest.response.ResponseModule;
3842
import io.confluent.rest.Application;
43+
import io.confluent.rest.RestConfig;
3944
import io.confluent.rest.exceptions.ConstraintViolationExceptionMapper;
4045
import io.confluent.rest.exceptions.WebApplicationExceptionMapper;
4146
import java.text.SimpleDateFormat;
4247
import java.util.List;
4348
import java.util.Properties;
4449
import java.util.TimeZone;
4550
import javax.ws.rs.core.Configurable;
51+
import org.eclipse.jetty.server.CustomRequestLog;
52+
import org.eclipse.jetty.server.RequestLog;
53+
import org.eclipse.jetty.server.Slf4jRequestLogWriter;
4654
import org.eclipse.jetty.servlet.ServletContextHandler;
4755
import org.eclipse.jetty.util.StringUtil;
4856
import org.glassfish.jersey.server.ServerProperties;
57+
import org.slf4j.Logger;
58+
import org.slf4j.LoggerFactory;
4959

5060
/** Utilities for configuring and running an embedded Kafka server. */
5161
public class KafkaRestApplication extends Application<KafkaRestConfig> {
5262

63+
private static final Logger log = LoggerFactory.getLogger(KafkaRestApplication.class);
64+
5365
List<RestResourceExtension> restResourceExtensions;
5466

5567
public KafkaRestApplication() {
@@ -69,12 +81,61 @@ public KafkaRestApplication(KafkaRestConfig config, String path) {
6981
}
7082

7183
public KafkaRestApplication(KafkaRestConfig config, String path, String listenerName) {
72-
super(config, path, listenerName);
84+
this(config, path, listenerName, null /* requestLogWriter */, null /* requestLogFormat */);
85+
}
86+
87+
/* This public-constructor exists to facilitate testing with a custom requestLogWriter, and
88+
* requestLogFormat in an integration test in a different package.
89+
*/
90+
public KafkaRestApplication(
91+
KafkaRestConfig config,
92+
String path,
93+
String listenerName,
94+
RequestLog.Writer requestLogWriter,
95+
String requestLogFormat) {
96+
super(
97+
config,
98+
path,
99+
listenerName,
100+
createRequestLog(config, requestLogWriter, requestLogFormat, log, listenerName));
73101

74102
restResourceExtensions =
75103
config.getConfiguredInstances(
76104
KafkaRestConfig.KAFKA_REST_RESOURCE_EXTENSION_CONFIG, RestResourceExtension.class);
77105
config.setMetrics(metrics);
106+
107+
// Set up listeners for dos-filters, needed for custom-logging for when dos-filter rate-limits.
108+
this.addNonGlobalDosfilterListener(new PerConnectionDosFilterListener());
109+
this.addGlobalDosfilterListener(new GlobalDosFilterListener());
110+
}
111+
112+
private static RequestLog createRequestLog(
113+
KafkaRestConfig config,
114+
RequestLog.Writer requestLogWriter,
115+
String requestLogFormat,
116+
Logger log,
117+
String listenerName) {
118+
if (config.getBoolean(KafkaRestConfig.USE_CUSTOM_REQUEST_LOGGING_CONFIG)) {
119+
log.info("For rest-app with listener {}, configuring custom request logging", listenerName);
120+
if (requestLogWriter == null) {
121+
Slf4jRequestLogWriter logWriter = new Slf4jRequestLogWriter();
122+
logWriter.setLoggerName(config.getString(RestConfig.REQUEST_LOGGER_NAME_CONFIG));
123+
requestLogWriter = logWriter;
124+
}
125+
126+
if (requestLogFormat == null) {
127+
requestLogFormat = CustomRequestLog.EXTENDED_NCSA_FORMAT + " %{ms}T";
128+
}
129+
130+
CustomLog customRequestLog =
131+
new CustomLog(
132+
requestLogWriter,
133+
requestLogFormat,
134+
new String[] {CustomLogRequestAttributes.REST_ERROR_CODE});
135+
return customRequestLog;
136+
}
137+
// Return null, as Application's ctor would set-up a default request-logger.
138+
return null;
78139
}
79140

80141
@Override

kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,12 @@ public class KafkaRestConfig extends RestConfig {
492492
+ "requests will be processed for before the connection is closed.";
493493
private static final String STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DEFAULT = "500";
494494

495+
public static final String USE_CUSTOM_REQUEST_LOGGING_CONFIG = "use.custom.request.logging";
496+
private static final String USE_CUSTOM_REQUEST_LOGGING_DOC =
497+
"Whether to use custom-request-logging i.e. CustomLog.java. Instead of using"
498+
+ "Jetty's request-logging.";
499+
private static final boolean USE_CUSTOM_REQUEST_LOGGING_DEFAULT = true;
500+
495501
private static final ConfigDef config;
496502
private volatile Metrics metrics;
497503

@@ -889,7 +895,13 @@ protected static ConfigDef baseKafkaRestConfigDef() {
889895
Type.LONG,
890896
STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DEFAULT,
891897
Importance.LOW,
892-
STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DOC);
898+
STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DOC)
899+
.define(
900+
USE_CUSTOM_REQUEST_LOGGING_CONFIG,
901+
Type.BOOLEAN,
902+
USE_CUSTOM_REQUEST_LOGGING_DEFAULT,
903+
Importance.LOW,
904+
USE_CUSTOM_REQUEST_LOGGING_DOC);
893905
}
894906

895907
private static Properties getPropsFromFile(String propsFile) throws RestConfigException {

kafka-rest/src/main/java/io/confluent/kafkarest/ratelimit/FixedCostRateLimitRequestFilter.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616
package io.confluent.kafkarest.ratelimit;
1717

1818
import static com.google.common.base.Preconditions.checkArgument;
19+
import static io.confluent.kafkarest.ratelimit.RateLimitExceededException.ErrorCodes.PERMITS_MAX_GLOBAL_LIMIT_EXCEEDED;
20+
import static io.confluent.kafkarest.ratelimit.RateLimitExceededException.ErrorCodes.PERMITS_MAX_PER_CLUSTER_LIMIT_EXCEEDED;
1921
import static java.util.Objects.requireNonNull;
2022

2123
import com.google.common.cache.LoadingCache;
24+
import io.confluent.kafkarest.requestlog.CustomLogRequestAttributes;
2225
import javax.ws.rs.container.ContainerRequestContext;
2326
import javax.ws.rs.container.ContainerRequestFilter;
2427

@@ -29,6 +32,7 @@
2932
* io.confluent.kafkarest.KafkaRestConfig#RATE_LIMIT_DEFAULT_COST_CONFIG} configs.
3033
*/
3134
final class FixedCostRateLimitRequestFilter implements ContainerRequestFilter {
35+
3236
private final RequestRateLimiter genericRateLimiter;
3337
private final int cost;
3438
private final LoadingCache<String, RequestRateLimiter> perClusterRateLimiterCache;
@@ -49,9 +53,28 @@ public void filter(ContainerRequestContext requestContext) {
4953
String clusterId = requestContext.getUriInfo().getPathParameters(true).getFirst("clusterId");
5054
if (clusterId != null) {
5155
RequestRateLimiter rateLimiter = perClusterRateLimiterCache.getUnchecked(clusterId);
52-
rateLimiter.rateLimit(cost);
56+
try {
57+
rateLimiter.rateLimit(cost);
58+
} catch (RateLimitExceededException ex) {
59+
// The setProperty() call below maps to HttpServletRequest.setAttribute(), when Jersey is
60+
// running in servlet environment, see
61+
// https://github.com/eclipse-ee4j/jersey/blob/d60da249fdd06a5059472c6d9c1d8a757588e710/containers/jersey-servlet-core/src/main/java/org/glassfish/jersey/servlet/ServletPropertiesDelegate.java#L29
62+
requestContext.setProperty(
63+
CustomLogRequestAttributes.REST_ERROR_CODE, PERMITS_MAX_PER_CLUSTER_LIMIT_EXCEEDED);
64+
throw ex;
65+
}
66+
}
67+
68+
try {
69+
// apply generic (global) rate limiter
70+
genericRateLimiter.rateLimit(cost);
71+
} catch (RateLimitExceededException ex) {
72+
// The setProperty() call below maps to HttpServletRequest.setAttribute, when Jersey is
73+
// running in servlet environment, see
74+
// https://github.com/eclipse-ee4j/jersey/blob/d60da249fdd06a5059472c6d9c1d8a757588e710/containers/jersey-servlet-core/src/main/java/org/glassfish/jersey/servlet/ServletPropertiesDelegate.java#L29
75+
requestContext.setProperty(
76+
CustomLogRequestAttributes.REST_ERROR_CODE, PERMITS_MAX_GLOBAL_LIMIT_EXCEEDED);
77+
throw ex;
5378
}
54-
// apply generic (global) rate limiter
55-
genericRateLimiter.rateLimit(cost);
5679
}
5780
}

kafka-rest/src/main/java/io/confluent/kafkarest/ratelimit/RateLimitExceededException.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,27 @@
2525
*/
2626
public final class RateLimitExceededException extends StatusCodeException {
2727

28+
/*
29+
* These error-codes signify which rate-limit got exceeded, and then
30+
* logged with request in CustomLog.
31+
*/
32+
public static class ErrorCodes {
33+
34+
// Only apply to Produce APIs.
35+
public static final int PRODUCE_MAX_REQUESTS_GLOBAL_LIMIT_EXCEEDED = 429001;
36+
public static final int PRODUCE_MAX_REQUESTS_PER_TENANT_LIMIT_EXCEEDED = 429002;
37+
public static final int PRODUCE_MAX_BYTES_GLOBAL_LIMIT_EXCEEDED = 429003;
38+
public static final int PRODUCE_MAX_BYTES_PER_TENANT_LIMIT_EXCEEDED = 429004;
39+
40+
// Only apply to Admin APIs for now.
41+
public static final int PERMITS_MAX_GLOBAL_LIMIT_EXCEEDED = 429005;
42+
public static final int PERMITS_MAX_PER_CLUSTER_LIMIT_EXCEEDED = 429006;
43+
44+
// Apply to all APIs, via the Dos-filters at Jetty layer.
45+
public static final int DOS_FILTER_MAX_REQUEST_LIMIT_EXCEEDED = 429007;
46+
public static final int DOS_FILTER_MAX_REQUEST_PER_CONNECTION_LIMIT_EXCEEDED = 429008;
47+
}
48+
2849
public RateLimitExceededException() {
2950
super(
3051
Response.Status.TOO_MANY_REQUESTS,
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright 2023 Confluent Inc.
3+
*
4+
* Licensed under the Confluent Community License (the "License"); you may not use
5+
* this file except in compliance with the License. You may obtain a copy of the
6+
* License at
7+
*
8+
* http://www.confluent.io/confluent-community-license
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
* specific language governing permissions and limitations under the License.
14+
*/
15+
16+
package io.confluent.kafkarest.requestlog;
17+
18+
import org.eclipse.jetty.server.CustomRequestLog;
19+
import org.eclipse.jetty.server.Request;
20+
import org.eclipse.jetty.server.RequestLog;
21+
import org.eclipse.jetty.server.Response;
22+
import org.eclipse.jetty.util.component.AbstractLifeCycle;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
/**
27+
* CustomLog implements Jetty's RequestLog interface. It offers the same log-format jetty's
28+
* CustomRequestLog. Additionally, it would append configured request-attributes (see
29+
* requestAttributesToLog) to the end of the log. NOTE - this needs to extend
30+
* AbstractLifeCycle(implicitly implement LifeCycle) to correctly initialise CustomRequestLog which
31+
* implements LifeCycle. This in turn will correctly initialise the input RequestLog.Writer, if it
32+
* also implements LifeCycle.
33+
*/
34+
public class CustomLog extends AbstractLifeCycle implements RequestLog {
35+
36+
private static final Logger log = LoggerFactory.getLogger(CustomLog.class);
37+
38+
private final CustomRequestLog delegateJettyLog;
39+
40+
private final String[] requestAttributesToLog;
41+
42+
public CustomLog(RequestLog.Writer writer, String formatString, String[] requestAttributesToLog) {
43+
for (String attr : requestAttributesToLog) {
44+
// Add format-specifier to log request-attributes as response-headers in Jetty's
45+
// CustomRequestLog.
46+
formatString += " %{" + attr + "}o";
47+
}
48+
this.requestAttributesToLog = requestAttributesToLog;
49+
this.delegateJettyLog = new CustomRequestLog(writer, formatString);
50+
}
51+
52+
@Override
53+
protected synchronized void doStart() throws Exception {
54+
if (this.delegateJettyLog != null) {
55+
this.delegateJettyLog.start();
56+
}
57+
}
58+
59+
@Override
60+
protected void doStop() throws Exception {
61+
if (this.delegateJettyLog != null) {
62+
this.delegateJettyLog.stop();
63+
}
64+
}
65+
66+
@Override
67+
public void log(Request request, Response response) {
68+
// The configured request-attributes are converted to response-headers so Jetty can log them.
69+
// Request-attributes are chosen to propagate custom-info to the request-log, as
70+
// 1. Its idiomatic as per ServletRequest(which is implemented by Jetty's request).
71+
// 2. Places like dosfilter-listeners, ex - GlobalJettyDosFilterListener, only request is
72+
// readily available(Vs response).
73+
// Unfortunately Jetty doesn't provide a way to log request-attributes, hence they are converted
74+
// to response-headers, which can be logged.
75+
for (String attr : this.requestAttributesToLog) {
76+
Object attrVal = request.getAttribute(attr);
77+
if (attrVal != null) {
78+
request.removeAttribute(attr);
79+
response.setHeader(attr, attrVal.toString());
80+
}
81+
}
82+
83+
try {
84+
delegateJettyLog.log(request, response);
85+
} catch (Exception e) {
86+
log.debug(
87+
"Logging with Jetty's CustomRequestLogFailed with exception {}, stack is \n{}",
88+
e,
89+
e.getStackTrace());
90+
} finally {
91+
// Remove the response-headers that were added above just for logging.
92+
for (String attr : this.requestAttributesToLog) {
93+
response.getHttpFields().remove(attr);
94+
}
95+
}
96+
}
97+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2023 Confluent Inc.
3+
*
4+
* Licensed under the Confluent Community License (the "License"); you may not use
5+
* this file except in compliance with the License. You may obtain a copy of the
6+
* License at
7+
*
8+
* http://www.confluent.io/confluent-community-license
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
* specific language governing permissions and limitations under the License
14+
*/
15+
16+
package io.confluent.kafkarest.requestlog;
17+
18+
/**
19+
* This class lists the request-attributes that are used to propagate custom-info that will be added
20+
* to the request-log(see CustomLog.java).
21+
*/
22+
public final class CustomLogRequestAttributes {
23+
24+
private CustomLogRequestAttributes() {}
25+
26+
public static final String REST_ERROR_CODE = "REST_ERROR_CODE";
27+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2023 Confluent Inc.
3+
*
4+
* Licensed under the Confluent Community License (the "License"); you may not use
5+
* this file except in compliance with the License. You may obtain a copy of the
6+
* License at
7+
*
8+
* http://www.confluent.io/confluent-community-license
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
* specific language governing permissions and limitations under the License.
14+
*/
15+
16+
package io.confluent.kafkarest.requestlog;
17+
18+
import io.confluent.kafkarest.ratelimit.RateLimitExceededException.ErrorCodes;
19+
import javax.servlet.http.HttpServletRequest;
20+
import org.eclipse.jetty.servlets.DoSFilter;
21+
import org.eclipse.jetty.servlets.DoSFilter.Action;
22+
import org.eclipse.jetty.servlets.DoSFilter.OverLimit;
23+
24+
/**
25+
* This class is a Jetty DosFilter.Listener, for the global-dos filter. This on 429s will populate
26+
* relevant metadata as attributed on the request, that later-on can be logged.
27+
*/
28+
public class GlobalDosFilterListener extends DoSFilter.Listener {
29+
30+
@Override
31+
public Action onRequestOverLimit(
32+
HttpServletRequest request, OverLimit overlimit, DoSFilter dosFilter) {
33+
// KREST-10418: we don't use super function to get action object because
34+
// it will log a WARN line, in order to reduce verbosity
35+
Action action = Action.fromDelay(dosFilter.getDelayMs());
36+
if (action.equals(Action.REJECT)) {
37+
request.setAttribute(
38+
CustomLogRequestAttributes.REST_ERROR_CODE,
39+
ErrorCodes.DOS_FILTER_MAX_REQUEST_LIMIT_EXCEEDED);
40+
}
41+
return action;
42+
}
43+
}

0 commit comments

Comments
 (0)