Skip to content

Commit 11e1d88

Browse files
authored
chore: add WriteMutationsAsync to .NET wrapper (#567)
Add an async version of the WriteMutations method to the .NET wrapper.
1 parent 6f8c1e2 commit 11e1d88

File tree

5 files changed

+103
-2
lines changed

5 files changed

+103
-2
lines changed

spannerlib/wrappers/spannerlib-dotnet/spannerlib-dotnet-grpc-impl/GrpcLibSpanner.cs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,25 @@ public async Task CloseConnectionAsync(Connection connection, CancellationToken
172172
}));
173173
return response.CommitTimestamp == null ? null : response;
174174
}
175-
175+
176+
public async Task<CommitResponse?> WriteMutationsAsync(Connection connection,
177+
BatchWriteRequest.Types.MutationGroup mutations, CancellationToken cancellationToken = default)
178+
{
179+
try
180+
{
181+
var response = await _client.WriteMutationsAsync(new WriteMutationsRequest
182+
{
183+
Connection = ToProto(connection),
184+
Mutations = mutations,
185+
}, cancellationToken: cancellationToken);
186+
return response.CommitTimestamp == null ? null : response;
187+
}
188+
catch (RpcException exception)
189+
{
190+
throw SpannerException.ToSpannerException(exception);
191+
}
192+
}
193+
176194
public Rows Execute(Connection connection, ExecuteSqlRequest statement)
177195
{
178196
if (_useStreamingRows)

spannerlib/wrappers/spannerlib-dotnet/spannerlib-dotnet-native-impl/SharedLibSpanner.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,12 @@ public void CloseConnection(Connection connection)
104104
}
105105
return CommitResponse.Parser.ParseFrom(handler.Value());
106106
}
107+
108+
public Task<CommitResponse?> WriteMutationsAsync(Connection connection,
109+
BatchWriteRequest.Types.MutationGroup mutations, CancellationToken cancellationToken = default)
110+
{
111+
return Task.Run(() => WriteMutations(connection, mutations), cancellationToken);
112+
}
107113

108114
public Rows Execute(Connection connection, ExecuteSqlRequest statement)
109115
{

spannerlib/wrappers/spannerlib-dotnet/spannerlib-dotnet-tests/ConnectionTests.cs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,50 @@ public void TestWriteMutations([Values] LibType libType)
8888
Assert.That(commit.Mutations[1].InsertOrUpdate.Values.Count, Is.EqualTo(1));
8989
}
9090

91+
[Test]
92+
public async Task TestWriteMutationsAsync([Values] LibType libType)
93+
{
94+
await using var pool = Pool.Create(SpannerLibDictionary[libType], ConnectionString);
95+
await using var connection = pool.CreateConnection();
96+
var insertMutation = new Mutation
97+
{
98+
Insert = new Mutation.Types.Write
99+
{
100+
Table = "my_table",
101+
Columns = { new[] { "id", "value" } },
102+
}
103+
};
104+
insertMutation.Insert.Values.AddRange([
105+
new ListValue{Values = { Value.ForString("1"), Value.ForString("One") }},
106+
new ListValue{Values = { Value.ForString("2"), Value.ForString("Two") }}
107+
]);
108+
var insertOrUpdateMutation = new Mutation
109+
{
110+
InsertOrUpdate = new Mutation.Types.Write
111+
{
112+
Table = "my_table",
113+
Columns = { new[] { "id", "value" } },
114+
}
115+
};
116+
insertOrUpdateMutation.InsertOrUpdate.Values.AddRange([
117+
new ListValue{Values = { Value.ForString("0"), Value.ForString("Zero") }}
118+
]);
119+
120+
var response = await connection.WriteMutationsAsync(new BatchWriteRequest.Types.MutationGroup
121+
{
122+
Mutations = { new []{insertMutation, insertOrUpdateMutation}}
123+
});
124+
Assert.That(response, Is.Not.Null);
125+
Assert.That(response.CommitTimestamp, Is.Not.Null);
126+
Assert.That(Fixture.SpannerMock.Requests.OfType<BeginTransactionRequest>().Count(), Is.EqualTo(1));
127+
Assert.That(Fixture.SpannerMock.Requests.OfType<CommitRequest>().Count(), Is.EqualTo(1));
128+
var commit = Fixture.SpannerMock.Requests.OfType<CommitRequest>().Single();
129+
Assert.That(commit, Is.Not.Null);
130+
Assert.That(commit.Mutations.Count, Is.EqualTo(2));
131+
Assert.That(commit.Mutations[0].Insert.Values.Count, Is.EqualTo(2));
132+
Assert.That(commit.Mutations[1].InsertOrUpdate.Values.Count, Is.EqualTo(1));
133+
}
134+
91135
[Test]
92136
public void TestWriteMutationsInTransaction([Values] LibType libType)
93137
{

spannerlib/wrappers/spannerlib-dotnet/spannerlib-dotnet/Connection.cs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,25 @@ public Task RollbackAsync(CancellationToken cancellationToken = default)
8888
{
8989
return Spanner.WriteMutations(this, mutations);
9090
}
91-
91+
92+
/// <summary>
93+
/// Writes the given list of mutations to Spanner. If the connection has an active read/write transaction, then the
94+
/// mutations will be buffered in the current transaction and sent to Spanner when the transaction is committed.
95+
/// If the connection does not have a transaction, then the mutations are sent to Spanner directly in a new
96+
/// read/write transaction.
97+
/// </summary>
98+
/// <param name="mutations">The mutations to write to Spanner</param>
99+
/// <param name="cancellationToken">The cancellation token</param>
100+
/// <returns>
101+
/// The CommitResponse that is returned by Spanner, or null if the mutations were only buffered in the current
102+
/// transaction.
103+
/// </returns>
104+
public Task<CommitResponse?> WriteMutationsAsync(BatchWriteRequest.Types.MutationGroup mutations,
105+
CancellationToken cancellationToken = default)
106+
{
107+
return Spanner.WriteMutationsAsync(this, mutations, cancellationToken);
108+
}
109+
92110
/// <summary>
93111
/// Executes any type of SQL statement on this connection. The SQL statement will use the current transaction of the
94112
/// connection. The contents of the returned Rows object depends on the type of SQL statement.

spannerlib/wrappers/spannerlib-dotnet/spannerlib-dotnet/ISpannerLib.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,21 @@ public enum RowEncoding
9191
/// </returns>
9292
public CommitResponse? WriteMutations(Connection connection, BatchWriteRequest.Types.MutationGroup mutations);
9393

94+
/// <summary>
95+
/// Writes an array of mutations to Spanner. The mutations are buffered in the current transaction of the given
96+
/// connection (if any). Otherwise, the mutations are written directly to Spanner using a new read/write
97+
/// transaction and the CommitResponse of that transaction is returned. The returned value is null if the mutations
98+
/// were only buffered in an active transaction.
99+
/// </summary>
100+
/// <param name="connection">The connection to use to write the mutations</param>
101+
/// <param name="mutations">The mutations to write</param>
102+
/// <param name="cancellationToken">The cancellation token</param>
103+
/// <returns>
104+
/// The CommitResponse of the read/write transaction that was created to write the mutations, or null if the
105+
/// mutations were buffered in an active transaction on the connection.
106+
/// </returns>
107+
public Task<CommitResponse?> WriteMutationsAsync(Connection connection, BatchWriteRequest.Types.MutationGroup mutations, CancellationToken cancellationToken = default);
108+
94109
/// <summary>
95110
/// Executes a SQL statement of any type on the given connection.
96111
/// </summary>

0 commit comments

Comments
 (0)