Skip to content

Commit adcad57

Browse files
codebrainrusscam
authored andcommitted
Add if_seq_no and if_primary_term to bulk updates (#4015)
(cherry picked from commit d822c33)
1 parent 607ccdb commit adcad57

File tree

4 files changed

+50
-4
lines changed

4 files changed

+50
-4
lines changed

src/Nest/Document/Multiple/Bulk/BulkOperation/BulkIndex.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ public interface IBulkIndexOperation<T> : IBulkOperation
1313

1414
[JsonProperty("pipeline")]
1515
string Pipeline { get; set; }
16+
17+
[JsonProperty("if_seq_no")]
18+
long? IfSequenceNumber { get; set; }
19+
20+
[JsonProperty("if_primary_term")]
21+
long? IfPrimaryTerm { get; set; }
1622
}
1723

1824
public class BulkIndexOperation<T> : BulkOperationBase, IBulkIndexOperation<T>
@@ -25,6 +31,10 @@ public class BulkIndexOperation<T> : BulkOperationBase, IBulkIndexOperation<T>
2531
public string Percolate { get; set; }
2632

2733
public string Pipeline { get; set; }
34+
35+
public long? IfSequenceNumber { get; set; }
36+
37+
public long? IfPrimaryTerm { get; set; }
2838

2939
protected override Type ClrType => typeof(T);
3040

@@ -46,6 +56,8 @@ public class BulkIndexDescriptor<T> : BulkOperationDescriptorBase<BulkIndexDescr
4656
T IBulkIndexOperation<T>.Document { get; set; }
4757
string IBulkIndexOperation<T>.Percolate { get; set; }
4858
string IBulkIndexOperation<T>.Pipeline { get; set; }
59+
long? IBulkIndexOperation<T>.IfSequenceNumber { get; set; }
60+
long? IBulkIndexOperation<T>.IfPrimaryTerm { get; set; }
4961

5062
protected override object GetBulkOperationBody() => Self.Document;
5163

@@ -64,5 +76,9 @@ public class BulkIndexDescriptor<T> : BulkOperationDescriptorBase<BulkIndexDescr
6476
public BulkIndexDescriptor<T> Pipeline(string pipeline) => Assign(pipeline, (a, v) => a.Pipeline = v);
6577

6678
public BulkIndexDescriptor<T> Percolate(string percolate) => Assign(percolate, (a, v) => a.Percolate = v);
79+
80+
public BulkIndexDescriptor<T> IfSequenceNumber(long? seqNo) => Assign(seqNo, (a, v) => a.IfSequenceNumber = v);
81+
82+
public BulkIndexDescriptor<T> IfPrimaryTerm(long? primaryTerm) => Assign(primaryTerm, (a, v) => a.IfSequenceNumber = v);
6783
}
6884
}

src/Nest/Document/Multiple/Bulk/BulkOperation/BulkOperationBase.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ public abstract class BulkOperationDescriptorBase<TDescriptor, TInterface>
6262
/// <summary>
6363
/// Only used for bulk update operations but in the future might come in handy for other complex bulk ops.
6464
/// </summary>
65-
/// <returns></returns>
6665
object IBulkOperation.GetBody() => GetBulkOperationBody();
6766

6867
Id IBulkOperation.GetIdForOperation(Inferrer inferrer) => GetIdForOperation(inferrer);

src/Nest/Document/Multiple/Bulk/BulkOperation/BulkUpdate.cs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ public interface IBulkUpdateOperation<TDocument, TPartialDocument> : IBulkOperat
4040
/// A document to upsert when the specified document to be updated is not found
4141
/// </summary>
4242
TDocument Upsert { get; set; }
43+
44+
long? IfSequenceNumber { get; set; }
45+
46+
long? IfPrimaryTerm { get; set; }
4347
}
4448

