Skip to content

Commit 7aee986

Browse files
Async fixes (#6359) (#6360)
Co-authored-by: Steve Gordon <sgordon@hotmail.co.uk>
1 parent 83afe17 commit 7aee986

File tree

4 files changed

+48
-7
lines changed

4 files changed

+48
-7
lines changed

build/scripts/scripts.fsproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
<Content Include="..\..\.github\workflows\make-release-notes.yml"><Link>make-release-notes.yml</Link></Content>
3535
</ItemGroup>
3636
<ItemGroup>
37-
<PackageReference Include="FSharp.Core" Version="6.0.1" />
3837
<PackageReference Include="Bullseye" Version="3.3.0" />
3938
<PackageReference Include="Elastic.Elasticsearch.Managed" Version="0.3.0" />
4039

src/Elastic.Clients.Elasticsearch/Elastic.Clients.Elasticsearch.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
<PackageReference Include="Elastic.Transport" Version="0.3.2" />
2121
<PackageReference Condition="'$(TargetFramework)' == 'netstandard2.0'" Include="System.Reflection.Emit" Version="4.3.0" />
2222
<PackageReference Condition="'$(TargetFramework)' == 'netstandard2.0'" Include="System.Reflection.Emit.Lightweight" Version="4.3.0" />
23+
<PackageReference Include="Microsoft.VisualStudio.Threading.Analyzers" Version="17.1.46">
24+
<PrivateAssets>all</PrivateAssets>
25+
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
26+
</PackageReference>
2327
</ItemGroup>
2428
<ItemGroup>
2529
<InternalsVisibleTo Include="Tests" Key="$(ExposedPublicKey)" />

src/Elastic.Clients.Elasticsearch/Helpers/BulkAllObservable.cs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,15 @@ private void BulkAll(IObserver<BulkAllResponse> observer)
5656
var documents = _partitionedBulkRequest.Documents;
5757
var partitioned = new PartitionHelper<T>(documents, _bulkSize);
5858
#pragma warning disable 4014
59+
#pragma warning disable VSTHRD110 // Observe result of async calls
5960
partitioned.ForEachAsync(
6061
#pragma warning restore 4014
6162
(buffer, page) => BulkAsync(buffer, page, 0),
6263
(buffer, response) => observer.OnNext(response),
6364
ex => OnCompleted(ex, observer),
6465
_maxDegreeOfParallelism
6566
);
67+
#pragma warning restore VSTHRD110 // Observe result of async calls
6668
}
6769

