Skip to content

Commit e204fd3

Browse files
committed
Correct spelling and inherit documentation for BulkAll
1 parent fb05345 commit e204fd3

File tree

2 files changed

+53
-54
lines changed

2 files changed

+53
-54
lines changed

src/Nest/Document/Multiple/BulkAll/BulkAllObservable.cs

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ namespace Nest
99
{
1010
public class BulkAllObservable<T> : IDisposable, IObservable<IBulkAllResponse> where T : class
1111
{
12-
private readonly IBulkAllRequest<T> _partionedBulkRequest;
12+
private readonly IBulkAllRequest<T> _partitionedBulkRequest;
1313
private readonly IElasticClient _client;
1414
private readonly TimeSpan _backOffTime;
1515
private readonly int _backOffRetries;
@@ -21,22 +21,22 @@ public class BulkAllObservable<T> : IDisposable, IObservable<IBulkAllResponse> w
2121
private readonly CancellationToken _compositeCancelToken;
2222
private readonly CancellationTokenSource _compositeCancelTokenSource;
2323
private readonly Func<IBulkResponseItem, T, bool> _retryPredicate;
24-
private Action<IBulkResponseItem, T> _droppedDocumentCallBack;
24+
private readonly Action<IBulkResponseItem, T> _droppedDocumentCallBack;
2525

2626
public BulkAllObservable(
2727
IElasticClient client,
28-
IBulkAllRequest<T> partionedBulkRequest,
28+
IBulkAllRequest<T> partitionedBulkRequest,
2929
CancellationToken cancellationToken = default(CancellationToken)
3030
)
3131
{
3232
this._client = client;
33-
this._partionedBulkRequest = partionedBulkRequest;
34-
this._backOffRetries = this._partionedBulkRequest.BackOffRetries.GetValueOrDefault(CoordinatedRequestDefaults.BulkAllBackOffRetriesDefault);
35-
this._backOffTime = (this._partionedBulkRequest?.BackOffTime?.ToTimeSpan() ?? CoordinatedRequestDefaults.BulkAllBackOffTimeDefault);
36-
this._bulkSize = this._partionedBulkRequest.Size ?? CoordinatedRequestDefaults.BulkAllSizeDefault;
37-
this._retryPredicate = this._partionedBulkRequest.RetryDocumentPredicate ?? RetryBulkActionPredicate;
38-
this._droppedDocumentCallBack = this._partionedBulkRequest.DroppedDocumentCallback ?? DroppedDocumentCallbackDefault;
39-
this._maxDegreeOfParallelism = _partionedBulkRequest.MaxDegreeOfParallelism ?? CoordinatedRequestDefaults.BulkAllMaxDegreeOfParallelismDefault;
33+
this._partitionedBulkRequest = partitionedBulkRequest;
34+
this._backOffRetries = this._partitionedBulkRequest.BackOffRetries.GetValueOrDefault(CoordinatedRequestDefaults.BulkAllBackOffRetriesDefault);
35+
this._backOffTime = (this._partitionedBulkRequest?.BackOffTime?.ToTimeSpan() ?? CoordinatedRequestDefaults.BulkAllBackOffTimeDefault);
36+
this._bulkSize = this._partitionedBulkRequest.Size ?? CoordinatedRequestDefaults.BulkAllSizeDefault;
37+
this._retryPredicate = this._partitionedBulkRequest.RetryDocumentPredicate ?? RetryBulkActionPredicate;
38+
this._droppedDocumentCallBack = this._partitionedBulkRequest.DroppedDocumentCallback ?? DroppedDocumentCallbackDefault;
39+
this._maxDegreeOfParallelism = _partitionedBulkRequest.MaxDegreeOfParallelism ?? CoordinatedRequestDefaults.BulkAllMaxDegreeOfParallelismDefault;
4040
this._compositeCancelTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
4141
this._compositeCancelToken = this._compositeCancelTokenSource.Token;
4242
}
@@ -57,10 +57,10 @@ public IDisposable Subscribe(IObserver<IBulkAllResponse> observer)
5757

5858
private void BulkAll(IObserver<IBulkAllResponse> observer)
5959
{
60-
var documents = this._partionedBulkRequest.Documents;
61-
var partioned = new PartitionHelper<T>(documents, this._bulkSize);
60+
var documents = this._partitionedBulkRequest.Documents;
61+
var partitioned = new PartitionHelper<T>(documents, this._bulkSize);
6262
#pragma warning disable 4014
63-
partioned.ForEachAsync(
63+
partitioned.ForEachAsync(
6464
#pragma warning restore 4014
6565
(buffer, page) => this.BulkAsync(buffer, page, 0),
6666
(buffer, response) => observer.OnNext(response),
@@ -89,8 +89,8 @@ private void OnCompleted(Exception exception, IObserver<IBulkAllResponse> observ
8989

9090
private void RefreshOnCompleted()
9191
{
92-
if (!this._partionedBulkRequest.RefreshOnCompleted ) return;
93-
var indices = this._partionedBulkRequest.RefreshIndices ?? this._partionedBulkRequest.Index;
92+
if (!this._partitionedBulkRequest.RefreshOnCompleted) return;
93+
var indices = this._partitionedBulkRequest.RefreshIndices ?? this._partitionedBulkRequest.Index;
9494
if (indices == null) return;
9595
var refresh = this._client.Refresh(indices);
9696
if (!refresh.IsValid) throw Throw($"Refreshing after all documents have indexed failed", refresh.ApiCall);
@@ -100,7 +100,7 @@ private async Task<IBulkAllResponse> BulkAsync(IList<T> buffer, long page, int b
100100
{
101101
this._compositeCancelToken.ThrowIfCancellationRequested();
102102

103-
var r = this._partionedBulkRequest;
103+
var r = this._partitionedBulkRequest;
104104
var response = await this._client.BulkAsync(s =>
105105
{
106106
s.Index(r.Index).Type(r.Type);
@@ -134,7 +134,7 @@ private async Task<IBulkAllResponse> BulkAsync(IList<T> buffer, long page, int b
134134
else if (retryDocuments.Count > 0)
135135
throw this.ThrowOnBadBulk(response, $"Bulk indexing failed and after retrying {backOffRetries} times");
136136

137-
this._partionedBulkRequest.BackPressure?.Release();
137+
this._partitionedBulkRequest.BackPressure?.Release();
138138
return new BulkAllResponse { Retries = backOffRetries, Page = page };
139139
}
140140

@@ -145,7 +145,7 @@ private void HandleDroppedDocuments(List<Tuple<IBulkResponseItem, T>> documentsW
145145
.ToList();
146146
if (droppedDocuments.Count <= 0) return;
147147
foreach (var dropped in droppedDocuments) this._droppedDocumentCallBack(dropped.Item1, dropped.Item2);
148-
if (!this._partionedBulkRequest.ContinueAfterDroppedDocuments)
148+
if (!this._partitionedBulkRequest.ContinueAfterDroppedDocuments)
149149
throw this.ThrowOnBadBulk(response, $"BulkAll halted after receiving failures that can not be retried from _bulk");
150150
}
151151

@@ -183,7 +183,7 @@ private async Task<IBulkAllResponse> RetryDocuments(long page, int backOffRetrie
183183
private Exception ThrowOnBadBulk(IElasticsearchResponse response, string message)
184184
{
185185
this._incrementFailed();
186-
this._partionedBulkRequest.BackPressure?.Release();
186+
this._partitionedBulkRequest.BackPressure?.Release();
187187
return Throw(message, response.ApiCall);
188188
}
189189
private static ElasticsearchClientException Throw(string message, IApiCallDetails details) =>

src/Nest/Document/Multiple/BulkAll/BulkAllRequest.cs

Lines changed: 34 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,26 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Linq;
34
using System.Threading;
45
using Elasticsearch.Net;
56

67
namespace Nest
78
{
89
public interface IBulkAllRequest<T> where T : class
910
{
10-
/// <summary> In case of a 429 (too busy) how long should we wait before retrying</summary>
11+
/// <summary> In case of a HTTP 429 (Too Many Requests) response status code, how long should we wait before retrying</summary>
1112
Time BackOffTime { get; set; }
1213

13-
/// <summary> In case of a 429 (too busy) how many times should we automatically back off before failing</summary>
14+
/// <summary> In case of a HTTP 429 (Too Many Requests) response status code, how many times should we automatically back off before failing</summary>
1415
int? BackOffRetries { get; set; }
1516

1617
/// <summary> The number of documents to send per bulk</summary>
1718
int? Size { get; set; }
1819

1920
///<summary>
20-
/// The documents to send to elasticsearch, try to keep the IEnumerable lazy.
21-
/// The bulk observable will ToList() each partitioned page in a lazy fashion, to keep memory consumption to a minimum.
22-
/// It makes no sense to set this to an list of 1 million records because all of those million records need to be in memory
23-
/// Make use of c#'s lazy generator!
21+
/// The documents to send to Elasticsearch, ideally lazily evaluated by using <see langword="yield"/>
22+
/// return to provide each document.
23+
/// <see cref="BulkAllObservable{T}"/> will eager evaluate each partitioned page when operating on it, using <see cref="Enumerable.ToList{T}"/>.
2424
///</summary>
2525
IEnumerable<T> Documents { get; }
2626

@@ -35,12 +35,12 @@ public interface IBulkAllRequest<T> where T : class
3535

3636
///<summary>
3737
///Sets the number of shard copies that must be active before proceeding with the bulk operation.
38-
///Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any
38+
///Defaults to <c>1</c>, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any
3939
///non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)
4040
///</summary>
4141
int? WaitForActiveShards { get; set; }
4242

43-
///<summary>Refresh the index after performing each operation (elasticsearch will refresh locally)</summary>
43+
///<summary>Refresh the index after performing each operation (Elasticsearch will refresh locally)</summary>
4444
[Obsolete("This option is scheduled for deletion in 7.0, refreshing on each _bulk makes little sense for BulkAll")]
4545
Refresh? Refresh { get; set; }
4646

@@ -62,8 +62,8 @@ public interface IBulkAllRequest<T> where T : class
6262
string Pipeline { get; set; }
6363

6464
/// <summary>
65-
/// By default the bulkall helper simply calls BulkDescriptor.IndexMany on the buffer.
66-
/// There might be case where you'd like more control over this. By setting this callback you are in complete control
65+
/// By default, <see cref="BulkAllObservable{T}"/> calls <see cref="BulkDescriptor.IndexMany{T}"/> on the buffer.
66+
/// There might be case where you'd like more control over the bulk operation. By setting this callback, you are in complete control
6767
/// of describing how the buffer should be translated to a bulk operation.
6868
/// </summary>
6969
Action<BulkDescriptor, IList<T>> BufferToBulk { get; set; }
@@ -76,7 +76,8 @@ public interface IBulkAllRequest<T> where T : class
7676
ProducerConsumerBackPressure BackPressure { get; set; }
7777

7878
/// <summary>
79-
/// A predicate which controls which documents should be retried, defaults to failed bulk items with status code 429
79+
/// A predicate to control which documents should be retried.
80+
/// Defaults to failed bulk items with a HTTP 429 (Too Many Requests) response status code.
8081
/// </summary>
8182
Func<IBulkResponseItem, T, bool> RetryDocumentPredicate { get; set; }
8283

@@ -88,8 +89,8 @@ public interface IBulkAllRequest<T> where T : class
8889

8990
/// <summary>
9091
/// If a bulk operation fails because it receives documents it can not retry they will be fed to this callback.
91-
/// If <see cref="ContinueAfterDroppedDocuments"/> is set to true processing will continue, so this callback can be used
92-
/// to feed into a dead letter queue. Otherwise the bulk all indexation will be halted.
92+
/// If <see cref="ContinueAfterDroppedDocuments"/> is set to <c>true</c> processing will continue, so this callback can be used
93+
/// to feed into a dead letter queue. Otherwise bulk all indexing will be halted.
9394
/// </summary>
9495
Action<IBulkResponseItem, T> DroppedDocumentCallback { get; set; }
9596

@@ -177,54 +178,56 @@ public BulkAllDescriptor(IEnumerable<T> documents)
177178
((IBulkAllRequest<T>)this).Type = typeof(T);
178179
}
179180

180-
/// <inheritdoc />
181-
public BulkAllDescriptor<T> MaxDegreeOfParallelism(int? parallism) =>
182-
Assign(a => a.MaxDegreeOfParallelism = parallism);
181+
/// <inheritdoc cref="IBulkAllRequest{T}.MaxDegreeOfParallelism" />
182+
public BulkAllDescriptor<T> MaxDegreeOfParallelism(int? parallelism) =>
183+
Assign(a => a.MaxDegreeOfParallelism = parallelism);
183184

184-
/// <inheritdoc />
185+
/// <inheritdoc cref="IBulkAllRequest{T}.Size" />
185186
public BulkAllDescriptor<T> Size(int? size) => Assign(a => a.Size = size);
186187

187-
/// <inheritdoc />
188+
/// <inheritdoc cref="IBulkAllRequest{T}.BackOffRetries" />
188189
public BulkAllDescriptor<T> BackOffRetries(int? backoffs) =>
189190
Assign(a => a.BackOffRetries = backoffs);
190191

191-
/// <inheritdoc />
192+
/// <inheritdoc cref="IBulkAllRequest{T}.BackOffTime" />
192193
public BulkAllDescriptor<T> BackOffTime(Time time) => Assign(a => a.BackOffTime = time);
193194

194-
/// <inheritdoc />
195+
/// <inheritdoc cref="IBulkAllRequest{T}.Index" />
195196
public BulkAllDescriptor<T> Index(IndexName index) => Assign(a => a.Index = index);
196197

197-
/// <inheritdoc />
198+
/// <inheritdoc cref="IBulkAllRequest{T}.Index" />
198199
public BulkAllDescriptor<T> Index<TOther>() where TOther : class => Assign(a => a.Index = typeof(TOther));
199200

200-
/// <inheritdoc />
201+
/// <inheritdoc cref="IBulkAllRequest{T}.Type" />
201202
public BulkAllDescriptor<T> Type(TypeName type) => Assign(a => a.Type = type);
202203

203-
/// <inheritdoc />
204+
/// <inheritdoc cref="IBulkAllRequest{T}.Type" />
204205
public BulkAllDescriptor<T> Type<TOther>() where TOther : class => Assign(a => a.Type = typeof(TOther));
205206

206-
/// <inheritdoc />
207+
/// <inheritdoc cref="IBulkAllRequest{T}.RefreshOnCompleted" />
207208
public BulkAllDescriptor<T> RefreshOnCompleted(bool refresh = true) => Assign(p => p.RefreshOnCompleted = refresh);
208209

209-
/// <inheritdoc />
210+
/// <inheritdoc cref="IBulkAllRequest{T}.Refresh" />
211+
#pragma warning disable 618
210212
public BulkAllDescriptor<T> Refresh(Refresh? refresh) => Assign(p => p.Refresh = refresh);
213+
#pragma warning restore 618
211214

212215
/// <inheritdoc cref="IBulkAllRequest{T}.RefreshIndices"/>
213216
public BulkAllDescriptor<T> RefreshIndices(Indices indicesToRefresh) => Assign(a => a.RefreshIndices = indicesToRefresh);
214217

215-
/// <inheritdoc />
218+
/// <inheritdoc cref="IBulkAllRequest{T}.Routing"/>
216219
public BulkAllDescriptor<T> Routing(Routing routing) => Assign(p => p.Routing = routing);
217220

218-
/// <inheritdoc />
221+
/// <inheritdoc cref="IBulkAllRequest{T}.Timeout"/>
219222
public BulkAllDescriptor<T> Timeout(Time timeout) => Assign(p => p.Timeout = timeout);
220223

221-
/// <inheritdoc />
224+
/// <inheritdoc cref="IBulkAllRequest{T}.Pipeline"/>
222225
public BulkAllDescriptor<T> Pipeline(string pipeline) => Assign(p => p.Pipeline = pipeline);
223226

224-
/// <inheritdoc />
227+
/// <inheritdoc cref="IBulkAllRequest{T}.BufferToBulk"/>
225228
public BulkAllDescriptor<T> BufferToBulk(Action<BulkDescriptor, IList<T>> modifier) => Assign(p => p.BufferToBulk = modifier);
226229

227-
/// <inheritdoc />
230+
/// <inheritdoc cref="IBulkAllRequest{T}.RetryDocumentPredicate"/>
228231
public BulkAllDescriptor<T> RetryDocumentPredicate(Func<IBulkResponseItem, T, bool> predicate) =>
229232
Assign(p => p.RetryDocumentPredicate = predicate);
230233

@@ -243,11 +246,7 @@ public BulkAllDescriptor<T> BackPressure(int maxConcurrency, int? backPressureFa
243246
/// <inheritdoc cref="IBulkAllRequest{T}.ContinueAfterDroppedDocuments" />
244247
public BulkAllDescriptor<T> ContinueAfterDroppedDocuments(bool proceed = true) => Assign(p => p.ContinueAfterDroppedDocuments = proceed);
245248

246-
/// <summary>
247-
/// If <see cref="ContinueAfterDroppedDocuments"/> is set to false (not the default) dropped messages will be fed through
248-
/// this callback. Use this if you don't expect many failures and want to feed these dropped messages in a dead letter queue
249-
/// for instance.
250-
/// </summary>
249+
/// <inheritdoc cref="IBulkAllRequest{T}.DroppedDocumentCallback"/>
251250
public BulkAllDescriptor<T> DroppedDocumentCallback(Action<IBulkResponseItem, T> callback) =>
252251
Assign(p => p.DroppedDocumentCallback = callback);
253252

0 commit comments

Comments
 (0)