4549
public class BulkUpdateOperation<TDocument, TPartialDocument> : BulkOperationBase, IBulkUpdateOperation<TDocument, TPartialDocument>
@@ -108,6 +112,10 @@ public BulkUpdateOperation(TDocument idFrom, TPartialDocument update, bool useId
108112
/// </summary>
109113
public TDocument Upsert { get; set; }
110114

115+
public long? IfSequenceNumber { get; set; }
116+
117+
public long? IfPrimaryTerm { get; set; }
118+
111119
protected override Type ClrType => typeof(TDocument);
112120

113121
protected override string Operation => "update";
@@ -125,7 +133,9 @@ protected override object GetBody() =>
125133
_Script = Script,
126134
_Upsert = Upsert,
127135
_DocAsUpsert = DocAsUpsert,
128-
_ScriptedUpsert = ScriptedUpsert
136+
_ScriptedUpsert = ScriptedUpsert,
137+
IfPrimaryTerm = IfPrimaryTerm,
138+
IfSequenceNumber = IfSequenceNumber
129139
};
130140
}
131141

@@ -139,11 +149,12 @@ public class BulkUpdateDescriptor<TDocument, TPartialDocument>
139149
protected override string BulkOperationType => "update";
140150
TPartialDocument IBulkUpdateOperation<TDocument, TPartialDocument>.Doc { get; set; }
141151
bool? IBulkUpdateOperation<TDocument, TPartialDocument>.DocAsUpsert { get; set; }
142-
143152
TDocument IBulkUpdateOperation<TDocument, TPartialDocument>.IdFrom { get; set; }
144153
IScript IBulkUpdateOperation<TDocument, TPartialDocument>.Script { get; set; }
145154
bool? IBulkUpdateOperation<TDocument, TPartialDocument>.ScriptedUpsert { get; set; }
146155
TDocument IBulkUpdateOperation<TDocument, TPartialDocument>.Upsert { get; set; }
156+
long? IBulkUpdateOperation<TDocument, TPartialDocument>.IfSequenceNumber { get; set; }
157+
long? IBulkUpdateOperation<TDocument, TPartialDocument>.IfPrimaryTerm { get; set; }
147158

148159
protected override object GetBulkOperationBody() =>
149160
new BulkUpdateBody<TDocument, TPartialDocument>
@@ -152,7 +163,9 @@ protected override object GetBulkOperationBody() =>
152163
_Script = Self.Script,
153164
_Upsert = Self.Upsert,
154165
_DocAsUpsert = Self.DocAsUpsert,
155-
_ScriptedUpsert = Self.ScriptedUpsert
166+
_ScriptedUpsert = Self.ScriptedUpsert,
167+
IfPrimaryTerm = Self.IfPrimaryTerm,
168+
IfSequenceNumber = Self.IfSequenceNumber
156169
};
157170

158171
protected override Id GetIdForOperation(Inferrer inferrer) =>
@@ -209,5 +222,17 @@ public BulkUpdateDescriptor<TDocument, TPartialDocument> Script(Func<ScriptDescr
209222
/// </summary>
210223
public BulkUpdateDescriptor<TDocument, TPartialDocument> RetriesOnConflict(int? retriesOnConflict) =>
211224
Assign(retriesOnConflict, (a, v) => a.RetriesOnConflict = v);
225+
226+
/// <summary>
227+
/// Operations can be made conditional and only be performed if the last modification to the document was assigned the sequence number.
228+
/// </summary>
229+
public BulkUpdateDescriptor<TDocument, TPartialDocument> IfSequenceNumber(long? seqNo) =>
230+
Assign(seqNo, (a, v) => a.IfSequenceNumber = v);
231+
232+
/// <summary>
233+
/// Operations can be made conditional and only be performed if the last modification to the document was assigned the primary term.
234+
/// </summary>
235+
public BulkUpdateDescriptor<TDocument, TPartialDocument> IfPrimaryTerm(long? primaryTerm) =>
236+
Assign(primaryTerm, (a, v) => a.IfSequenceNumber = v);
212237
}
213238
}

src/Nest/Document/Multiple/Bulk/BulkOperation/BulkUpdateBody.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,14 @@ internal class BulkUpdateBody<TDocument, TPartialUpdate>
1919
[JsonProperty("scripted_upsert")]
2020
internal bool? _ScriptedUpsert { get; set; }
2121

22+
[JsonProperty("if_seq_no")]
23+
internal long? IfSequenceNumber { get; set; }
24+
2225
[JsonProperty("upsert")]
2326
[JsonConverter(typeof(CollapsedSourceConverter))]
2427
internal TDocument _Upsert { get; set; }
28+
29+
[JsonProperty("if_primary_term")]
30+
internal long? IfPrimaryTerm { get; set; }
2531
}
2632
}

0 commit comments

Comments
 (0)