6870
private void OnCompleted(Exception exception, IObserver<BulkAllResponse> observer)
@@ -152,7 +154,7 @@ private async Task<BulkAllResponse> BulkAsync(IList<T> buffer, long page, int ba
152154
_bulkResponseCallback?.Invoke(response);
153155

154156
if (!response.ApiCall.Success)
155-
return await HandleBulkRequest(buffer, page, backOffRetries, response).ConfigureAwait(false);
157+
return await HandleBulkRequestAsync(buffer, page, backOffRetries, response).ConfigureAwait(false);
156158

157159
var retryableDocuments = new List<T>();
158160
var droppedDocuments = new List<Tuple<BulkResponseItemBase, T>>();
@@ -171,7 +173,7 @@ private async Task<BulkAllResponse> BulkAsync(IList<T> buffer, long page, int ba
171173
HandleDroppedDocuments(droppedDocuments, response);
172174

173175
if (retryableDocuments.Count > 0 && backOffRetries < _backOffRetries)
174-
return await RetryDocuments(page, ++backOffRetries, retryableDocuments).ConfigureAwait(false);
176+
return await RetryDocumentsAsync(page, ++backOffRetries, retryableDocuments).ConfigureAwait(false);
175177

176178
if (retryableDocuments.Count > 0)
177179
throw ThrowOnBadBulk(response, $"Bulk indexing failed and after retrying {backOffRetries} times");
@@ -193,7 +195,7 @@ private void HandleDroppedDocuments(List<Tuple<BulkResponseItemBase, T>> dropped
193195
throw ThrowOnBadBulk(response, $"{nameof(BulkAll)} halted after receiving failures that can not be retried from _bulk");
194196
}
195197

196-
private async Task<BulkAllResponse> HandleBulkRequest(IList<T> buffer, long page, int backOffRetries, BulkResponse response)
198+
private async Task<BulkAllResponse> HandleBulkRequestAsync(IList<T> buffer, long page, int backOffRetries, BulkResponse response)
197199
{
198200
var clientException = response.ApiCall.OriginalException as TransportException;
199201
var failureReason = clientException?.FailureReason;
@@ -205,7 +207,7 @@ private async Task<BulkAllResponse> HandleBulkRequest(IList<T> buffer, long page
205207
throw ThrowOnBadBulk(response, $"{nameof(BulkAll)} halted after attempted bulk failed over all the active nodes");
206208

207209
ThrowOnExhaustedRetries();
208-
return await RetryDocuments(page, ++backOffRetries, buffer).ConfigureAwait(false);
210+
return await RetryDocumentsAsync(page, ++backOffRetries, buffer).ConfigureAwait(false);
209211
case PipelineFailure.CouldNotStartSniffOnStartup:
210212
case PipelineFailure.BadAuthentication:
211213
case PipelineFailure.NoNodesAttempted:
@@ -218,7 +220,7 @@ private async Task<BulkAllResponse> HandleBulkRequest(IList<T> buffer, long page
218220
case PipelineFailure.BadRequest:
219221
default:
220222
ThrowOnExhaustedRetries();
221-
return await RetryDocuments(page, ++backOffRetries, buffer).ConfigureAwait(false);
223+
return await RetryDocumentsAsync(page, ++backOffRetries, buffer).ConfigureAwait(false);
222224
}
223225

224226
void ThrowOnExhaustedRetries()
@@ -231,7 +233,7 @@ void ThrowOnExhaustedRetries()
231233
}
232234
}
233235

234-
private async Task<BulkAllResponse> RetryDocuments(long page, int backOffRetries, IList<T> retryDocuments)
236+
private async Task<BulkAllResponse> RetryDocumentsAsync(long page, int backOffRetries, IList<T> retryDocuments)
235237
{
236238
_incrementRetries();
237239
await Task.Delay(_backOffTime, _compositeCancelToken).ConfigureAwait(false);

src/Elastic.Clients.Elasticsearch/packages.lock.json

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@
3737
"Microsoft.SourceLink.Common": "1.0.0"
3838
}
3939
},
40+
"Microsoft.VisualStudio.Threading.Analyzers": {
41+
"type": "Direct",
42+
"requested": "[17.1.46, )",
43+
"resolved": "17.1.46",
44+
"contentHash": "7pImoMcQaWZYAwu1aDBB8yBkvgad13yjrRHQ65pwHMX757vZ49OrNaEuRSLDu2PjZGonsTkQAJK8JK4W/wW4bw=="
45+
},
4046
"Microsoft.Build.Tasks.Git": {
4147
"type": "Transitive",
4248
"resolved": "1.0.0",
@@ -129,6 +135,12 @@
129135
"Microsoft.SourceLink.Common": "1.0.0"
130136
}
131137
},
138+
"Microsoft.VisualStudio.Threading.Analyzers": {
139+
"type": "Direct",
140+
"requested": "[17.1.46, )",
141+
"resolved": "17.1.46",
142+
"contentHash": "7pImoMcQaWZYAwu1aDBB8yBkvgad13yjrRHQ65pwHMX757vZ49OrNaEuRSLDu2PjZGonsTkQAJK8JK4W/wW4bw=="
143+
},
132144
"Microsoft.Bcl.AsyncInterfaces": {
133145
"type": "Transitive",
134146
"resolved": "6.0.0",
@@ -266,6 +278,12 @@
266278
"Microsoft.SourceLink.Common": "1.0.0"
267279
}
268280
},
281+
"Microsoft.VisualStudio.Threading.Analyzers": {
282+
"type": "Direct",
283+
"requested": "[17.1.46, )",
284+
"resolved": "17.1.46",
285+
"contentHash": "7pImoMcQaWZYAwu1aDBB8yBkvgad13yjrRHQ65pwHMX757vZ49OrNaEuRSLDu2PjZGonsTkQAJK8JK4W/wW4bw=="
286+
},
269287
"NETStandard.Library": {
270288
"type": "Direct",
271289
"requested": "[2.0.3, )",
@@ -509,6 +527,12 @@
509527
"Microsoft.SourceLink.Common": "1.0.0"
510528
}
511529
},
530+
"Microsoft.VisualStudio.Threading.Analyzers": {
531+
"type": "Direct",
532+
"requested": "[17.1.46, )",
533+
"resolved": "17.1.46",
534+
"contentHash": "7pImoMcQaWZYAwu1aDBB8yBkvgad13yjrRHQ65pwHMX757vZ49OrNaEuRSLDu2PjZGonsTkQAJK8JK4W/wW4bw=="
535+
},
512536
"Microsoft.Bcl.AsyncInterfaces": {
513537
"type": "Transitive",
514538
"resolved": "6.0.0",
@@ -631,6 +655,12 @@
631655
"Microsoft.SourceLink.Common": "1.0.0"
632656
}
633657
},
658+
"Microsoft.VisualStudio.Threading.Analyzers": {
659+
"type": "Direct",
660+
"requested": "[17.1.46, )",
661+
"resolved": "17.1.46",
662+
"contentHash": "7pImoMcQaWZYAwu1aDBB8yBkvgad13yjrRHQ65pwHMX757vZ49OrNaEuRSLDu2PjZGonsTkQAJK8JK4W/wW4bw=="
663+
},
634664
"Microsoft.Build.Tasks.Git": {
635665
"type": "Transitive",
636666
"resolved": "1.0.0",
@@ -714,6 +744,12 @@
714744
"Microsoft.SourceLink.Common": "1.0.0"
715745
}
716746
},
747+
"Microsoft.VisualStudio.Threading.Analyzers": {
748+
"type": "Direct",
749+
"requested": "[17.1.46, )",
750+
"resolved": "17.1.46",
751+
"contentHash": "7pImoMcQaWZYAwu1aDBB8yBkvgad13yjrRHQ65pwHMX757vZ49OrNaEuRSLDu2PjZGonsTkQAJK8JK4W/wW4bw=="
752+
},
717753
"Microsoft.Build.Tasks.Git": {
718754
"type": "Transitive",
719755
"resolved": "1.0.0",

0 commit comments

Comments
 (0)