Skip to content

Commit 2c92d16

Browse files
committed
Allow specific pipeline on bulk index and create operations
Closes #2087
1 parent ab136d0 commit 2c92d16

File tree

5 files changed

+86
-27
lines changed

5 files changed

+86
-27
lines changed

src/Nest/Document/Multiple/Bulk/BulkOperation/BulkCreate.cs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
11
using System;
2+
using Newtonsoft.Json;
23

34
namespace Nest
45
{
56
public interface IBulkCreateOperation<T> : IBulkOperation
67
where T : class
78
{
89
T Document { get; set; }
10+
11+
[JsonProperty("pipeline")]
12+
string Pipeline { get; set; }
913
}
1014

1115
public class BulkCreateOperation<T> : BulkOperationBase, IBulkCreateOperation<T>
1216
where T : class
1317
{
1418
public T Document { get; set; }
1519

20+
public string Pipeline { get; set; }
21+
1622
public BulkCreateOperation(T document)
1723
{
1824
this.Document = document;
@@ -28,7 +34,7 @@ public BulkCreateOperation(T document)
2834
}
2935

3036

31-
public class BulkCreateDescriptor<T> : BulkOperationDescriptorBase<BulkCreateDescriptor<T>, IBulkCreateOperation<T>>, IBulkCreateOperation<T>
37+
public class BulkCreateDescriptor<T> : BulkOperationDescriptorBase<BulkCreateDescriptor<T>, IBulkCreateOperation<T>>, IBulkCreateOperation<T>
3238
where T : class
3339
{
3440
protected override string BulkOperationType => "create";
@@ -40,9 +46,16 @@ public class BulkCreateDescriptor<T> : BulkOperationDescriptorBase<BulkCreateDes
4046

4147
T IBulkCreateOperation<T>.Document { get; set; }
4248

49+
string IBulkCreateOperation<T>.Pipeline { get; set; }
50+
4351
/// <summary>
4452
/// The object to update, if id is not manually set it will be inferred from the object
4553
/// </summary>
4654
public BulkCreateDescriptor<T> Document(T @object) => Assign(a => a.Document = @object);
55+
56+
/// <summary>
57+
/// The pipeline id to preprocess documents with
58+
/// </summary>
59+
public BulkCreateDescriptor<T> Pipeline(string pipeline) => Assign(a => a.Pipeline = pipeline);
4760
}
48-
}
61+
}

src/Nest/Document/Multiple/Bulk/BulkOperation/BulkIndex.cs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,18 @@
33

44
namespace Nest
55
{
6-
public interface IIndexOperation<T> : IBulkOperation
6+
public interface IBulkIndexOperation<T> : IBulkOperation
77
{
8-
[JsonProperty(PropertyName = "_percolate")]
8+
[JsonProperty("_percolate")]
99
string Percolate { get; set; }
1010

11+
[JsonProperty("pipeline")]
12+
string Pipeline { get; set; }
13+
1114
T Document { get; set; }
1215
}
1316

14-
public class BulkIndexOperation<T> : BulkOperationBase, IIndexOperation<T>
17+
public class BulkIndexOperation<T> : BulkOperationBase, IBulkIndexOperation<T>
1518
where T : class
1619
{
1720
public BulkIndexOperation(T document)
@@ -29,18 +32,20 @@ public BulkIndexOperation(T document)
2932

3033
public string Percolate { get; set; }
3134

35+
public string Pipeline { get; set; }
36+
3237
public T Document { get; set; }
3338
}
3439

3540

36-
public class BulkIndexDescriptor<T> : BulkOperationDescriptorBase<BulkIndexDescriptor<T>, IIndexOperation<T>>, IIndexOperation<T>
41+
public class BulkIndexDescriptor<T> : BulkOperationDescriptorBase<BulkIndexDescriptor<T>, IBulkIndexOperation<T>>, IBulkIndexOperation<T>
3742
where T : class
3843
{
3944
protected override string BulkOperationType => "index";
4045
protected override Type BulkOperationClrType => typeof(T);
41-
42-
string IIndexOperation<T>.Percolate { get; set; }
43-
T IIndexOperation<T>.Document { get; set; }
46+
string IBulkIndexOperation<T>.Percolate { get; set; }
47+
string IBulkIndexOperation<T>.Pipeline { get; set; }
48+
T IBulkIndexOperation<T>.Document { get; set; }
4449

4550
protected override object GetBulkOperationBody() => Self.Document;
4651

@@ -51,5 +56,11 @@ public class BulkIndexDescriptor<T> : BulkOperationDescriptorBase<BulkIndexDescr
5156
/// </summary>
5257
public BulkIndexDescriptor<T> Document(T @object) => Assign(a => a.Document = @object);
5358

59+
/// <summary>
60+
/// The pipeline id to preprocess documents with
61+
/// </summary>
62+
public BulkIndexDescriptor<T> Pipeline(string pipeline) => Assign(a => a.Pipeline = pipeline);
63+
64+
public BulkIndexDescriptor<T> Percolate(string percolate) => Assign(a => a.Percolate = percolate);
5465
}
55-
}
66+
}

