1- using System ;
2- using Elasticsearch . Net ;
3-
4- namespace Nest
5- {
6- public class ReindexObservable < T > : IDisposable , IObservable < IReindexResponse < T > > where T : class
7- {
8- private ReindexDescriptor < T > _reindexDescriptor ;
9- private readonly IConnectionSettingsValues _connectionSettings ;
10- internal IElasticClient CurrentClient { get ; set ; }
11- internal ReindexDescriptor < T > ReindexDescriptor { get ; set ; }
12-
13- public ReindexObservable ( IElasticClient client , IConnectionSettingsValues connectionSettings , ReindexDescriptor < T > reindexDescriptor )
14- {
15- this . _connectionSettings = connectionSettings ;
16- this . _reindexDescriptor = reindexDescriptor ;
17- this . CurrentClient = client ;
18- }
19-
20- public IDisposable Subscribe ( IObserver < IReindexResponse < T > > observer )
21- {
22- observer . ThrowIfNull ( "observer" ) ;
23- try
24- {
25- this . Reindex ( observer ) ;
26- }
27- catch ( Exception e )
28- {
29- observer . OnError ( e ) ;
30- }
31- return this ;
32-
33- }
34-
35- private void Reindex ( IObserver < IReindexResponse < T > > observer )
36- {
37- var fromIndex = this . _reindexDescriptor . _FromIndexName ;
38- var toIndex = this . _reindexDescriptor . _ToIndexName ;
1+ using System ;
2+ using Elasticsearch . Net ;
3+
4+ namespace Nest
5+ {
6+ public class ReindexObservable < T > : IDisposable , IObservable < IReindexResponse < T > > where T : class
7+ {
8+ private ReindexDescriptor < T > _reindexDescriptor ;
9+ private readonly IConnectionSettingsValues _connectionSettings ;
10+ internal IElasticClient CurrentClient { get ; set ; }
11+ internal ReindexDescriptor < T > ReindexDescriptor { get ; set ; }
12+
13+ public ReindexObservable ( IElasticClient client , IConnectionSettingsValues connectionSettings , ReindexDescriptor < T > reindexDescriptor )
14+ {
15+ this . _connectionSettings = connectionSettings ;
16+ this . _reindexDescriptor = reindexDescriptor ;
17+ this . CurrentClient = client ;
18+ }
19+
20+ public IDisposable Subscribe ( IObserver < IReindexResponse < T > > observer )
21+ {
22+ observer . ThrowIfNull ( "observer" ) ;
23+ try
24+ {
25+ this . Reindex ( observer ) ;
26+ }
27+ catch ( Exception e )
28+ {
29+ observer . OnError ( e ) ;
30+ }
31+ return this ;
32+
33+ }
34+
35+ private void Reindex ( IObserver < IReindexResponse < T > > observer )
36+ {
37+ var fromIndex = this . _reindexDescriptor . _FromIndexName ;
38+ var toIndex = this . _reindexDescriptor . _ToIndexName ;
3939 var scroll = this . _reindexDescriptor . _Scroll ?? "2m" ;
40- var size = this . _reindexDescriptor . _Size ?? 100 ;
41-
42- fromIndex . ThrowIfNullOrEmpty ( "fromIndex" ) ;
43- toIndex . ThrowIfNullOrEmpty ( "toIndex" ) ;
44-
45- var indexSettings = this . CurrentClient . GetIndexSettings ( i=> i . Index ( this . _reindexDescriptor . _FromIndexName ) ) ;
46- Func < CreateIndexDescriptor , CreateIndexDescriptor > settings =
47- this . _reindexDescriptor . _CreateIndexSelector ?? ( ( ci ) => ci ) ;
48-
49- var createIndexResponse = this . CurrentClient . CreateIndex (
50- toIndex , ( c ) => settings ( c . InitializeUsing ( indexSettings . IndexSettings ) ) ) ;
51- if ( ! createIndexResponse . IsValid )
52- throw new ReindexException ( createIndexResponse . ConnectionStatus ) ;
53-
40+ var size = this . _reindexDescriptor . _Size ?? 100 ;
41+
42+ fromIndex . ThrowIfNullOrEmpty ( "fromIndex" ) ;
43+ toIndex . ThrowIfNullOrEmpty ( "toIndex" ) ;
44+
45+ var indexSettings = this . CurrentClient . GetIndexSettings ( i => i . Index ( this . _reindexDescriptor . _FromIndexName ) ) ;
46+ Func < CreateIndexDescriptor , CreateIndexDescriptor > settings =
47+ this . _reindexDescriptor . _CreateIndexSelector ?? ( ( ci ) => ci ) ;
48+
49+ var createIndexResponse = this . CurrentClient . CreateIndex (
50+ toIndex , ( c ) => settings ( c . InitializeUsing ( indexSettings . IndexSettings ) ) ) ;
51+ if ( ! createIndexResponse . IsValid )
52+ throw new ReindexException ( createIndexResponse . ConnectionStatus ) ;
53+
5454 var page = 0 ;
55- Func < SearchDescriptor < T > , SearchDescriptor < T > > searchDescriptor = s => s . Index ( fromIndex ) ;
56-
57- if ( typeof ( T ) . Name . Equals ( typeof ( object ) . Name ) )
58- {
59- searchDescriptor = s => searchDescriptor ( s ) . AllTypes ( ) ;
60- }
55+ Func < SearchDescriptor < T > , SearchDescriptor < T > > searchDescriptor = s => s . Index ( fromIndex ) ;
56+
57+ if ( typeof ( T ) . Name . Equals ( typeof ( object ) . Name ) )
58+ {
59+ searchDescriptor = s => searchDescriptor ( s ) . AllTypes ( ) ;
60+ }
6161 else
6262 {
63- searchDescriptor = s => searchDescriptor ( s ) . Type < T > ( ) ;
64- }
65-
66-
63+ searchDescriptor = s => searchDescriptor ( s ) . Type < T > ( ) ;
64+ }
65+
66+
6767 var searchResult = this . CurrentClient . Search < T > (
68- s => searchDescriptor ( s )
69- . From ( 0 )
70- . Size ( size )
71- . Query ( this . _reindexDescriptor . _QuerySelector ?? ( q=> q . MatchAll ( ) ) )
72- . SearchType ( SearchType . Scan )
73- . Scroll ( scroll ) ) ;
74-
75- if ( searchResult . Total <= 0 )
76- throw new ReindexException ( searchResult . ConnectionStatus , "index " + fromIndex + " has no documents!") ;
77- IBulkResponse indexResult = null ;
78- do
79- {
80- var result = searchResult ;
81- searchResult = this . CurrentClient . Scroll < T > ( s => s
82- . Scroll ( scroll )
83- . ScrollId ( result . ScrollId )
84- ) ;
85- if ( searchResult . Documents . HasAny ( ) )
86- indexResult = this . IndexSearchResults ( searchResult , observer , toIndex , page ) ;
87- page ++ ;
88- } while ( searchResult . IsValid && indexResult != null && indexResult . IsValid && searchResult . Documents . HasAny ( ) ) ;
89-
90-
91- observer . OnCompleted ( ) ;
92- }
93-
94- public IBulkResponse IndexSearchResults ( ISearchResponse < T > searchResult , IObserver < IReindexResponse < T > > observer , string toIndex , int page )
95- {
96- if ( ! searchResult . IsValid )
97- throw new ReindexException ( searchResult . ConnectionStatus , "reindex failed on scroll #" + page ) ;
98-
99- var bb = new BulkDescriptor ( ) ;
100- foreach ( var d in searchResult . Hits )
101- {
102- IHit < T > d1 = d ;
103- bb . Index < T > ( bi => bi . Document ( d1 . Source ) . Type ( d1 . Type ) . Index ( toIndex ) . Id ( d . Id ) ) ;
104- }
105-
106- var indexResult = this . CurrentClient . Bulk ( b=> bb ) ;
107- if ( ! indexResult . IsValid )
108- throw new ReindexException ( indexResult . ConnectionStatus , "reindex failed when indexing page " + page ) ;
109-
110- observer . OnNext ( new ReindexResponse < T > ( )
111- {
112- BulkResponse = indexResult ,
113- SearchResponse = searchResult ,
114- Scroll = page
115- } ) ;
116- return indexResult ;
117- }
118-
119-
120- public void Dispose ( )
121- {
122-
123- }
124- }
68+ s => searchDescriptor ( s )
69+ . From ( 0 )
70+ . Size ( size )
71+ . Query ( this . _reindexDescriptor . _QuerySelector ?? ( q => q . MatchAll ( ) ) )
72+ . SearchType ( SearchType . Scan )
73+ . Scroll ( scroll ) ) ;
74+
75+ if ( searchResult . Total <= 0 )
76+ throw new ReindexException ( searchResult . ConnectionStatus , string . Format ( "index {0} has no documents!" , fromIndex ) ) ;
77+ IBulkResponse indexResult = null ;
78+ do
79+ {
80+ var result = searchResult ;
81+ searchResult = this . CurrentClient . Scroll < T > ( s => s
82+ . Scroll ( scroll )
83+ . ScrollId ( result . ScrollId )
84+ ) ;
85+ if ( searchResult . Documents . HasAny ( ) )
86+ indexResult = this . IndexSearchResults ( searchResult , observer , toIndex , page ) ;
87+ page ++ ;
88+ } while ( searchResult . IsValid && indexResult != null && indexResult . IsValid && searchResult . Documents . HasAny ( ) ) ;
89+
90+
91+ observer . OnCompleted ( ) ;
92+ }
93+
94+ public IBulkResponse IndexSearchResults ( ISearchResponse < T > searchResult , IObserver < IReindexResponse < T > > observer , string toIndex , int page )
95+ {
96+ if ( ! searchResult . IsValid )
97+ throw new ReindexException ( searchResult . ConnectionStatus , "reindex failed on scroll #" + page ) ;
98+
99+ var bb = new BulkDescriptor ( ) ;
100+ foreach ( var d in searchResult . Hits )
101+ {
102+ IHit < T > d1 = d ;
103+ bb . Index < T > ( bi => bi . Document ( d1 . Source ) . Type ( d1 . Type ) . Index ( toIndex ) . Id ( d . Id ) ) ;
104+ }
105+
106+ var indexResult = this . CurrentClient . Bulk ( b => bb ) ;
107+ if ( ! indexResult . IsValid )
108+ throw new ReindexException ( indexResult . ConnectionStatus , "reindex failed when indexing page " + page ) ;
109+
110+ observer . OnNext ( new ReindexResponse < T >
111+ {
112+ BulkResponse = indexResult ,
113+ SearchResponse = searchResult ,
114+ Scroll = page
115+ } ) ;
116+ return indexResult ;
117+ }
118+
119+
120+ public void Dispose ( )
121+ {
122+
123+ }
124+ }
125125}
0 commit comments