Skip to content

Commit d81ccdf

Browse files
Mpdreamzrusscam
authored andcommitted
Fix #3954 make sure BulkAll() is aborted properly if the whole bulk request keeps returning a bad statuscode (#4014)
Ensure BulkAll() is aborted properly if the whole bulk request keeps returning a bad statuscode. (cherry picked from commit c0293a0)
1 parent adcad57 commit d81ccdf

File tree

5 files changed

+112
-14
lines changed

5 files changed

+112
-14
lines changed

src/Nest/CommonAbstractions/Extensions/Extensions.cs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
using System.Globalization;
66
using System.Linq;
77
using System.Reflection;
8+
using System.Runtime.ExceptionServices;
89
using System.Runtime.Serialization;
910
using System.Text;
1011
using System.Threading;
1112
using System.Threading.Tasks;
13+
using Elasticsearch.Net;
1214
using Newtonsoft.Json;
1315
using Newtonsoft.Json.Converters;
1416
using Newtonsoft.Json.Linq;
@@ -24,9 +26,9 @@ internal static class EmptyReadOnly<TKey, TValue>
2426
{
2527
public static readonly IReadOnlyDictionary<TKey, TValue> Dictionary = new ReadOnlyDictionary<TKey, TValue>(new Dictionary<TKey, TValue>(0));
2628
}
27-
2829
internal static class Extensions
2930
{
31+
3032
internal static bool NotWritable(this QueryContainer q) => q == null || !q.IsWritable;
3133

3234
internal static bool NotWritable(this IEnumerable<QueryContainer> qs) => qs == null || qs.All(q => q.NotWritable());
@@ -111,13 +113,13 @@ internal static string ToEnumValue<T>(this T enumValue) where T : struct
111113
return null;
112114
}
113115

114-
#if !DOTNETCORE
115-
internal static string Utf8String(this byte[] bytes) => bytes == null ? null : Encoding.UTF8.GetString(bytes);
116-
#else
117-
internal static string Utf8String(this byte[] bytes) => bytes == null ? null : Encoding.UTF8.GetString(bytes, 0, bytes.Length);
118-
#endif
116+
#if !DOTNETCORE
117+
internal static string Utf8String(this byte[] bytes) => bytes == null ? null : Encoding.UTF8.GetString(bytes);
118+
#else
119+
internal static string Utf8String(this byte[] bytes) => bytes == null ? null : Encoding.UTF8.GetString(bytes, 0, bytes.Length);
120+
#endif
119121

120-
internal static byte[] Utf8Bytes(this string s) => s.IsNullOrEmpty() ? null : Encoding.UTF8.GetBytes(s);
122+
internal static byte[] Utf8Bytes(this string s) => s.IsNullOrEmpty() ? null : Encoding.UTF8.GetBytes(s);
121123

122124
internal static bool IsNullOrEmpty(this TypeName value) => value == null || value.GetHashCode() == 0;
123125

@@ -174,7 +176,7 @@ internal static bool IsEmpty<T>(this IEnumerable<T> list)
174176
return !enumerable.Any() || enumerable.All(t => t == null);
175177
}
176178

