From fe2566ca1d4ba8a8d9cd68c5dc8175e429b31b71 Mon Sep 17 00:00:00 2001 From: Slyghtning Date: Tue, 2 Dec 2025 21:08:39 +0100 Subject: [PATCH] sweepbatcher: threshold for immediate batch publishing --- sweepbatcher/sweep_batch.go | 42 ++++++ sweepbatcher/sweep_batcher.go | 82 ++++++---- sweepbatcher/sweep_batcher_test.go | 232 +++++++++++++++++++++++++++++ 3 files changed, 325 insertions(+), 31 deletions(-) diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 58cdf19dc..037498c91 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -200,6 +200,11 @@ type batchConfig struct { // chainParams are the chain parameters of the chain that is used by // batches. chainParams *chaincfg.Params + + // immediatePublishThreshold is the cumulative value threshold above + // which a batch should be published immediately without waiting for + // the initial delay grouping period. If 0, the feature is disabled. + immediatePublishThreshold btcutil.Amount } // rbfCache stores data related to our last fee bump. @@ -807,6 +812,24 @@ func (b *batch) sweepExists(outpoint wire.OutPoint) bool { return ok } +// totalValue returns the sum of all sweep values in the batch. +func (b *batch) totalValue() btcutil.Amount { + var total btcutil.Amount + for _, sweep := range b.sweeps { + total += sweep.value + } + return total +} + +// exceedsThreshold checks if the batch value exceeds the immediate publish +// threshold. Returns false if the threshold is not configured (0). +func (b *batch) exceedsThreshold() bool { + if b.cfg.immediatePublishThreshold == 0 { + return false + } + return b.totalValue() >= b.cfg.immediatePublishThreshold +} + // Wait waits for the batch to gracefully stop. func (b *batch) Wait() { b.Infof("Stopping") @@ -920,6 +943,25 @@ func (b *batch) Run(ctx context.Context) error { skipBefore = &delayStop skipBeforeUpdated = true } + + // Check if batch exceeds threshold and should publish + // immediately. Only override skipBefore if threshold + // feature is enabled and exceeded. + if b.exceedsThreshold() { + b.Infof("Batch value (%v) exceeds threshold "+ + "(%v), publishing immediately", + b.totalValue(), + b.cfg.immediatePublishThreshold) + + // Set skipBefore to now to bypass initial + // delay. + now := clock.Now() + skipBefore = &now + + // Trigger immediate publish by setting timer to + // fire after batchPublishDelay. + timerChan = clock.TickAfter(b.cfg.batchPublishDelay) + } } // Create new timer only if the value of skipBefore was updated. diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index dd5d01751..1855bf89a 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -443,6 +443,11 @@ type Batcher struct { // skippedTxns is the list of previous transactions to ignore when // loading the sweeps from DB. This is needed to fix a historical bug. skippedTxns map[chainhash.Hash]struct{} + + // immediatePublishThreshold is the cumulative value threshold above + // which a batch should be published immediately without waiting for + // the initial delay grouping period. If 0, the feature is disabled. + immediatePublishThreshold btcutil.Amount } // BatcherConfig holds batcher configuration. @@ -491,6 +496,11 @@ type BatcherConfig struct { // skippedTxns is the list of previous transactions to ignore when // loading the sweeps from DB. This is needed to fix a historical bug. skippedTxns map[chainhash.Hash]struct{} + + // immediatePublishThreshold is the cumulative value threshold above + // which a batch should be published immediately without waiting for + // the initial delay grouping period. If 0, the feature is disabled. + immediatePublishThreshold btcutil.Amount } // BatcherOption configures batcher behaviour. @@ -581,6 +591,14 @@ func WithSkippedTxns(skippedTxns map[chainhash.Hash]struct{}) BatcherOption { } } +// WithImmediatePublishThreshold sets the threshold above which batches are +// published immediately without waiting for the initial delay grouping period. +func WithImmediatePublishThreshold(threshold btcutil.Amount) BatcherOption { + return func(cfg *BatcherConfig) { + cfg.immediatePublishThreshold = threshold + } +} + // NewBatcher creates a new Batcher instance. func NewBatcher(wallet lndclient.WalletKitClient, chainNotifier lndclient.ChainNotifierClient, @@ -626,29 +644,30 @@ func NewBatcher(wallet lndclient.WalletKitClient, } return &Batcher{ - batches: make(map[int32]*batch), - addSweepsChan: make(chan *addSweepsRequest), - testReqs: make(chan *testRequest), - errChan: make(chan error, 1), - quit: make(chan struct{}), - initDone: make(chan struct{}), - wallet: wallet, - chainNotifier: chainNotifier, - signerClient: signerClient, - musig2ServerSign: musig2ServerSigner, - VerifySchnorrSig: verifySchnorrSig, - chainParams: chainparams, - store: store, - sweepStore: sweepStore, - clock: cfg.clock, - initialDelayProvider: cfg.initialDelayProvider, - publishDelay: cfg.publishDelay, - customFeeRate: cfg.customFeeRate, - txLabeler: cfg.txLabeler, - customMuSig2Signer: cfg.customMuSig2Signer, - publishErrorHandler: cfg.publishErrorHandler, - presignedHelper: cfg.presignedHelper, - skippedTxns: cfg.skippedTxns, + batches: make(map[int32]*batch), + addSweepsChan: make(chan *addSweepsRequest), + testReqs: make(chan *testRequest), + errChan: make(chan error, 1), + quit: make(chan struct{}), + initDone: make(chan struct{}), + wallet: wallet, + chainNotifier: chainNotifier, + signerClient: signerClient, + musig2ServerSign: musig2ServerSigner, + VerifySchnorrSig: verifySchnorrSig, + chainParams: chainparams, + store: store, + sweepStore: sweepStore, + clock: cfg.clock, + initialDelayProvider: cfg.initialDelayProvider, + publishDelay: cfg.publishDelay, + customFeeRate: cfg.customFeeRate, + txLabeler: cfg.txLabeler, + customMuSig2Signer: cfg.customMuSig2Signer, + publishErrorHandler: cfg.publishErrorHandler, + presignedHelper: cfg.presignedHelper, + skippedTxns: cfg.skippedTxns, + immediatePublishThreshold: cfg.immediatePublishThreshold, } } @@ -1683,14 +1702,15 @@ func minimumSweepFeeRate(ctx context.Context, customFeeRate FeeRateProvider, // newBatchConfig creates new batch config. func (b *Batcher) newBatchConfig(maxTimeoutDistance int32) batchConfig { return batchConfig{ - maxTimeoutDistance: maxTimeoutDistance, - customFeeRate: b.customFeeRate, - txLabeler: b.txLabeler, - customMuSig2Signer: b.customMuSig2Signer, - presignedHelper: b.presignedHelper, - skippedTxns: b.skippedTxns, - clock: b.clock, - chainParams: b.chainParams, + maxTimeoutDistance: maxTimeoutDistance, + customFeeRate: b.customFeeRate, + txLabeler: b.txLabeler, + customMuSig2Signer: b.customMuSig2Signer, + presignedHelper: b.presignedHelper, + skippedTxns: b.skippedTxns, + clock: b.clock, + chainParams: b.chainParams, + immediatePublishThreshold: b.immediatePublishThreshold, } } diff --git a/sweepbatcher/sweep_batcher_test.go b/sweepbatcher/sweep_batcher_test.go index 2d35541b2..723dee6d4 100644 --- a/sweepbatcher/sweep_batcher_test.go +++ b/sweepbatcher/sweep_batcher_test.go @@ -5504,3 +5504,235 @@ func runTests(t *testing.T, testFn func(t *testing.T, store testStore, testFn(t, testStore, testBatcherStore) }) } + +// TestImmediatePublishThreshold tests that batches are published immediately +// when the cumulative value exceeds the configured threshold. +func TestImmediatePublishThreshold(t *testing.T) { + runTests(t, testImmediatePublishThreshold) +} + +func testImmediatePublishThreshold(t *testing.T, store testStore, + batcherStore testBatcherStore) { + + defer test.Guard(t, test.WithGuardTimeout(10*time.Second))() + + lnd := test.NewMockLnd() + ctx, cancel := context.WithCancel(context.Background()) + + sweepStore, err := NewSweepFetcherFromSwapStore(store, lnd.ChainParams) + require.NoError(t, err) + + // Set up test clock and delays. + startTime := time.Date(2018, 11, 1, 0, 0, 0, 0, time.UTC) + tickSignal := make(chan time.Duration, 10) + testClock := clock.NewTestClockWithTickSignal(startTime, tickSignal) + + const ( + initialDelay = 10 * time.Second + publishDelay = 3 * time.Second + threshold = btcutil.Amount(500_000) // Set threshold at 5000 sats + ) + + initialDelayProvider := func(_ context.Context, _ int, + _ btcutil.Amount, fast bool) (time.Duration, error) { + + return initialDelay, nil + } + + // Create batcher with threshold option. + batcher := NewBatcher( + lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, + batcherStore, sweepStore, + WithInitialDelay(initialDelayProvider), + WithPublishDelay(publishDelay), + WithClock(testClock), + WithImmediatePublishThreshold(threshold), + ) + + var wg sync.WaitGroup + wg.Add(1) + + var runErr error + go func() { + defer wg.Done() + runErr = batcher.Run(ctx) + }() + + // Wait for the batcher to be initialized. + <-batcher.initDone + + // Scenario 1: Add sweep below threshold, should wait for initial + // delay. + op1 := wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + } + sweepReq1 := SweepRequest{ + SwapHash: lntypes.Hash{1, 1, 1}, + Inputs: []Input{{ + Value: 200_000, // Below threshold + Outpoint: op1, + }}, + Notifier: &dummyNotifier, + } + + swap1 := &loopdb.LoopOutContract{ + SwapContract: loopdb.SwapContract{ + CltvExpiry: 1000, + AmountRequested: 200_000, + ProtocolVersion: loopdb.ProtocolVersionMuSig2, + HtlcKeys: htlcKeys, + + // Make preimage unique to pass SQL constraints. + Preimage: lntypes.Preimage{1}, + }, + DestAddr: destAddr, + SwapInvoice: swapInvoice, + SweepConfTarget: confTarget, + } + + err = store.CreateLoopOut(ctx, sweepReq1.SwapHash, swap1) + require.NoError(t, err) + store.AssertLoopOutStored() + + // Deliver sweep request to batcher. + require.NoError(t, batcher.AddSweep(ctx, &sweepReq1)) + + // Consume RegisterSpend and timer registrations in parallel since + // order is not deterministic. + var wg2 sync.WaitGroup + wg2.Add(1) + go func() { + defer wg2.Done() + <-lnd.RegisterSpendChannel + }() + + wg2.Add(1) + go func() { + defer wg2.Done() + <-tickSignal // publishDelay + <-tickSignal // initialDelay + }() + + // Wait for registrations to complete. + wg2.Wait() + + // Wait for batch to be created. + require.Eventually(t, func() bool { + return batcher.numBatches(ctx) == 1 + }, test.Timeout, eventuallyCheckFrequency) + + // Get the batch and setup logger to monitor messages. + batch1 := getOnlyBatch(t, ctx, batcher) + testLogger := &wrappedLogger{ + Logger: batch1.log(), + } + batch1.setLog(testLogger) + + // Advance clock to publishDelay - should not publish yet because + // initialDelay hasn't ended. + now := startTime.Add(publishDelay) + testClock.SetTime(now) + + // Wait for batch publishing to be skipped due to initialDelay. + require.EventuallyWithT(t, func(c *assert.CollectT) { + testLogger.mu.Lock() + defer testLogger.mu.Unlock() + + assert.Contains(c, testLogger.debugMessages, stillWaitingMsg) + }, test.Timeout, eventuallyCheckFrequency) + + // Scenario 2: Add another sweep that pushes total above threshold, + // should publish immediately. + op2 := wire.OutPoint{ + Hash: chainhash.Hash{2, 2}, + Index: 2, + } + sweepReq2 := SweepRequest{ + SwapHash: lntypes.Hash{2, 2, 2}, + Inputs: []Input{{ + Value: 400_000, // Total will be 600000 > threshold + Outpoint: op2, + }}, + Notifier: &dummyNotifier, + } + + swap2 := &loopdb.LoopOutContract{ + SwapContract: loopdb.SwapContract{ + CltvExpiry: 1000, + AmountRequested: 400_000, + ProtocolVersion: loopdb.ProtocolVersionMuSig2, + HtlcKeys: htlcKeys, + + // Make preimage unique to pass SQL constraints. + Preimage: lntypes.Preimage{2}, + }, + DestAddr: destAddr, + SwapInvoice: swapInvoice, + SweepConfTarget: confTarget, + } + + err = store.CreateLoopOut(ctx, sweepReq2.SwapHash, swap2) + require.NoError(t, err) + store.AssertLoopOutStored() + + // Deliver second sweep request to batcher. + require.NoError(t, batcher.AddSweep(ctx, &sweepReq2)) + + // Wait for sweep to be added to batch. + require.Eventually(t, func() bool { + return batch1.numSweeps(ctx) == 2 + }, test.Timeout, eventuallyCheckFrequency) + + // Check that threshold exceeded message was logged. + require.EventuallyWithT(t, func(c *assert.CollectT) { + testLogger.mu.Lock() + defer testLogger.mu.Unlock() + + found := false + expectedMsg := "Batch value (%v) exceeds threshold " + + "(%v), publishing immediately" + for _, msg := range testLogger.infoMessages { + if msg == expectedMsg { + found = true + break + } + } + assert.True(c, found, "expected threshold exceeded message") + }, test.Timeout, eventuallyCheckFrequency) + + // The threshold check will have triggered a publishDelay timer. + // Wait for it to be registered. + var publishDelayTimer time.Duration + select { + case publishDelayTimer = <-tickSignal: + require.Equal(t, publishDelay, publishDelayTimer) + case <-time.After(test.Timeout): + t.Fatal("expected publishDelay timer to be set") + } + + // Advance clock by publishDelay to trigger publishing. + testClock.SetTime(testClock.Now().Add(publishDelay)) + + // Wait for tx to be published - this proves immediate publishing + // worked without waiting for full initialDelay. + select { + case <-lnd.TxPublishChannel: + // Success - batch published immediately after threshold was + // exceeded. + case <-time.After(test.Timeout): + t.Fatal("expected batch to be published immediately") + } + + // Verify both sweeps are in the batch. + require.Eventually(t, func() bool { + return batcherStore.AssertSweepStored(op2) + }, test.Timeout, eventuallyCheckFrequency) + + // Clean up. + cancel() + wg.Wait() + + checkBatcherError(t, runErr) +}