Skip to content

Commit c4e55ab

Browse files
authored
Fix interruption of markAllocationIdAsInSync (elastic#100610)
`IndexShard#markAllocationIdAsInSync` is interruptible because it may block the thread on a monitor waiting for the local checkpoint to advance, but we lost the ability to interrupt it on a recovery cancellation in elastic#95270. Closes elastic#96578 Closes elastic#100589
1 parent 29e3d28 commit c4e55ab

File tree

4 files changed

+129
-4
lines changed

4 files changed

+129
-4
lines changed

docs/changelog/100610.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
pr: 100610
2+
summary: Fix interruption of `markAllocationIdAsInSync`
3+
area: Recovery
4+
type: bug
5+
issues:
6+
- 96578
7+
- 100589

server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,15 @@
3535
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
3636
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
3737
import org.elasticsearch.action.admin.indices.stats.ShardStats;
38+
import org.elasticsearch.action.bulk.BulkAction;
3839
import org.elasticsearch.action.index.IndexRequestBuilder;
40+
import org.elasticsearch.action.support.ActionTestUtils;
3941
import org.elasticsearch.action.support.ActiveShardCount;
4042
import org.elasticsearch.action.support.ChannelActionListener;
4143
import org.elasticsearch.action.support.PlainActionFuture;
44+
import org.elasticsearch.action.support.SubscribableListener;
4245
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
46+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
4347
import org.elasticsearch.action.support.replication.ReplicationResponse;
4448
import org.elasticsearch.cluster.ClusterState;
4549
import org.elasticsearch.cluster.ClusterStateListener;
@@ -70,6 +74,8 @@
7074
import org.elasticsearch.common.unit.ByteSizeUnit;
7175
import org.elasticsearch.common.unit.ByteSizeValue;
7276
import org.elasticsearch.common.util.CollectionUtils;
77+
import org.elasticsearch.common.util.concurrent.EsExecutors;
78+
import org.elasticsearch.core.TimeValue;
7379
import org.elasticsearch.gateway.ReplicaShardAllocatorIT;
7480
import org.elasticsearch.index.Index;
7581
import org.elasticsearch.index.IndexService;
@@ -85,6 +91,7 @@
8591
import org.elasticsearch.index.seqno.ReplicationTracker;
8692
import org.elasticsearch.index.seqno.RetentionLeases;
8793
import org.elasticsearch.index.seqno.SequenceNumbers;
94+
import org.elasticsearch.index.shard.GlobalCheckpointListeners;
8895
import org.elasticsearch.index.shard.IndexShard;
8996
import org.elasticsearch.index.shard.ShardId;
9097
import org.elasticsearch.index.store.Store;
@@ -122,7 +129,9 @@
122129
import java.util.Map;
123130
import java.util.concurrent.CountDownLatch;
124131
import java.util.concurrent.ExecutionException;
132+
import java.util.concurrent.Executor;
125133
import java.util.concurrent.Semaphore;
134+
import java.util.concurrent.TimeUnit;
126135
import java.util.concurrent.atomic.AtomicBoolean;
127136
import java.util.function.Consumer;
128137
import java.util.stream.Collectors;
@@ -132,6 +141,7 @@
132141
import static java.util.stream.Collectors.toList;
133142
import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
134143
import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED;
144+
import static org.elasticsearch.action.support.ActionTestUtils.assertNoFailureListener;
135145
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
136146
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
137147
import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
@@ -1688,6 +1698,104 @@ public void testWaitForClusterStateToBeAppliedOnSourceNode() throws Exception {
16881698
}
16891699
}
16901700