177-
internal static void ThrowIfNull<T>(this T value, string name, string message = null)
179+
internal static void ThrowIfNull<T>(this T value, string name, string message = null)
178180
{
179181
if (value == null && message.IsNullOrEmpty()) throw new ArgumentNullException(name);
180182
else if (value == null) throw new ArgumentNullException(name, "Argument can not be null when " + message);
@@ -250,6 +252,12 @@ internal static async Task ForEachAsync<TSource, TResult>(
250252
continue;
251253

252254
var task = await Task.WhenAny(tasks).ConfigureAwait(false);
255+
if (task.Exception != null
256+
&& (task.IsFaulted && task.Exception.Flatten().InnerExceptions.First() is Exception e))
257+
{
258+
ExceptionDispatchInfo.Capture(e).Throw();
259+
return;
260+
}
253261
tasks.Remove(task);
254262
i++;
255263
}

src/Nest/Document/Multiple/BulkAll/BulkAllObservable.cs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public class BulkAllObservable<T> : IDisposable, IObservable<IBulkAllResponse> w
2020
private readonly int _maxDegreeOfParallelism;
2121
private readonly IBulkAllRequest<T> _partitionedBulkRequest;
2222
private readonly Func<IBulkResponseItem, T, bool> _retryPredicate;
23+
private readonly Action<IBulkResponse> _bulkResponseCallback;
2324
private Action _incrementFailed = () => { };
2425
private Action _incrementRetries = () => { };
2526

@@ -36,6 +37,8 @@ public BulkAllObservable(
3637
_bulkSize = _partitionedBulkRequest.Size ?? CoordinatedRequestDefaults.BulkAllSizeDefault;
3738
_retryPredicate = _partitionedBulkRequest.RetryDocumentPredicate ?? RetryBulkActionPredicate;
3839
_droppedDocumentCallBack = _partitionedBulkRequest.DroppedDocumentCallback ?? DroppedDocumentCallbackDefault;
40+
_bulkResponseCallback = _partitionedBulkRequest.BulkResponseCallback;
41+
3942
_maxDegreeOfParallelism =
4043
_partitionedBulkRequest.MaxDegreeOfParallelism ?? CoordinatedRequestDefaults.BulkAllMaxDegreeOfParallelismDefault;
4144
_compositeCancelTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
@@ -132,6 +135,8 @@ private async Task<IBulkAllResponse> BulkAsync(IList<T> buffer, long page, int b
132135

133136
_compositeCancelToken.ThrowIfCancellationRequested();
134137

138+
_bulkResponseCallback?.Invoke(response);
139+
135140
if (!response.ApiCall.Success)
136141
return await HandleBulkRequest(buffer, page, backOffRetries, response);
137142

@@ -177,26 +182,36 @@ private void HandleDroppedDocuments(List<Tuple<IBulkResponseItem, T>> droppedDoc
177182
private async Task<IBulkAllResponse> HandleBulkRequest(IList<T> buffer, long page, int backOffRetries, IBulkResponse response)
178183
{
179184
var clientException = response.ApiCall.OriginalException as ElasticsearchClientException;
180-
//TODO expose this on IAPiCallDetails as RetryLater in 7.0?
181-
var failureReason = clientException?.FailureReason.GetValueOrDefault(PipelineFailure.Unexpected);
185+
var failureReason = clientException?.FailureReason;
186+
var reason = failureReason?.GetStringValue() ?? nameof(PipelineFailure.BadRequest);
182187
switch (failureReason)
183188
{
184189
case PipelineFailure.MaxRetriesReached:
185190
//TODO move this to its own PipelineFailure classification in 7.0
186191
if (response.ApiCall.AuditTrail.Last().Event == AuditEvent.FailedOverAllNodes)
187192
throw ThrowOnBadBulk(response, $"BulkAll halted after attempted bulk failed over all the active nodes");
188193

189-
return await RetryDocuments(page, ++backOffRetries, buffer);
194+
ThrowOnExhaustedRetries();
195+
return await RetryDocuments(page, ++backOffRetries, buffer).ConfigureAwait(false);
190196
case PipelineFailure.CouldNotStartSniffOnStartup:
191197
case PipelineFailure.BadAuthentication:
192198
case PipelineFailure.NoNodesAttempted:
193199
case PipelineFailure.SniffFailure:
194200
case PipelineFailure.Unexpected:
195-
throw ThrowOnBadBulk(response,
196-
$"BulkAll halted after {nameof(PipelineFailure)}{failureReason.GetStringValue()} from _bulk");
201+
throw ThrowOnBadBulk(response, $"BulkAll halted after {nameof(PipelineFailure)}.{reason} from _bulk");
197202
default:
203+
ThrowOnExhaustedRetries();
198204
return await RetryDocuments(page, ++backOffRetries, buffer).ConfigureAwait(false);
199205
}
206+
207+
void ThrowOnExhaustedRetries()
208+
{
209+
if (_partitionedBulkRequest.ContinueAfterDroppedDocuments || backOffRetries < _backOffRetries) return;
210+
211+
throw ThrowOnBadBulk(response,
212+
$"BulkAll halted after {nameof(PipelineFailure)}.{reason} from _bulk and exhausting retries ({backOffRetries})"
213+
);
214+
}
200215
}
201216

202217
private async Task<IBulkAllResponse> RetryDocuments(long page, int backOffRetries, IList<T> retryDocuments)

src/Nest/Document/Multiple/BulkAll/BulkAllRequest.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,12 @@ public interface IBulkAllRequest<T> where T : class
103103
/// non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)
104104
/// </summary>
105105
int? WaitForActiveShards { get; set; }
106+
107+
/// <summary>
108+
/// Be notified every time a bulk response returns, this includes retries.
109+
/// <see cref="IObserver{T}.OnNext"/> is only called for successful batches.
110+
/// </summary>
111+
Action<IBulkResponse> BulkResponseCallback { get; set; }
106112
}
107113

108114
public class BulkAllRequest<T> : IBulkAllRequest<T>
@@ -171,6 +177,9 @@ public BulkAllRequest(IEnumerable<T> documents)
171177

172178
/// <inheritdoc />
173179
public int? WaitForActiveShards { get; set; }
180+
181+
/// <inheritdoc />
182+
public Action<IBulkResponse> BulkResponseCallback { get; set; }
174183
}
175184

176185
public class BulkAllDescriptor<T> : DescriptorBase<BulkAllDescriptor<T>, IBulkAllRequest<T>>, IBulkAllRequest<T>
@@ -205,6 +214,7 @@ public BulkAllDescriptor(IEnumerable<T> documents)
205214
Time IBulkAllRequest<T>.Timeout { get; set; }
206215
TypeName IBulkAllRequest<T>.Type { get; set; }
207216
int? IBulkAllRequest<T>.WaitForActiveShards { get; set; }
217+
Action<IBulkResponse> IBulkAllRequest<T>.BulkResponseCallback { get; set; }
208218

209219
/// <inheritdoc cref="IBulkAllRequest{T}.MaxDegreeOfParallelism" />
210220
public BulkAllDescriptor<T> MaxDegreeOfParallelism(int? parallelism) =>
@@ -285,5 +295,9 @@ public BulkAllDescriptor<T> ContinueAfterDroppedDocuments(bool proceed = true) =
285295
/// <inheritdoc cref="IBulkAllRequest{T}.DroppedDocumentCallback" />
286296
public BulkAllDescriptor<T> DroppedDocumentCallback(Action<IBulkResponseItem, T> callback) =>
287297
Assign(callback, (a, v) => a.DroppedDocumentCallback = v);
298+
299+
/// <inheritdoc cref="IBulkAllRequest{T}.BulkResponseCallback" />
300+
public BulkAllDescriptor<T> BulkResponseCallback(Action<IBulkResponse> callback) =>
301+
Assign(callback, (a, v) => a.BulkResponseCallback = v);
288302
}
289303
}

