Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions sweepbatcher/sweep_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ type batchConfig struct {
// chainParams are the chain parameters of the chain that is used by
// batches.
chainParams *chaincfg.Params

// immediatePublishThreshold is the cumulative value threshold above
// which a batch should be published immediately without waiting for
// the initial delay grouping period. If 0, the feature is disabled.
immediatePublishThreshold btcutil.Amount
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO we can implement this inside initialDelayProvider. It already has totalSweptAmount argument, so it can have the threshold logic.

}

// rbfCache stores data related to our last fee bump.
Expand Down Expand Up @@ -807,6 +812,24 @@ func (b *batch) sweepExists(outpoint wire.OutPoint) bool {
return ok
}

// totalValue returns the sum of all sweep values in the batch.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a note that the method must run in the batch's event loop, so avoid races.

func (b *batch) totalValue() btcutil.Amount {
var total btcutil.Amount
for _, sweep := range b.sweeps {
total += sweep.value
}
return total
}
Comment on lines +816 to +822
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have variable totalSweptAmt on the call site. We can reuse it instead of adding new method. Or we can keep the method and call it there to find the value of totalSweptAmt.


// exceedsThreshold checks if the batch value exceeds the immediate publish
// threshold. Returns false if the threshold is not configured (0).
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I propose to add the amount as an argument or to add a note that the method must be called from the batch's event loop.

func (b *batch) exceedsThreshold() bool {
if b.cfg.immediatePublishThreshold == 0 {
return false
}
return b.totalValue() >= b.cfg.immediatePublishThreshold
}
Comment on lines +826 to +831

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This function is called to check if the threshold is exceeded, and then totalValue() is called again at the call site for logging. This is inefficient as it iterates over all sweeps twice.

To optimize this, consider refactoring this function to return the total value as well, so it can be reused. This change will require updating the call site in Run.

Suggested change
func (b *batch) exceedsThreshold() bool {
if b.cfg.immediatePublishThreshold == 0 {
return false
}
return b.totalValue() >= b.cfg.immediatePublishThreshold
}
func (b *batch) exceedsThreshold() (btcutil.Amount, bool) {
if b.cfg.immediatePublishThreshold == 0 {
return 0, false
}
total := b.totalValue()
return total, total >= b.cfg.immediatePublishThreshold
}


// Wait waits for the batch to gracefully stop.
func (b *batch) Wait() {
b.Infof("Stopping")
Expand Down Expand Up @@ -920,6 +943,25 @@ func (b *batch) Run(ctx context.Context) error {
skipBefore = &delayStop
skipBeforeUpdated = true
}

// Check if batch exceeds threshold and should publish
// immediately. Only override skipBefore if threshold
// feature is enabled and exceeded.
if b.exceedsThreshold() {
b.Infof("Batch value (%v) exceeds threshold "+
"(%v), publishing immediately",
b.totalValue(),
b.cfg.immediatePublishThreshold)

// Set skipBefore to now to bypass initial
// delay.
now := clock.Now()
skipBefore = &now

// Trigger immediate publish by setting timer to
// fire after batchPublishDelay.
timerChan = clock.TickAfter(b.cfg.batchPublishDelay)
Comment on lines +956 to +963
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplification idea.

I propose to move the whole branch above, under if initialDelay < 0 {
There we already have a couple of situations where we skip waiting. For consistency it is better to have them all in one place. And skipping can also be done by setting initialDelay = 0.

I propose also to add an empty line before delayStop := startTime.Add(initialDelay)

}
Comment on lines +950 to +964

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Following my suggestion to refactor exceedsThreshold, this block should be updated to use the new function signature. This avoids calling totalValue() twice and improves efficiency.

Suggested change
if b.exceedsThreshold() {
b.Infof("Batch value (%v) exceeds threshold "+
"(%v), publishing immediately",
b.totalValue(),
b.cfg.immediatePublishThreshold)
// Set skipBefore to now to bypass initial
// delay.
now := clock.Now()
skipBefore = &now
// Trigger immediate publish by setting timer to
// fire after batchPublishDelay.
timerChan = clock.TickAfter(b.cfg.batchPublishDelay)
}
if totalVal, exceeded := b.exceedsThreshold(); exceeded {
b.Infof("Batch value (%v) exceeds threshold "+
"(%v), publishing immediately",
totalVal,
b.cfg.immediatePublishThreshold)
// Set skipBefore to now to bypass initial
// delay.
now := clock.Now()
skipBefore = &now
// Trigger immediate publish by setting timer to
// fire after batchPublishDelay.
timerChan = clock.TickAfter(b.cfg.batchPublishDelay)
}

}

// Create new timer only if the value of skipBefore was updated.
Expand Down
82 changes: 51 additions & 31 deletions sweepbatcher/sweep_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,11 @@ type Batcher struct {
// skippedTxns is the list of previous transactions to ignore when
// loading the sweeps from DB. This is needed to fix a historical bug.
skippedTxns map[chainhash.Hash]struct{}

// immediatePublishThreshold is the cumulative value threshold above
// which a batch should be published immediately without waiting for
// the initial delay grouping period. If 0, the feature is disabled.
immediatePublishThreshold btcutil.Amount
}

// BatcherConfig holds batcher configuration.
Expand Down Expand Up @@ -491,6 +496,11 @@ type BatcherConfig struct {
// skippedTxns is the list of previous transactions to ignore when
// loading the sweeps from DB. This is needed to fix a historical bug.
skippedTxns map[chainhash.Hash]struct{}

// immediatePublishThreshold is the cumulative value threshold above
// which a batch should be published immediately without waiting for
// the initial delay grouping period. If 0, the feature is disabled.
immediatePublishThreshold btcutil.Amount
}

// BatcherOption configures batcher behaviour.
Expand Down Expand Up @@ -581,6 +591,14 @@ func WithSkippedTxns(skippedTxns map[chainhash.Hash]struct{}) BatcherOption {
}
}