1701+
public void testDeleteIndexDuringFinalization() throws Exception {
1702+
internalCluster().startMasterOnlyNode();
1703+
final var primaryNode = internalCluster().startDataOnlyNode();
1704+
String indexName = "test-index";
1705+
createIndex(indexName, indexSettings(1, 0).build());
1706+
ensureGreen(indexName);
1707+
final List<IndexRequestBuilder> indexRequests = IntStream.range(0, between(10, 500))
1708+
.mapToObj(n -> client().prepareIndex(indexName).setSource("foo", "bar"))
1709+
.toList();
1710+
indexRandom(randomBoolean(), true, true, indexRequests);
1711+
assertThat(indicesAdmin().prepareFlush(indexName).get().getFailedShards(), equalTo(0));
1712+
1713+
final var replicaNode = internalCluster().startDataOnlyNode();
1714+
1715+
final SubscribableListener<Void> recoveryCompleteListener = new SubscribableListener<>();
1716+
final PlainActionFuture<AcknowledgedResponse> deleteListener = new PlainActionFuture<>();
1717+
1718+
final var threadPool = internalCluster().clusterService().threadPool();
1719+
1720+
final var indexId = internalCluster().clusterService().state().routingTable().index(indexName).getIndex();
1721+
final var primaryIndexShard = internalCluster().getInstance(IndicesService.class, primaryNode)
1722+
.indexServiceSafe(indexId)
1723+
.getShard(0);
1724+
final var globalCheckpointBeforeRecovery = primaryIndexShard.getLastSyncedGlobalCheckpoint();
1725+
1726+
final var replicaNodeTransportService = asInstanceOf(
1727+
MockTransportService.class,
1728+
internalCluster().getInstance(TransportService.class, replicaNode)
1729+
);
1730+
replicaNodeTransportService.addRequestHandlingBehavior(
1731+
PeerRecoveryTargetService.Actions.TRANSLOG_OPS,
1732+
(handler, request, channel, task) -> handler.messageReceived(
1733+
request,
1734+
new TestTransportChannel(ActionTestUtils.assertNoFailureListener(response -> {
1735+
// Process the TRANSLOG_OPS response on the replica (avoiding failing it due to a concurrent delete) but
1736+
// before sending the response back send another document to the primary, advancing the GCP to prevent the replica
1737+
// being marked as in-sync (NB below we delay the replica write until after the index is deleted)
1738+
client().prepareIndex(indexName).setSource("foo", "baz").execute(ActionListener.noop());
1739+
1740+
primaryIndexShard.addGlobalCheckpointListener(
1741+
globalCheckpointBeforeRecovery + 1,
1742+
new GlobalCheckpointListeners.GlobalCheckpointListener() {
1743+
@Override
1744+
public Executor executor() {
1745+
return EsExecutors.DIRECT_EXECUTOR_SERVICE;
1746+
}
1747+
1748+
@Override
1749+
public void accept(long globalCheckpoint, Exception e) {
1750+
assertNull(e);
1751+
1752+
// Now the GCP has advanced the replica won't be marked in-sync so respond to the TRANSLOG_OPS request
1753+
// to start recovery finalization
1754+
try {
1755+
channel.sendResponse(response);
1756+
} catch (IOException ex) {
1757+
fail(ex);
1758+
}
1759+
1760+
// Wait a short while for finalization to block on advancing the replica's GCP and then delete the index
1761+
threadPool.schedule(
1762+
() -> client().admin().indices().prepareDelete(indexName).execute(deleteListener),
1763+
TimeValue.timeValueMillis(100),
1764+
EsExecutors.DIRECT_EXECUTOR_SERVICE
1765+
);
1766+
}
1767+
},
1768+
TimeValue.timeValueSeconds(10)
1769+
);
1770+
})),
1771+
task
1772+
)
1773+
);
1774+
1775+
// delay the delivery of the replica write until the end of the test so the replica never becomes in-sync
1776+
replicaNodeTransportService.addRequestHandlingBehavior(
1777+
BulkAction.NAME + "[s][r]",
1778+
(handler, request, channel, task) -> recoveryCompleteListener.addListener(
1779+
assertNoFailureListener(ignored -> handler.messageReceived(request, channel, task))
1780+
)
1781+
);
1782+
1783+
// Create the replica to trigger the whole process
1784+
assertAcked(
1785+
client().admin()
1786+
.indices()
1787+
.prepareUpdateSettings(indexName)
1788+
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
1789+
);
1790+
1791+
// Wait for the index to be deleted
1792+
assertTrue(deleteListener.get(20, TimeUnit.SECONDS).isAcknowledged());
1793+
1794+
final var peerRecoverySourceService = internalCluster().getInstance(PeerRecoverySourceService.class, primaryNode);
1795+
assertBusy(() -> assertEquals(0, peerRecoverySourceService.numberOfOngoingRecoveries()));
1796+
recoveryCompleteListener.onResponse(null);
1797+
}
1798+
16911799
private void assertGlobalCheckpointIsStableAndSyncedInAllNodes(String indexName, List<String> nodes, int shard) throws Exception {
16921800
assertThat(nodes, is(not(empty())));
16931801

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.elasticsearch.common.util.CancellableThreads;
3434
import org.elasticsearch.common.util.concurrent.CountDown;
3535
import org.elasticsearch.common.util.set.Sets;
36-
import org.elasticsearch.core.CheckedRunnable;
3736
import org.elasticsearch.core.IOUtils;
3837
import org.elasticsearch.core.Nullable;
3938
import org.elasticsearch.core.Releasable;
@@ -426,7 +425,7 @@ public void onFailure(Exception e) {
426425
}
427426

428427
static void runUnderPrimaryPermit(
429-
CheckedRunnable<Exception> action,
428+
Runnable action,
430429
IndexShard primary,
431430
CancellableThreads cancellableThreads,
432431
ActionListener<Void> listener
@@ -1260,7 +1259,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis
12601259
*/
12611260
final SubscribableListener<Void> markInSyncStep = new SubscribableListener<>();
12621261
runUnderPrimaryPermit(
1263-
() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint),
1262+
() -> cancellableThreads.execute(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint)),
12641263
shard,
12651264
cancellableThreads,
12661265
markInSyncStep

test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@
114114
import org.elasticsearch.xcontent.XContentParser.Token;
115115
import org.elasticsearch.xcontent.XContentParserConfiguration;
116116
import org.elasticsearch.xcontent.XContentType;
117+
import org.hamcrest.Matchers;
117118
import org.junit.After;
118119
import org.junit.AfterClass;
119120
import org.junit.Before;
@@ -2036,7 +2037,17 @@ protected static boolean isTurkishLocale() {
20362037
|| Locale.getDefault().getLanguage().equals(new Locale("az").getLanguage());
20372038
}
20382039

2039-
public static void fail(Throwable t, String msg, Object... args) {
2040+
public static <T> T fail(Throwable t, String msg, Object... args) {
20402041
throw new AssertionError(org.elasticsearch.common.Strings.format(msg, args), t);
20412042
}
2043+
2044+
public static <T> T fail(Throwable t) {
2045+
return fail(t, "unexpected");
2046+
}
2047+
2048+
@SuppressWarnings("unchecked")
2049+
public static <T> T asInstanceOf(Class<T> clazz, Object o) {
2050+
assertThat(o, Matchers.instanceOf(clazz));
2051+
return (T) o;
2052+
}
20422053
}

0 commit comments

Comments
 (0)