Skip to content

Commit 46d0f64

Browse files
authored
chore(shard-distributor): extend info to debug assignment conflicts (#7506)
<!-- Describe what has changed in this PR --> **What changed?** Extended the nested transaction with operations in the else branch that read the current state when the comparison fails. <!-- Tell your future self why you have made these changes --> **Why?** We need to understand why a conflict occurs when we assign shards, but etcd does not report which comparison failed, so we need to be able to see where the conflict is generated. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** The unit tests that cover this case reported the mismatch. <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Are there any documentation updates that should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open a PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** Signed-off-by: edigregorio <edigregorio@uber.com>
1 parent 24aa35c commit 46d0f64

File tree

1 file changed

+17
-2
lines changed
  • service/sharddistributor/store/etcd/executorstore

1 file changed

+17
-2
lines changed

service/sharddistributor/store/etcd/executorstore/etcdstore.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,9 @@ func (s *executorStoreImpl) Subscribe(ctx context.Context, namespace string) (<-
348348

349349
func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string, request store.AssignShardsRequest, guard store.GuardFunc) error {
350350
var ops []clientv3.Op
351+
var opsElse []clientv3.Op
351352
var comparisons []clientv3.Cmp
353+
comparisonMaps := make(map[string]int64)
352354

353355
statsUpdates, err := s.prepareShardStatisticsUpdates(ctx, namespace, request.NewState.ShardAssignments)
354356
if err != nil {
@@ -363,10 +365,12 @@ func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string,
363365
// Add a comparison to ensure the executor's assigned state hasn't changed
364366
// This prevents deleting an executor that just received a shard assignment
365367
comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(executorStateKey), "=", expectedModRevision))
368+
comparisonMaps[executorStateKey] = expectedModRevision
366369

367370
// Delete all keys for this executor
368371
executorPrefix := etcdkeys.BuildExecutorIDPrefix(s.prefix, namespace, executorID)
369372
ops = append(ops, clientv3.OpDelete(executorPrefix, clientv3.WithPrefix()))
373+
opsElse = append(opsElse, clientv3.OpGet(executorStateKey))
370374
}
371375

372376
// 2. Prepare operations to update executor states and shard ownership,
@@ -386,6 +390,8 @@ func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string,
386390
ops = append(ops, clientv3.OpPut(executorStateKey, string(compressedValue)))
387391

388392
comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(executorStateKey), "=", state.ModRevision))
393+
comparisonMaps[executorStateKey] = state.ModRevision
394+
opsElse = append(opsElse, clientv3.OpGet(executorStateKey))
389395
}
390396

391397
if len(ops) == 0 {
@@ -405,10 +411,11 @@ func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string,
405411

406412
// 4. Create a nested transaction operation. This allows us to add our own 'If' (comparisons)
407413
// and 'Then' (ops) logic that will only execute if the outer guard's 'If' condition passes.
414+
// The ELSE operations read back the current state of each compared key so we can identify which part of the condition failed.
408415
nestedTxnOp := clientv3.OpTxn(
409416
comparisons, // Our IF conditions
410417
ops, // Our THEN operations
411-
nil, // Our ELSE operations
418+
opsElse, // Our ELSE operations
412419
)
413420

414421
// 5. Add the nested transaction to the guarded transaction's THEN clause and commit.
@@ -429,10 +436,18 @@ func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string,
429436
if len(txnResp.Responses) == 0 {
430437
return fmt.Errorf("unexpected empty response from transaction")
431438
}
439+
432440
nestedResp := txnResp.Responses[0].GetResponseTxn()
433441
if !nestedResp.Succeeded {
434442
// This means our revision checks failed.
435-
return fmt.Errorf("%w: transaction failed, a shard may have been concurrently assigned", store.ErrVersionConflict)
443+
failingRevisionString := ""
444+
for _, keyValue := range nestedResp.Responses[0].GetResponseRange().Kvs {
445+
expectedValue, ok := comparisonMaps[string(keyValue.Key)]
446+
if !ok || expectedValue != keyValue.ModRevision {
447+
failingRevisionString = failingRevisionString + fmt.Sprintf("{ key: %s, expected:%v, actual: %v }", string(keyValue.Key), expectedValue, keyValue.ModRevision)
448+
}
449+
}
450+
return fmt.Errorf("%w: transaction failed, a shard may have been concurrently assigned, %v", store.ErrVersionConflict, failingRevisionString)
436451
}
437452

438453
// Apply shard statistics updates outside the main transaction to stay within etcd's max operations per txn.

0 commit comments

Comments
 (0)