@@ -83,6 +83,8 @@ type kv struct {
8383 sequential bool
8484 zipfian bool
8585 sfuDelay time.Duration
86+ longRunningTxn bool
87+ longRunningTxnNumWrites int
8688 splits int
8789 scatter bool
8890 secondaryIndex bool
@@ -119,18 +121,20 @@ var kvMeta = workload.Meta{
119121 g := & kv {}
120122 g .flags .FlagSet = pflag .NewFlagSet (`kv` , pflag .ContinueOnError )
121123 g .flags .Meta = map [string ]workload.FlagMeta {
122- `batch` : {RuntimeOnly : true },
123- `sfu-wait-delay` : {RuntimeOnly : true },
124- `sfu-writes` : {RuntimeOnly : true },
125- `read-percent` : {RuntimeOnly : true },
126- `span-percent` : {RuntimeOnly : true },
127- `span-limit` : {RuntimeOnly : true },
128- `del-percent` : {RuntimeOnly : true },
129- `splits` : {RuntimeOnly : true },
130- `scatter` : {RuntimeOnly : true },
131- `timeout` : {RuntimeOnly : true },
132- `prepare-read-only` : {RuntimeOnly : true },
133- `sel1-writes` : {RuntimeOnly : true },
124+ `batch` : {RuntimeOnly : true },
125+ `sfu-wait-delay` : {RuntimeOnly : true },
126+ `sfu-writes` : {RuntimeOnly : true },
127+ `long-running-txn` : {RuntimeOnly : true },
128+ `long-running-txn-num-writes` : {RuntimeOnly : true },
129+ `read-percent` : {RuntimeOnly : true },
130+ `span-percent` : {RuntimeOnly : true },
131+ `span-limit` : {RuntimeOnly : true },
132+ `del-percent` : {RuntimeOnly : true },
133+ `splits` : {RuntimeOnly : true },
134+ `scatter` : {RuntimeOnly : true },
135+ `timeout` : {RuntimeOnly : true },
136+ `prepare-read-only` : {RuntimeOnly : true },
137+ `sel1-writes` : {RuntimeOnly : true },
134138 }
135139 g .flags .IntVar (& g .batchSize , `batch` , 1 ,
136140 `Number of blocks to read/insert in a single SQL statement.` )
@@ -187,6 +191,13 @@ var kvMeta = workload.Meta{
187191 g .flags .BoolVar (& g .prepareReadOnly , `prepare-read-only` , false , `Prepare and perform only read statements.` )
188192 g .flags .BoolVar (& g .writesUseSelect1 , `sel1-writes` , false ,
189193 `Use SELECT 1 as the first statement of transactional writes with a sleep after SELECT 1.` )
194+ g .flags .BoolVar (& g .longRunningTxn , `long-running-txn` , false ,
195+ `Use a long-running transaction for running lock contention scenarios. If run with ` +
196+ `--sfu-writes or --sel1-writes, it will use those writes in the long-running transaction; ` +
197+ `otherwise, it will use regular writes. Each long-running write transaction counts for a` +
198+ `single write, as measured by --read-percent.` )
199+ g .flags .IntVar (& g .longRunningTxnNumWrites , `long-running-txn-num-writes` , 10 ,
200+ `Number of writes in the long-running transaction when using --long-running-txn.` )
190201 g .connFlags = workload .NewConnFlags (& g .flags )
191202 return g
192203 },
@@ -670,23 +681,26 @@ func (o *kvOp) run(ctx context.Context) (retErr error) {
670681 o .hists .Get (`span` ).Record (elapsed )
671682 return err
672683 }
673- const argCount = 2
674- writeArgs := make ([]interface {}, argCount * o .config .batchSize )
675- var sfuArgs []interface {}
676- if o .config .writesUseSelectForUpdate {
677- sfuArgs = make ([]interface {}, o .config .batchSize )
678- }
679- for i := 0 ; i < o .config .batchSize ; i ++ {
680- j := i * argCount
681- writeArgs [j + 0 ] = o .t .getKey (o .g .writeKey ())
682- if sfuArgs != nil {
683- sfuArgs [i ] = writeArgs [j ]
684+ makeWriteBatchArgs := func () ([]interface {}, []interface {}) {
685+ const argCount = 2
686+ writeArgs := make ([]interface {}, argCount * o .config .batchSize )
687+ var sfuArgs []interface {}
688+ if o .config .writesUseSelectForUpdate {
689+ sfuArgs = make ([]interface {}, o .config .batchSize )
690+ }
691+ for i := 0 ; i < o .config .batchSize ; i ++ {
692+ j := i * argCount
693+ writeArgs [j + 0 ] = o .t .getKey (o .g .writeKey ())
694+ if sfuArgs != nil {
695+ sfuArgs [i ] = writeArgs [j ]
696+ }
697+ writeArgs [j + 1 ] = o .config .randBlock (o .g .rand ())
684698 }
685- writeArgs [ j + 1 ] = o . config . randBlock ( o . g . rand ())
699+ return writeArgs , sfuArgs
686700 }
687701 start := timeutil .Now ()
688702 var err error
689- if o .config .writesUseSelect1 || o .config .writesUseSelectForUpdate {
703+ if o .config .writesUseSelect1 || o .config .writesUseSelectForUpdate || o . config . longRunningTxn {
690704 // We could use crdb.ExecuteTx, but we avoid retries in this workload so
691705 // that each run call makes 1 attempt, so that rate limiting in workerRun
692706 // behaves as expected.
@@ -695,44 +709,52 @@ func (o *kvOp) run(ctx context.Context) (retErr error) {
695709 if err != nil {
696710 return err
697711 }
698-
699712 defer func () {
700713 rollbackErr := tx .Rollback (ctx )
701714 if ! errors .Is (rollbackErr , pgx .ErrTxClosed ) {
702715 retErr = errors .CombineErrors (retErr , rollbackErr )
703716 }
704717 }()
705- if o .config .writesUseSelect1 {
706- rows , err := o .sel1Stmt .QueryTx (ctx , tx )
707- if err != nil {
708- return err
709- }
710- rows .Close ()
711- if err = rows .Err (); err != nil {
712- return err
713- }
718+ iterations := 1
719+ if o .config .longRunningTxn {
720+ iterations = o .config .longRunningTxnNumWrites
714721 }
715- if o .config .writesUseSelectForUpdate {
716- rows , err := o .sfuStmt .QueryTx (ctx , tx , sfuArgs ... )
717- if err != nil {
718- return err
722+ for i := 0 ; i < iterations ; i ++ {
723+ writeArgs , sfuArgs := makeWriteBatchArgs ()
724+ if o .config .writesUseSelect1 {
725+ rows , err := o .sel1Stmt .QueryTx (ctx , tx )
726+ if err != nil {
727+ return err
728+ }
729+ rows .Close ()
730+ if err = rows .Err (); err != nil {
731+ return err
732+ }
733+ }
734+ if o .config .writesUseSelectForUpdate {
735+ rows , err := o .sfuStmt .QueryTx (ctx , tx , sfuArgs ... )
736+ if err != nil {
737+ return err
738+ }
739+ rows .Close ()
740+ if err = rows .Err (); err != nil {
741+ // The transaction may have experienced an error in the meantime.
742+ return o .tryHandleWriteErr ("write-write-err" , start , err )
743+ }
719744 }
720- rows .Close ()
721- if err = rows .Err (); err != nil {
722- return err
745+ // Simulate a transaction that does other work between the sel1 / SFU and write.
746+ time .Sleep (o .config .sfuDelay )
747+ if _ , err = o .writeStmt .ExecTx (ctx , tx , writeArgs ... ); err != nil {
748+ // Multiple write transactions can contend and encounter
749+ // a serialization failure. We swallow such an error.
750+ return o .tryHandleWriteErr ("write-write-err" , start , err )
723751 }
724752 }
725- // Simulate a transaction that does other work between the sel1 / SFU and write.
726- time .Sleep (o .config .sfuDelay )
727- if _ , err = o .writeStmt .ExecTx (ctx , tx , writeArgs ... ); err != nil {
728- // Multiple write transactions can contend and encounter
729- // a serialization failure. We swallow such an error.
730- return o .tryHandleWriteErr ("write-write-err" , start , err )
731- }
732753 if err = tx .Commit (ctx ); err != nil {
733754 return o .tryHandleWriteErr ("write-commit-err" , start , err )
734755 }
735756 } else {
757+ writeArgs , _ := makeWriteBatchArgs ()
736758 _ , err = o .writeStmt .Exec (ctx , writeArgs ... )
737759 }
738760 if err != nil {
0 commit comments