// WithImmediatePublishThreshold sets the threshold above which batches are
// published immediately without waiting for the initial delay grouping period.
func WithImmediatePublishThreshold(threshold btcutil.Amount) BatcherOption {
return func(cfg *BatcherConfig) {
cfg.immediatePublishThreshold = threshold
}
}

// NewBatcher creates a new Batcher instance.
func NewBatcher(wallet lndclient.WalletKitClient,
chainNotifier lndclient.ChainNotifierClient,
Expand Down Expand Up @@ -626,29 +644,30 @@ func NewBatcher(wallet lndclient.WalletKitClient,
}

return &Batcher{
batches: make(map[int32]*batch),
addSweepsChan: make(chan *addSweepsRequest),
testReqs: make(chan *testRequest),
errChan: make(chan error, 1),
quit: make(chan struct{}),
initDone: make(chan struct{}),
wallet: wallet,
chainNotifier: chainNotifier,
signerClient: signerClient,
musig2ServerSign: musig2ServerSigner,
VerifySchnorrSig: verifySchnorrSig,
chainParams: chainparams,
store: store,
sweepStore: sweepStore,
clock: cfg.clock,
initialDelayProvider: cfg.initialDelayProvider,
publishDelay: cfg.publishDelay,
customFeeRate: cfg.customFeeRate,
txLabeler: cfg.txLabeler,
customMuSig2Signer: cfg.customMuSig2Signer,
publishErrorHandler: cfg.publishErrorHandler,
presignedHelper: cfg.presignedHelper,
skippedTxns: cfg.skippedTxns,
batches: make(map[int32]*batch),
addSweepsChan: make(chan *addSweepsRequest),
testReqs: make(chan *testRequest),
errChan: make(chan error, 1),
quit: make(chan struct{}),
initDone: make(chan struct{}),
wallet: wallet,
chainNotifier: chainNotifier,
signerClient: signerClient,
musig2ServerSign: musig2ServerSigner,
VerifySchnorrSig: verifySchnorrSig,
chainParams: chainparams,
store: store,
sweepStore: sweepStore,
clock: cfg.clock,
initialDelayProvider: cfg.initialDelayProvider,
publishDelay: cfg.publishDelay,
customFeeRate: cfg.customFeeRate,
txLabeler: cfg.txLabeler,
customMuSig2Signer: cfg.customMuSig2Signer,
publishErrorHandler: cfg.publishErrorHandler,
presignedHelper: cfg.presignedHelper,
skippedTxns: cfg.skippedTxns,
immediatePublishThreshold: cfg.immediatePublishThreshold,
}
}

Expand Down Expand Up @@ -1683,14 +1702,15 @@ func minimumSweepFeeRate(ctx context.Context, customFeeRate FeeRateProvider,
// newBatchConfig creates new batch config.
func (b *Batcher) newBatchConfig(maxTimeoutDistance int32) batchConfig {
return batchConfig{
maxTimeoutDistance: maxTimeoutDistance,
customFeeRate: b.customFeeRate,
txLabeler: b.txLabeler,
customMuSig2Signer: b.customMuSig2Signer,
presignedHelper: b.presignedHelper,
skippedTxns: b.skippedTxns,
clock: b.clock,
chainParams: b.chainParams,
maxTimeoutDistance: maxTimeoutDistance,
customFeeRate: b.customFeeRate,
txLabeler: b.txLabeler,
customMuSig2Signer: b.customMuSig2Signer,
presignedHelper: b.presignedHelper,
skippedTxns: b.skippedTxns,
clock: b.clock,
chainParams: b.chainParams,
immediatePublishThreshold: b.immediatePublishThreshold,
}
}

Expand Down
Loading
Loading