Skip to content

Commit dc0af28

Browse files
committed
Add if_seq_no and if_primary_term to bulk updates (#4015)
1 parent e58db5b commit dc0af28

File tree

5 files changed

+37
-4
lines changed

5 files changed

+37
-4
lines changed

src/Nest/Document/Multiple/Bulk/BulkOperation/BulkIndex.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public class BulkIndexDescriptor<T> : BulkOperationDescriptorBase<BulkIndexDescr
6060
string IBulkIndexOperation<T>.Percolate { get; set; }
6161
string IBulkIndexOperation<T>.Pipeline { get; set; }
6262
long? IBulkIndexOperation<T>.IfSequenceNumber { get; set; }
63+
6364
long? IBulkIndexOperation<T>.IfPrimaryTerm { get; set; }
6465

6566
protected override object GetBulkOperationBody() => Self.Document;

src/Nest/Document/Multiple/Bulk/BulkOperation/BulkOperationBase.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ public abstract class BulkOperationDescriptorBase<TDescriptor, TInterface>
5454
/// <summary>
5555
/// Only used for bulk update operations but in the future might come in handy for other complex bulk ops.
5656
/// </summary>
57-
/// <returns></returns>
5857
object IBulkOperation.GetBody() => GetBulkOperationBody();
5958

6059
Id IBulkOperation.GetIdForOperation(Inferrer inferrer) => GetIdForOperation(inferrer);

src/Nest/Document/Multiple/Bulk/BulkOperation/BulkOperationsCollection.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ public TOperation this[int index]
6262

6363
bool ICollection.IsSynchronized => true;
6464

65-
6665
object IList.this[int index]
6766
{
6867
get => this[index];

src/Nest/Document/Multiple/Bulk/BulkOperation/BulkUpdate.cs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ public interface IBulkUpdateOperation<TDocument, TPartialDocument> : IBulkOperat
4343
/// A document to upsert when the specified document to be updated is not found
4444
/// </summary>
4545
TDocument Upsert { get; set; }
46+
47+
long? IfSequenceNumber { get; set; }
48+
49+
long? IfPrimaryTerm { get; set; }
4650
}
4751

4852
[DataContract]
@@ -112,6 +116,10 @@ public BulkUpdateOperation(TDocument idFrom, TPartialDocument update, bool useId
112116
/// </summary>
113117
public TDocument Upsert { get; set; }
114118

119+
public long? IfSequenceNumber { get; set; }
120+
121+
public long? IfPrimaryTerm { get; set; }
122+
115123
protected override Type ClrType => typeof(TDocument);
116124

117125
protected override string Operation => "update";
@@ -140,7 +148,9 @@ protected override object GetBody() =>
140148
Script = Script,
141149
Upsert = Upsert,
142150
DocAsUpsert = DocAsUpsert,
143-
ScriptedUpsert = ScriptedUpsert
151+
ScriptedUpsert = ScriptedUpsert,
152+
IfPrimaryTerm = IfPrimaryTerm,
153+
IfSequenceNumber = IfSequenceNumber
144154
};
145155
}
146156

@@ -161,14 +171,20 @@ public class BulkUpdateDescriptor<TDocument, TPartialDocument>
161171
bool? IBulkUpdateOperation<TDocument, TPartialDocument>.ScriptedUpsert { get; set; }
162172
TDocument IBulkUpdateOperation<TDocument, TPartialDocument>.Upsert { get; set; }
163173

174+
long? IBulkUpdateOperation<TDocument, TPartialDocument>.IfSequenceNumber { get; set; }
175+
176+
long? IBulkUpdateOperation<TDocument, TPartialDocument>.IfPrimaryTerm { get; set; }
177+
164178
protected override object GetBulkOperationBody() =>
165179
new BulkUpdateBody<TDocument, TPartialDocument>
166180
{
167181
PartialUpdate = Self.Doc,
168182
Script = Self.Script,
169183
Upsert = Self.Upsert,
170184
DocAsUpsert = Self.DocAsUpsert,
171-
ScriptedUpsert = Self.ScriptedUpsert
185+
ScriptedUpsert = Self.ScriptedUpsert,
186+
IfPrimaryTerm = Self.IfPrimaryTerm,
187+
IfSequenceNumber = Self.IfSequenceNumber
172188
};
173189

174190
protected override Id GetIdForOperation(Inferrer inferrer) =>
@@ -236,5 +252,17 @@ public BulkUpdateDescriptor<TDocument, TPartialDocument> Script(Func<ScriptDescr
236252
/// </summary>
237253
public BulkUpdateDescriptor<TDocument, TPartialDocument> RetriesOnConflict(int? retriesOnConflict) =>
238254
Assign(retriesOnConflict, (a, v) => a.RetriesOnConflict = v);
255+
256+
/// <summary>
257+
/// Operations can be made conditional and only be performed if the last modification to the document was assigned the sequence number.
258+
/// </summary>
259+
public BulkUpdateDescriptor<TDocument, TPartialDocument> IfSequenceNumber(long? seqNo) =>
260+
Assign(seqNo, (a, v) => a.IfSequenceNumber = v);
261+
262+
/// <summary>
263+
/// Operations can be made conditional and only be performed if the last modification to the document was assigned the primary term.
264+
/// </summary>
265+
public BulkUpdateDescriptor<TDocument, TPartialDocument> IfPrimaryTerm(long? primaryTerm) =>
266+
Assign(primaryTerm, (a, v) => a.IfSequenceNumber = v);
239267
}
240268
}

src/Nest/Document/Multiple/Bulk/BulkOperation/BulkUpdateBody.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,11 @@ internal class BulkUpdateBody<TDocument, TPartialUpdate>
2424
[DataMember(Name ="upsert")]
2525
[JsonFormatter(typeof(CollapsedSourceFormatter<>))]
2626
internal TDocument Upsert { get; set; }
27+
28+
[DataMember(Name = "if_seq_no")]
29+
internal long? IfSequenceNumber { get; set; }
30+
31+
[DataMember(Name = "if_primary_term")]
32+
internal long? IfPrimaryTerm { get; set; }
2733
}
2834
}

0 commit comments

Comments
 (0)