src/Tests/Tests/Document/Multiple/BulkAll/BulkAllExceptionApiTests.cs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Threading;
33
using Elastic.Xunit.XunitPlumbing;
4+
using Elasticsearch.Net;
45
using FluentAssertions;
56
using Nest;
67
using Tests.Core.ManagedElasticsearch.Clusters;
@@ -49,4 +50,64 @@ [I] public void WaitBulkAllThrowsAndIsCaught()
4950
}
5051
}
5152
}
53+
54+
55+
public class BulkAllBadRetriesApiTests : BulkAllApiTestsBase
56+
{
57+
public BulkAllBadRetriesApiTests(IntrusiveOperationCluster cluster, EndpointUsage usage) : base(cluster, usage) { }
58+
59+
[U] public void Completes()
60+
{
61+
var client = Tests.Framework.Cluster.Nodes(2)
62+
.ClientCalls(c => c.FailAlways())
63+
.StaticConnectionPool()
64+
.AllDefaults()
65+
.Client;
66+
67+
var index = CreateIndexName();
68+
69+
var size = 1000;
70+
var pages = 10;
71+
var seenPages = 0;
72+
var numberOfDocuments = size * pages;
73+
var documents = CreateLazyStreamOfDocuments(numberOfDocuments);
74+
var requests = 0;
75+
76+
Exception ex = null;
77+
var tokenSource = new CancellationTokenSource();
78+
var observableBulk = client.BulkAll(documents, f => f
79+
.MaxDegreeOfParallelism(1)
80+
.BulkResponseCallback(r => Interlocked.Increment(ref requests))
81+
.BackOffTime(TimeSpan.FromMilliseconds(1))
82+
.BackOffRetries(2)
83+
.Size(size)
84+
.RefreshOnCompleted()
85+
.Index(index)
86+
.BufferToBulk((r, buffer) => r.IndexMany(buffer))
87+
, tokenSource.Token);
88+
89+
try
90+
{
91+
observableBulk.Wait(TimeSpan.FromSeconds(30), b =>
92+
{
93+
Interlocked.Increment(ref seenPages);
94+
});
95+
}
96+
catch (Exception e)
97+
{
98+
ex = e;
99+
}
100+
ex.Should().NotBeNull();
101+
102+
var clientException = ex.Should().BeOfType<ElasticsearchClientException>().Subject;
103+
104+
clientException.Message.Should()
105+
.StartWith("BulkAll halted after")
106+
.And.EndWith("from _bulk and exhausting retries (2)");
107+
108+
requests.Should().Be(3);
109+
// OnNext only called for successful batches.
110+
seenPages.Should().Be(0);
111+
}
112+
}
52113
}

src/Tests/Tests/Framework/VirtualClustering/VirtualizedCluster.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, ConnectionS
2929
}
3030

3131
public IConnectionPool ConnectionPool => Client.ConnectionSettings.ConnectionPool;
32-
private ElasticClient Client => _fixedRequestPipeline?.Client;
32+
public ElasticClient Client => _fixedRequestPipeline?.Client;
3333

3434
public VirtualizedCluster ClientProxiesTo(
3535
Func<IElasticClient, Func<RequestConfigurationDescriptor, IRequestConfiguration>, IResponse> sync,

0 commit comments

Comments
 (0)