Skip to content

Commit ea0b9ab

Browse files
committed
[FLINK-38451] Stop logging interrupts as ERROR
When cancelling a job, we may interrupt the KafkaCommitter. This currently leads to the ERROR log "Transaction ... encountered error and data has been potentially lost." However, that exception is expected and does not lead to any data loss beyond the normal inconsistencies caused by cancellation. In many cases, the commit has already succeeded. Furthermore, a job restart will lead to the data eventually being committed.
1 parent 032c9dd commit ea0b9ab

File tree

2 files changed

+91
-14
lines changed

2 files changed

+91
-14
lines changed

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitter.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
/*
2-
* Licensed to the Apache Software Foundation (ASF) under one or more
3-
* contributor license agreements. See the NOTICE file distributed with
4-
* this work for additional information regarding copyright ownership.
5-
* The ASF licenses this file to You under the Apache License, Version 2.0
6-
* (the "License"); you may not use this file except in compliance with
7-
* the License. You may obtain a copy of the License at
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
89
*
9-
* http://www.apache.org/licenses/LICENSE-2.0
10+
* http://www.apache.org/licenses/LICENSE-2.0
1011
*
1112
* Unless required by applicable law or agreed to in writing, software
1213
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -26,6 +27,7 @@
2627
import org.apache.flink.util.IOUtils;
2728

2829
import org.apache.kafka.clients.producer.ProducerConfig;
30+
import org.apache.kafka.common.errors.InterruptException;
2931
import org.apache.kafka.common.errors.InvalidTxnStateException;
3032
import org.apache.kafka.common.errors.ProducerFencedException;
3133
import org.apache.kafka.common.errors.RetriableException;
@@ -125,6 +127,17 @@ public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
125127
e);
126128
handleFailedTransaction(producer);
127129
request.signalFailedWithKnownReason(e);
130+
} catch (InterruptException e) {
131+
// note that we do not attempt to recover from this exception; producer is likely
132+
// left in an inconsistent state
133+
LOG.info(
134+
"Committing transaction ({}) was interrupted. This most likely happens because the task is being cancelled.",
135+
request,
136+
e);
137+
// reset the interrupt flag that is set when InterruptException is created
138+
Thread.interrupted();
139+
// propagate interruption through java.lang.InterruptedException instead
140+
throw new InterruptedException(e.getMessage());
128141
} catch (Exception e) {
129142
LOG.error(
130143
"Transaction ({}) encountered error and data has been potentially lost.",

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitterTest.java

Lines changed: 71 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
/*
2-
* Licensed to the Apache Software Foundation (ASF) under one or more
3-
* contributor license agreements. See the NOTICE file distributed with
4-
* this work for additional information regarding copyright ownership.
5-
* The ASF licenses this file to You under the Apache License, Version 2.0
6-
* (the "License"); you may not use this file except in compliance with
7-
* the License. You may obtain a copy of the License at
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
89
*
9-
* http://www.apache.org/licenses/LICENSE-2.0
10+
* http://www.apache.org/licenses/LICENSE-2.0
1011
*
1112
* Unless required by applicable law or agreed to in writing, software
1213
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,6 +20,7 @@
1920

2021
import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest;
2122
import org.apache.flink.connector.kafka.sink.KafkaCommittable;
23+
import org.apache.flink.testutils.logging.LoggerAuditingExtension;
2224
import org.apache.flink.util.TestLoggerExtension;
2325

2426
import org.apache.kafka.clients.CommonClientConfigs;
@@ -29,17 +31,22 @@
2931
import org.junit.jupiter.api.AfterEach;
3032
import org.junit.jupiter.api.Test;
3133
import org.junit.jupiter.api.extension.ExtendWith;
34+
import org.junit.jupiter.api.extension.RegisterExtension;
3235
import org.junit.jupiter.params.ParameterizedTest;
3336
import org.junit.jupiter.params.provider.ValueSource;
3437

3538
import java.io.IOException;
39+
import java.net.ServerSocket;
3640
import java.util.Collections;
3741
import java.util.Properties;
42+
import java.util.concurrent.atomic.AtomicBoolean;
3843
import java.util.concurrent.atomic.AtomicInteger;
3944
import java.util.function.BiFunction;
4045

4146
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak;
4247
import static org.assertj.core.api.Assertions.assertThat;
48+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
49+
import static org.slf4j.event.Level.ERROR;
4350

4451
/** Tests for {@link KafkaCommitter}. */
4552
@ExtendWith({TestLoggerExtension.class})
@@ -53,6 +60,10 @@ class KafkaCommitterTest {
5360
private static final BiFunction<Properties, String, FlinkKafkaInternalProducer<?, ?>>
5461
MOCK_FACTORY = (properties, transactionalId) -> new MockProducer(properties, null);
5562

63+
@RegisterExtension
64+
public final LoggerAuditingExtension errorLogger =
65+
new LoggerAuditingExtension(KafkaCommitter.class, ERROR);
66+
5667
@AfterEach
5768
public void check() {
5869
checkProducerLeak();
@@ -158,6 +169,59 @@ public void testCommitterProducerClosedOnError() throws IOException, Interrupted
158169
}
159170
}
160171

172+
@Test
173+
public void testInterrupt() throws IOException {
174+
ServerSocket serverSocket = new ServerSocket(0);
175+
Properties properties = getProperties();
176+
properties.put(
177+
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
178+
"http://localhost:" + serverSocket.getLocalPort());
179+
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
180+
properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, String.valueOf(Long.MAX_VALUE));
181+
try (final KafkaCommitter committer =
182+
new KafkaCommitter(
183+
properties, TRANS_ID, SUB_ID, ATTEMPT, false, MOCK_FACTORY);
184+
FlinkKafkaInternalProducer<Object, Object> producer =
185+
new FlinkKafkaInternalProducer<>(properties, TRANS_ID);
186+
ReadableBackchannel<TransactionFinished> backchannel =
187+
BackchannelFactory.getInstance()
188+
.getReadableBackchannel(SUB_ID, ATTEMPT, TRANS_ID)) {
189+
final MockCommitRequest<KafkaCommittable> request =
190+
new MockCommitRequest<>(KafkaCommittable.of(producer));
191+
192+
producer.resumeTransaction(PRODUCER_ID, EPOCH);
193+
194+
AtomicBoolean interrupting = interruptOnMessage(Thread.currentThread(), serverSocket);
195+
assertThatThrownBy(() -> committer.commit(Collections.singletonList(request)))
196+
.isInstanceOf(InterruptedException.class);
197+
198+
// verify that the interrupt happened only after committing started
199+
assertThat(interrupting).isTrue();
200+
201+
// no errors are logged
202+
assertThat(errorLogger.getMessages()).isEmpty();
203+
204+
assertThat(backchannel).doesNotHave(transactionFinished(true));
205+
}
206+
}
207+
208+
private AtomicBoolean interruptOnMessage(Thread mainThread, ServerSocket serverSocket) {
209+
final AtomicBoolean interrupting = new AtomicBoolean();
210+
new Thread(
211+
() -> {
212+
try {
213+
serverSocket.accept().getInputStream().read();
214+
interrupting.set(true);
215+
mainThread.interrupt();
216+
} catch (IOException e) {
217+
throw new RuntimeException(e);
218+
}
219+
},
220+
"canceller")
221+
.start();
222+
return interrupting;
223+
}
224+
161225
@ParameterizedTest
162226
@ValueSource(booleans = {true, false})
163227
public void testKafkaCommitterRecyclesTransactionalId(boolean hasProducer)

0 commit comments

Comments
 (0)