src/Nest/Document/Multiple/Bulk/BulkRequest.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,15 @@ public BulkDescriptor Create<T>(Func<BulkCreateDescriptor<T>, IBulkCreateOperati
3333
public BulkDescriptor CreateMany<T>(IEnumerable<T> @objects, Func<BulkCreateDescriptor<T>, T, IBulkCreateOperation<T>> bulkCreateSelector = null) where T : class =>
3434
Assign(a => @objects.ForEach(o => AddOperation(bulkCreateSelector.InvokeOrDefault(new BulkCreateDescriptor<T>().Document(o), o))));
3535

36-
public BulkDescriptor Index<T>(Func<BulkIndexDescriptor<T>, IIndexOperation<T>> bulkIndexSelector) where T : class =>
36+
public BulkDescriptor Index<T>(Func<BulkIndexDescriptor<T>, IBulkIndexOperation<T>> bulkIndexSelector) where T : class =>
3737
Assign(a => AddOperation(bulkIndexSelector?.Invoke(new BulkIndexDescriptor<T>())));
3838

3939
/// <summary>
4040
/// IndexMany, convenience method to pass many objects at once.
4141
/// </summary>
4242
/// <param name="objects">the objects to index</param>
4343
/// <param name="bulkIndexSelector">A func called on each object to describe the individual index operation</param>
44-
public BulkDescriptor IndexMany<T>(IEnumerable<T> @objects, Func<BulkIndexDescriptor<T>, T, IIndexOperation<T>> bulkIndexSelector = null) where T : class =>
44+
public BulkDescriptor IndexMany<T>(IEnumerable<T> @objects, Func<BulkIndexDescriptor<T>, T, IBulkIndexOperation<T>> bulkIndexSelector = null) where T : class =>
4545
Assign(a => @objects.ForEach(o => AddOperation(bulkIndexSelector.InvokeOrDefault(new BulkIndexDescriptor<T>().Document(o), o))));
4646

4747
public BulkDescriptor Delete<T>(T obj, Func<BulkDeleteDescriptor<T>, IBulkDeleteOperation<T>> bulkDeleteSelector = null) where T : class =>
@@ -74,7 +74,7 @@ public BulkDescriptor DeleteMany<T>(IEnumerable<string> ids, Func<BulkDeleteDesc
7474
public BulkDescriptor DeleteMany<T>(IEnumerable<long> ids, Func<BulkDeleteDescriptor<T>, long, IBulkDeleteOperation<T>> bulkDeleteSelector = null) where T : class =>
7575
Assign(a => ids.ForEach(o => AddOperation(bulkDeleteSelector.InvokeOrDefault(new BulkDeleteDescriptor<T>().Id(o), o))));
7676

77-
public BulkDescriptor Update<T>(Func<BulkUpdateDescriptor<T, T>, IBulkUpdateOperation<T, T>> bulkUpdateSelector) where T : class =>
77+
public BulkDescriptor Update<T>(Func<BulkUpdateDescriptor<T, T>, IBulkUpdateOperation<T, T>> bulkUpdateSelector) where T : class =>
7878
this.Update<T, T>(bulkUpdateSelector);
7979

8080
public BulkDescriptor Update<T, TPartialDocument>(Func<BulkUpdateDescriptor<T, TPartialDocument>, IBulkUpdateOperation<T, TPartialDocument>> bulkUpdateSelector)

src/Tests/Document/Multiple/Bulk/BulkApiTests.cs

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,44 +24,76 @@ protected override LazyResponses ClientUsage() => Calls(
2424
protected override bool ExpectIsValid => true;
2525
protected override int ExpectStatusCode => 200;
2626
protected override HttpMethod HttpMethod => HttpMethod.POST;
27-
protected override string UrlPath => $"/{CallIsolatedValue}/_bulk";
27+
protected override string UrlPath => $"/{CallIsolatedValue}/_bulk?pipeline=default-pipeline";
2828

2929
protected override bool SupportsDeserialization => false;
3030

31+
protected override void IntegrationSetup(IElasticClient client, CallUniqueValues values)
32+
{
33+
var pipelineResponse = client.PutPipeline("default-pipeline", p => p
34+
.Processors(pr => pr
35+
.Set<Project>(t => t.Field(f => f.Description).Value("Default"))
36+
)
37+
);
38+
39+
if (!pipelineResponse.IsValid)
40+
throw new Exception("Failed to set up pipeline required for bulk");
41+
42+
pipelineResponse = client.PutPipeline("pipeline", p => p
43+
.Processors(pr => pr
44+
.Set<Project>(t => t.Field(f => f.Description).Value("Overridden"))
45+
)
46+
);
47+
48+
if (!pipelineResponse.IsValid)
49+
throw new Exception("Failed to set up pipeline required for bulk");
50+
51+
base.IntegrationSetup(client, values);
52+
}
53+
3154
protected override object ExpectJson => new object[]
3255
{
33-
new Dictionary<string, object>{ { "index", new { _type = "project", _id = Project.Instance.Name } } },
56+
new Dictionary<string, object>{ { "index", new { _type = "project", _id = Project.Instance.Name, pipeline="pipeline" } } },
3457
Project.InstanceAnonymous,
3558
new Dictionary<string, object>{ { "update", new { _type="project", _id = Project.Instance.Name } } },
3659
new { doc = new { leadDeveloper = new { firstName = "martijn" } } } ,
3760
new Dictionary<string, object>{ { "create", new { _type="project", _id = Project.Instance.Name + "1" } } },
3861
Project.InstanceAnonymous,
3962
new Dictionary<string, object>{ { "delete", new { _type="project", _id = Project.Instance.Name + "1" } } },
63+
new Dictionary<string, object>{ { "create", new { _type="project", _id = Project.Instance.Name + "2" } } },
64+
Project.InstanceAnonymous,
4065
};
4166

4267
protected override Func<BulkDescriptor, IBulkRequest> Fluent => d => d
4368
.Index(CallIsolatedValue)
44-
.Index<Project>(b => b.Document(Project.Instance))
69+
.Pipeline("default-pipeline")
70+
.Index<Project>(b => b.Document(Project.Instance).Pipeline("pipeline"))
4571
.Update<Project, object>(b => b.Doc(new { leadDeveloper = new { firstName = "martijn" } }).Id(Project.Instance.Name))
4672
.Create<Project>(b => b.Document(Project.Instance).Id(Project.Instance.Name + "1"))
47-
.Delete<Project>(b=>b.Id(Project.Instance.Name + "1"));
48-
73+
.Delete<Project>(b=>b.Id(Project.Instance.Name + "1"))
74+
.Create<Project>(b => b.Document(Project.Instance).Id(Project.Instance.Name + "2"));
75+
4976

50-
protected override BulkRequest Initializer =>
77+
protected override BulkRequest Initializer =>
5178
new BulkRequest(CallIsolatedValue)
5279
{
80+
Pipeline = "default-pipeline",
5381
Operations = new List<IBulkOperation>
5482
{
55-
new BulkIndexOperation<Project>(Project.Instance),
83+
new BulkIndexOperation<Project>(Project.Instance) { Pipeline = "pipeline" },
5684
new BulkUpdateOperation<Project, object>(Project.Instance)
5785
{
5886
Doc = new { leadDeveloper = new { firstName = "martijn" } }
5987
},
6088
new BulkCreateOperation<Project>(Project.Instance)
6189
{
62-
Id = Project.Instance.Name + "1"
90+
Id = Project.Instance.Name + "1",
6391
},
6492
new BulkDeleteOperation<Project>(Project.Instance.Name + "1"),
93+
new BulkCreateOperation<Project>(Project.Instance)
94+
{
95+
Id = Project.Instance.Name + "2",
96+
},
6597
}
6698
};
6799

@@ -84,9 +116,12 @@ protected override void ExpectResponse(IBulkResponse response)
84116
item.Shards.Successful.Should().BeGreaterThan(0);
85117
}
86118

87-
var p1 = this.Client.Source<Project>(Project.Instance.Name, p => p.Index(CallIsolatedValue));
88-
p1.LeadDeveloper.FirstName.Should().Be("martijn");
89-
}
119+
var project1 = this.Client.Source<Project>(Project.Instance.Name, p => p.Index(CallIsolatedValue));
120+
project1.LeadDeveloper.FirstName.Should().Be("martijn");
121+
project1.Description.Should().Be("Overridden");
90122

123+
var project2 = this.Client.Source<Project>(Project.Instance.Name + "2", p => p.Index(CallIsolatedValue));
124+
project2.Description.Should().Be("Default");
125+
}
91126
}
92127
}

