|
| 1 | +// Licensed to Elasticsearch B.V under one or more agreements. |
| 2 | +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. |
| 3 | +// See the LICENSE file in the project root for more information |
| 4 | + |
| 5 | +using System; |
| 6 | +using System.Linq; |
| 7 | +using System.Runtime.ExceptionServices; |
| 8 | +using System.Threading; |
| 9 | +using Elastic.Elasticsearch.Xunit.XunitPlumbing; |
| 10 | +using Nest; |
| 11 | +using Tests.Core.ManagedElasticsearch.Clusters; |
| 12 | +using Tests.Domain; |
| 13 | +using Tests.Framework.DocumentationTests; |
| 14 | + |
| 15 | +namespace Tests.Search |
| 16 | +{ |
| 17 | + /**=== Scrolling documents |
| 18 | + * |
| 19 | + * The scroll API can be used to return a large collection of documents from Elasticsearch. |
| 20 | + * |
| 21 | + * NEST exposes the scroll API and an observable scroll implementation that can be used |
| 22 | + * to write concurrent scroll requests. |
| 23 | + */ |
| 24 | + public class ScrollDocuments : IntegrationDocumentationTestBase, IClusterFixture<ReadOnlyCluster> |
| 25 | + { |
| 26 | + public ScrollDocuments(ReadOnlyCluster cluster) : base(cluster) { } |
| 27 | + |
| 28 | + // hide |
| 29 | + private void ProcessResponse(ISearchResponse<Project> response) { } |
| 30 | + |
| 31 | + /**==== Simple use |
| 32 | + * |
| 33 | + * The simplest use of the scroll API is to perform a search request with a |
| 34 | + * scroll timeout, then pass the scroll id returned in each response to |
| 35 | + * the next request to the scroll API, until no more documents are returned |
| 36 | + */ |
| 37 | + [I] |
| 38 | + public void SimpleUse() |
| 39 | + { |
| 40 | + var searchResponse = Client.Search<Project>(s => s |
| 41 | + .Query(q => q |
| 42 | + .Term(f => f.State, StateOfBeing.Stable) |
| 43 | + ) |
| 44 | + .Scroll("10s") // <1> Specify a scroll time for how long Elasticsearch should keep this scroll open on the server side. The time specified should be sufficient to process the response on the client side. |
| 45 | + ); |
| 46 | + |
| 47 | + while (searchResponse.Documents.Any()) // <2> make subsequent requests to the scroll API to keep fetching documents, whilst documents are returned |
| 48 | + { |
| 49 | + ProcessResponse(searchResponse); // <3> do something with the response |
| 50 | + searchResponse = Client.Scroll<Project>("10s", searchResponse.ScrollId); |
| 51 | + } |
| 52 | + } |
| 53 | + |
| 54 | + /**[[scrollall-observable]] |
| 55 | + * ==== ScrollAllObservable |
| 56 | + * |
| 57 | + * Similar to <<bulkall-observable, `BulkAllObservable`>> for bulk indexing a large number of documents, |
| 58 | + * NEST exposes an observable scroll implementation, `ScrollAllObservable`, that can be used |
| 59 | + * to write concurrent scroll requests. `ScrollAllObservable` uses sliced scrolls to split the scroll into |
| 60 | + * multiple slices that can be consumed concurrently. |
| 61 | + * |
| 62 | + * The simplest use of `ScrollAllObservable` is |
| 63 | + */ |
| 64 | + [I] |
| 65 | + public void SimpleScrollAllObservable() |
| 66 | + { |
| 67 | + int numberOfSlices = Environment.ProcessorCount; // <1> See https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html#sliced-scroll[sliced scroll] documentation for choosing an appropriate number of slices. |
| 68 | + |
| 69 | + var scrollAllObservable = Client.ScrollAll<Project>("10s", numberOfSlices, sc => sc |
| 70 | + .MaxDegreeOfParallelism(numberOfSlices) // <2> Number of concurrent sliced scroll requests. Usually want to set this to the same value as the number of slices |
| 71 | + .Search(s => s |
| 72 | + .Query(q => q |
| 73 | + .Term(f => f.State, StateOfBeing.Stable) |
| 74 | + ) |
| 75 | + ) |
| 76 | + ); |
| 77 | + |
| 78 | + scrollAllObservable.Wait(TimeSpan.FromMinutes(10), response => // <3> Total overall time for scrolling **all** documents. Ensure this is a sufficient value to scroll all documents |
| 79 | + { |
| 80 | + ProcessResponse(response.SearchResponse); // <4> do something with the response |
| 81 | + }); |
| 82 | + } |
| 83 | + |
| 84 | + /** |
| 85 | + * More control over how the observable is consumed can be achieved by writing |
| 86 | + * your own observer and subscribing to the observable, which will initiate scrolling |
| 87 | + */ |
| 88 | + [I] |
| 89 | + public void ComplexScrollAllObservable() |
| 90 | + { |
| 91 | + int numberOfSlices = Environment.ProcessorCount; |
| 92 | + |
| 93 | + var scrollAllObservable = Client.ScrollAll<Project>("10s", numberOfSlices, sc => sc |
| 94 | + .MaxDegreeOfParallelism(numberOfSlices) |
| 95 | + .Search(s => s |
| 96 | + .Query(q => q |
| 97 | + .Term(f => f.State, StateOfBeing.Stable) |
| 98 | + ) |
| 99 | + ) |
| 100 | + ); |
| 101 | + |
| 102 | + var waitHandle = new ManualResetEvent(false); |
| 103 | + ExceptionDispatchInfo info = null; |
| 104 | + |
| 105 | + var scrollAllObserver = new ScrollAllObserver<Project>( |
| 106 | + onNext: response => ProcessResponse(response.SearchResponse), // <1> do something with the response |
| 107 | + onError: e => |
| 108 | + { |
| 109 | + info = ExceptionDispatchInfo.Capture(e); // <2> if an exception is thrown, capture it to throw outside of the observer |
| 110 | + waitHandle.Set(); |
| 111 | + }, |
| 112 | + onCompleted: () => waitHandle.Set() |
| 113 | + ); |
| 114 | + |
| 115 | + scrollAllObservable.Subscribe(scrollAllObserver); // <3> initiate scrolling |
| 116 | + |
| 117 | + waitHandle.WaitOne(); // <4> block the current thread until the wait handle is set |
| 118 | + info?.Throw(); // <5> if an exception was captured whilst scrolling, throw it |
| 119 | + } |
| 120 | + } |
| 121 | +} |
0 commit comments