@@ -177,6 +177,243 @@ func TestDisableInternalRetries(t *testing.T) {
177177 CloseRows (pool .ObjectId , conn .ObjectId , results .ObjectId )
178178}
179179
180+ func TestApply (t * testing.T ) {
181+ t .Parallel ()
182+
183+ dsn , server , teardown := setupTestDBConnection (t )
184+ defer teardown ()
185+
186+ pool := CreatePool (dsn )
187+ defer ClosePool (pool .ObjectId )
188+ conn := CreateConnection (pool .ObjectId )
189+ defer CloseConnection (pool .ObjectId , conn .ObjectId )
190+
191+ mutations := sppb.BatchWriteRequest_MutationGroup {
192+ Mutations : []* sppb.Mutation {
193+ {Operation : & sppb.Mutation_Insert {Insert : & sppb.Mutation_Write {
194+ Table : "foo" ,
195+ Columns : []string {"id" , "value" },
196+ Values : []* structpb.ListValue {
197+ {Values : []* structpb.Value {
198+ {Kind : & structpb.Value_StringValue {StringValue : "1" }},
199+ {Kind : & structpb.Value_StringValue {StringValue : "One" }},
200+ }},
201+ {Values : []* structpb.Value {
202+ {Kind : & structpb.Value_StringValue {StringValue : "2" }},
203+ {Kind : & structpb.Value_StringValue {StringValue : "Two" }},
204+ }},
205+ },
206+ }}},
207+ },
208+ }
209+ mutationsBytes , _ := proto .Marshal (& mutations )
210+ response := Apply (pool .ObjectId , conn .ObjectId , mutationsBytes )
211+ if response .Code != 0 {
212+ t .Fatalf ("failed to apply mutations: %v" , response .Code )
213+ }
214+ commitResponse := sppb.CommitResponse {}
215+ _ = proto .Unmarshal (response .Res , & commitResponse )
216+ if commitResponse .CommitTimestamp == nil {
217+ t .Fatal ("commit timestamp missing" )
218+ }
219+
220+ requests := drainRequestsFromServer (server .TestSpanner )
221+ beginRequests := requestsOfType (requests , reflect .TypeOf (& sppb.BeginTransactionRequest {}))
222+ if g , w := len (beginRequests ), 1 ; g != w {
223+ t .Fatalf ("begin requests count mismatch\n Got: %v\n Want: %v" , g , w )
224+ }
225+ req := beginRequests [0 ].(* sppb.BeginTransactionRequest )
226+ if req .Options == nil {
227+ t .Fatalf ("missing tx opts" )
228+ }
229+ if req .Options .GetReadWrite () == nil {
230+ t .Fatalf ("missing tx read write" )
231+ }
232+ commitRequests := requestsOfType (requests , reflect .TypeOf (& sppb.CommitRequest {}))
233+ if g , w := len (commitRequests ), 1 ; g != w {
234+ t .Fatalf ("commit requests count mismatch\n Got: %v\n Want: %v" , g , w )
235+ }
236+ commitReq := commitRequests [0 ].(* sppb.CommitRequest )
237+ if g , w := len (commitReq .Mutations ), 1 ; g != w {
238+ t .Fatalf ("mutation count mismatch\n Got: %v\n Want: %v" , g , w )
239+ }
240+ if g , w := len (commitReq .Mutations [0 ].GetInsert ().Values ), 2 ; g != w {
241+ t .Fatalf ("mutation values count mismatch\n Got: %v\n Want: %v" , g , w )
242+ }
243+ }
244+
245+ func TestBufferWrite (t * testing.T ) {
246+ t .Parallel ()
247+
248+ dsn , server , teardown := setupTestDBConnection (t )
249+ defer teardown ()
250+
251+ pool := CreatePool (dsn )
252+ defer ClosePool (pool .ObjectId )
253+ conn := CreateConnection (pool .ObjectId )
254+ defer CloseConnection (pool .ObjectId , conn .ObjectId )
255+
256+ txOpts := & sppb.TransactionOptions {}
257+ txOptsBytes , _ := proto .Marshal (txOpts )
258+ tx := BeginTransaction (pool .ObjectId , conn .ObjectId , txOptsBytes )
259+
260+ mutations := sppb.BatchWriteRequest_MutationGroup {
261+ Mutations : []* sppb.Mutation {
262+ {Operation : & sppb.Mutation_Insert {Insert : & sppb.Mutation_Write {
263+ Table : "foo" ,
264+ Columns : []string {"id" , "value" },
265+ Values : []* structpb.ListValue {
266+ {Values : []* structpb.Value {
267+ {Kind : & structpb.Value_StringValue {StringValue : "1" }},
268+ {Kind : & structpb.Value_StringValue {StringValue : "One" }},
269+ }},
270+ {Values : []* structpb.Value {
271+ {Kind : & structpb.Value_StringValue {StringValue : "2" }},
272+ {Kind : & structpb.Value_StringValue {StringValue : "Two" }},
273+ }},
274+ },
275+ }}},
276+ },
277+ }
278+ mutationsBytes , _ := proto .Marshal (& mutations )
279+ response := BufferWrite (pool .ObjectId , conn .ObjectId , tx .ObjectId , mutationsBytes )
280+ if response .Code != 0 {
281+ t .Fatalf ("failed to apply mutations: %v" , response .Code )
282+ }
283+ if response .Length () > 0 {
284+ t .Fatal ("response length mismatch" )
285+ }
286+
287+ requests := drainRequestsFromServer (server .TestSpanner )
288+ beginRequests := requestsOfType (requests , reflect .TypeOf (& sppb.BeginTransactionRequest {}))
289+ if g , w := len (beginRequests ), 1 ; g != w {
290+ t .Fatalf ("begin requests count mismatch\n Got: %v\n Want: %v" , g , w )
291+ }
292+ req := beginRequests [0 ].(* sppb.BeginTransactionRequest )
293+ if req .Options == nil {
294+ t .Fatalf ("missing tx opts" )
295+ }
296+ if req .Options .GetReadWrite () == nil {
297+ t .Fatalf ("missing tx read write" )
298+ }
299+
300+ // There should not be any commit requests yet.
301+ commitRequests := requestsOfType (requests , reflect .TypeOf (& sppb.CommitRequest {}))
302+ if g , w := len (commitRequests ), 0 ; g != w {
303+ t .Fatalf ("commit requests count mismatch\n Got: %v\n Want: %v" , g , w )
304+ }
305+
306+ // Commit the transaction with the mutation.
307+ res := Commit (pool .ObjectId , conn .ObjectId , tx .ObjectId )
308+ if res .Code != 0 {
309+ t .Fatalf ("failed to commit: %v" , res .Code )
310+ }
311+
312+ // Verify that we have a commit request on the server.
313+ requests = drainRequestsFromServer (server .TestSpanner )
314+ commitRequests = requestsOfType (requests , reflect .TypeOf (& sppb.CommitRequest {}))
315+ if g , w := len (commitRequests ), 1 ; g != w {
316+ t .Fatalf ("commit requests count mismatch\n Got: %v\n Want: %v" , g , w )
317+ }
318+ commitReq := commitRequests [0 ].(* sppb.CommitRequest )
319+ if g , w := len (commitReq .Mutations ), 1 ; g != w {
320+ t .Fatalf ("mutation count mismatch\n Got: %v\n Want: %v" , g , w )
321+ }
322+ if g , w := len (commitReq .Mutations [0 ].GetInsert ().Values ), 2 ; g != w {
323+ t .Fatalf ("mutation values count mismatch\n Got: %v\n Want: %v" , g , w )
324+ }
325+
326+ }
327+
328+ func TestBufferWrite_RetryAborted (t * testing.T ) {
329+ t .Parallel ()
330+
331+ dsn , server , teardown := setupTestDBConnection (t )
332+ defer teardown ()
333+
334+ pool := CreatePool (dsn )
335+ defer ClosePool (pool .ObjectId )
336+ conn := CreateConnection (pool .ObjectId )
337+ defer CloseConnection (pool .ObjectId , conn .ObjectId )
338+
339+ txOpts := & sppb.TransactionOptions {}
340+ txOptsBytes , _ := proto .Marshal (txOpts )
341+ tx := BeginTransaction (pool .ObjectId , conn .ObjectId , txOptsBytes )
342+
343+ mutations := sppb.BatchWriteRequest_MutationGroup {
344+ Mutations : []* sppb.Mutation {
345+ {Operation : & sppb.Mutation_Insert {Insert : & sppb.Mutation_Write {
346+ Table : "foo" ,
347+ Columns : []string {"id" , "value" },
348+ Values : []* structpb.ListValue {
349+ {Values : []* structpb.Value {
350+ {Kind : & structpb.Value_StringValue {StringValue : "1" }},
351+ {Kind : & structpb.Value_StringValue {StringValue : "One" }},
352+ }},
353+ {Values : []* structpb.Value {
354+ {Kind : & structpb.Value_StringValue {StringValue : "2" }},
355+ {Kind : & structpb.Value_StringValue {StringValue : "Two" }},
356+ }},
357+ },
358+ }}},
359+ },
360+ }
361+ mutationsBytes , _ := proto .Marshal (& mutations )
362+ response := BufferWrite (pool .ObjectId , conn .ObjectId , tx .ObjectId , mutationsBytes )
363+ if response .Code != 0 {
364+ t .Fatalf ("failed to apply mutations: %v" , response .Code )
365+ }
366+ if response .Length () > 0 {
367+ t .Fatal ("response length mismatch" )
368+ }
369+
370+ requests := drainRequestsFromServer (server .TestSpanner )
371+ beginRequests := requestsOfType (requests , reflect .TypeOf (& sppb.BeginTransactionRequest {}))
372+ if g , w := len (beginRequests ), 1 ; g != w {
373+ t .Fatalf ("begin requests count mismatch\n Got: %v\n Want: %v" , g , w )
374+ }
375+ req := beginRequests [0 ].(* sppb.BeginTransactionRequest )
376+ if req .Options == nil {
377+ t .Fatalf ("missing tx opts" )
378+ }
379+ if req .Options .GetReadWrite () == nil {
380+ t .Fatalf ("missing tx read write" )
381+ }
382+
383+ // There should not be any commit requests yet.
384+ commitRequests := requestsOfType (requests , reflect .TypeOf (& sppb.CommitRequest {}))
385+ if g , w := len (commitRequests ), 0 ; g != w {
386+ t .Fatalf ("commit requests count mismatch\n Got: %v\n Want: %v" , g , w )
387+ }
388+
389+ // Instruct the mock server to abort the transaction.
390+ server .TestSpanner .PutExecutionTime (testutil .MethodCommitTransaction , testutil.SimulatedExecutionTime {
391+ Errors : []error {status .Error (codes .Aborted , "Aborted" )},
392+ })
393+
394+ // Commit the transaction with the mutation.
395+ res := Commit (pool .ObjectId , conn .ObjectId , tx .ObjectId )
396+ if res .Code != 0 {
397+ t .Fatalf ("failed to commit: %v" , res .Code )
398+ }
399+
400+ // Verify that we have a commit request on the server.
401+ requests = drainRequestsFromServer (server .TestSpanner )
402+ commitRequests = requestsOfType (requests , reflect .TypeOf (& sppb.CommitRequest {}))
403+ if g , w := len (commitRequests ), 2 ; g != w {
404+ t .Fatalf ("commit requests count mismatch\n Got: %v\n Want: %v" , g , w )
405+ }
406+ for _ , req := range commitRequests {
407+ commitReq := req .(* sppb.CommitRequest )
408+ if g , w := len (commitReq .Mutations ), 1 ; g != w {
409+ t .Fatalf ("mutation count mismatch\n Got: %v\n Want: %v" , g , w )
410+ }
411+ if g , w := len (commitReq .Mutations [0 ].GetInsert ().Values ), 2 ; g != w {
412+ t .Fatalf ("mutation values count mismatch\n Got: %v\n Want: %v" , g , w )
413+ }
414+ }
415+ }
416+
180417func setupTestDBConnection (t * testing.T ) (dsn string , server * testutil.MockedSpannerInMemTestServer , teardown func ()) {
181418 return setupTestDBConnectionWithParams (t , "" )
182419}
0 commit comments