src/Tests/Ingest/SimulatePipeline/SImulatePipelineApiTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,22 +94,22 @@ protected override void ExpectResponse(ISimulatePipelineResponse response)
9494
response.IsValid.Should().BeTrue();
9595
response.Documents.Should().NotBeNull().And.HaveCount(3);
9696

97-
var simulation = response.Documents.Where(d => d.Document.Id == Project.Instance.Name).FirstOrDefault();
97+
var simulation = response.Documents.FirstOrDefault(d => d.Document.Id == Project.Instance.Name);
9898
simulation.Should().NotBeNull();
9999
simulation.Document.Ingest.Should().NotBeNull();
100100
simulation.Document.Ingest.Timestamp.Should().NotBe(default(DateTime));
101101
var project = simulation.Document.Source.As<Project>();
102102
project.Should().NotBeNull();
103103
project.Name.Should().Be("BUZZ");
104104

105-
simulation = response.Documents.Where(d => d.Document.Id == "otherid").FirstOrDefault();
105+
simulation = response.Documents.FirstOrDefault(d => d.Document.Id == "otherid");
106106
simulation.Should().NotBeNull();
107107
simulation.Document.Ingest.Should().NotBeNull();
108108
simulation.Document.Ingest.Timestamp.Should().NotBe(default(DateTime));
109109
project = simulation.Document.Source.As<Project>();
110110
project.Name.Should().Be("BUZZ");
111111

112-
simulation = response.Documents.Where(d => d.Document.Id == "2").FirstOrDefault();
112+
simulation = response.Documents.FirstOrDefault(d => d.Document.Id == "2");
113113
simulation.Document.Ingest.Should().NotBeNull();
114114
simulation.Document.Ingest.Timestamp.Should().NotBe(default(DateTime));
115115
var anotherType = simulation.Document.Source.As<AnotherType>();

0 commit comments

Comments
 (0)