Skip to content

Commit 1859c1f

Browse files
committed
fix: resolve GASP dependency resolution and concurrency issues
- Fix ProvideForeignGASPNode to use outpoint parameter instead of graphId for correct dependency lookup - Add global deduplication cache to prevent duplicate processing across UTXOs - Add network-level flow control in OverlayGASPRemote for RequestNode calls - Add InFlight deduplication in OverlayGASPRemote to prevent duplicate network requests - Convert sharedOutpoints from map to sync.Map for concurrent access safety
1 parent 0bf1f7f commit 1859c1f

File tree

3 files changed

+102
-40
lines changed

3 files changed

+102
-40
lines changed

pkg/core/engine/engine.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -660,11 +660,7 @@ func (e *Engine) StartGASPSync(ctx context.Context) error {
660660
// Create a new GASP provider for each peer to avoid state conflicts
661661
gaspProvider := gasp.NewGASP(gasp.GASPParams{
662662
Storage: NewOverlayGASPStorage(topic, e, nil),
663-
Remote: &OverlayGASPRemote{
664-
EndpointUrl: peer,
665-
Topic: topic,
666-
HttpClient: http.DefaultClient,
667-
},
663+
Remote: NewOverlayGASPRemote(peer, topic, http.DefaultClient, 8),
668664
LastInteraction: lastInteraction,
669665
LogPrefix: &logPrefix,
670666
Unidirectional: true,
@@ -731,7 +727,7 @@ func (e *Engine) ProvideForeignGASPNode(ctx context.Context, graphId *transactio
731727
return nil, err
732728
} else {
733729
node := &gasp.Node{
734-
GraphID: graphId,
730+
GraphID: outpoint,
735731
RawTx: tx.Hex(),
736732
OutputIndex: outpoint.Index,
737733
AncillaryBeef: output.AncillaryBeef,
@@ -745,8 +741,8 @@ func (e *Engine) ProvideForeignGASPNode(ctx context.Context, graphId *transactio
745741
}
746742

747743
}
748-
if output, err := e.Storage.FindOutput(ctx, graphId, &topic, nil, true); err != nil {
749-
slog.Error("failed to find output in ProvideForeignGASPNode", "graphId", graphId.String(), "topic", topic, "error", err)
744+
if output, err := e.Storage.FindOutput(ctx, outpoint, &topic, nil, true); err != nil {
745+
slog.Error("failed to find output in ProvideForeignGASPNode", "outpoint", outpoint.String(), "topic", topic, "error", err)
750746
return nil, err
751747
} else if output == nil {
752748
return nil, ErrMissingOutput

pkg/core/engine/gasp-remote.go

Lines changed: 62 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,30 +8,52 @@ import (
88
"io"
99
"log/slog"
1010
"net/http"
11+
"sync"
1112

1213
"github.com/bsv-blockchain/go-overlay-services/pkg/core/gasp"
1314
"github.com/bsv-blockchain/go-sdk/transaction"
1415
"github.com/bsv-blockchain/go-sdk/util"
1516
)
1617

18+
type inflightNodeRequest struct {
19+
wg *sync.WaitGroup
20+
result *gasp.Node
21+
err error
22+
}
23+
1724
type OverlayGASPRemote struct {
18-
EndpointUrl string
19-
Topic string
20-
HttpClient util.HTTPClient
25+
endpointUrl string
26+
topic string
27+
httpClient util.HTTPClient
28+
inflightMap sync.Map // Map to track in-flight node requests
29+
networkLimiter chan struct{} // Controls max concurrent network requests
30+
}
31+
32+
func NewOverlayGASPRemote(endpointUrl, topic string, httpClient util.HTTPClient, maxConcurrency int) *OverlayGASPRemote {
33+
if maxConcurrency <= 0 {
34+
maxConcurrency = 8 // Default network concurrency
35+
}
36+
37+
return &OverlayGASPRemote{
38+
endpointUrl: endpointUrl,
39+
topic: topic,
40+
httpClient: httpClient,
41+
networkLimiter: make(chan struct{}, maxConcurrency),
42+
}
2143
}
2244

2345
func (r *OverlayGASPRemote) GetInitialResponse(ctx context.Context, request *gasp.InitialRequest) (*gasp.InitialResponse, error) {
2446
var buf bytes.Buffer
2547
if err := json.NewEncoder(&buf).Encode(request); err != nil {
26-
slog.Error("failed to encode GASP initial request", "endpoint", r.EndpointUrl, "topic", r.Topic, "error", err)
48+
slog.Error("failed to encode GASP initial request", "endpoint", r.endpointUrl, "topic", r.topic, "error", err)
2749
return nil, err
28-
} else if req, err := http.NewRequest("POST", r.EndpointUrl+"/requestSyncResponse", io.NopCloser(&buf)); err != nil {
29-
slog.Error("failed to create HTTP request for GASP initial response", "endpoint", r.EndpointUrl, "topic", r.Topic, "error", err)
50+
} else if req, err := http.NewRequest("POST", r.endpointUrl+"/requestSyncResponse", io.NopCloser(&buf)); err != nil {
51+
slog.Error("failed to create HTTP request for GASP initial response", "endpoint", r.endpointUrl, "topic", r.topic, "error", err)
3052
return nil, err
3153
} else {
3254
req.Header.Set("Content-Type", "application/json")
33-
req.Header.Set("X-BSV-Topic", r.Topic)
34-
if resp, err := r.HttpClient.Do(req); err != nil {
55+
req.Header.Set("X-BSV-Topic", r.topic)
56+
if resp, err := r.httpClient.Do(req); err != nil {
3557
return nil, err
3658
} else {
3759
defer func() { _ = resp.Body.Close() }()
@@ -51,19 +73,48 @@ func (r *OverlayGASPRemote) GetInitialResponse(ctx context.Context, request *gas
5173
}
5274

5375
func (r *OverlayGASPRemote) RequestNode(ctx context.Context, graphID *transaction.Outpoint, outpoint *transaction.Outpoint, metadata bool) (*gasp.Node, error) {
76+
outpointStr := outpoint.String()
77+
var wg sync.WaitGroup
78+
wg.Add(1)
79+
defer wg.Done()
80+
81+
// Check if there's already an in-flight request for this outpoint
82+
if inflight, loaded := r.inflightMap.LoadOrStore(outpointStr, &inflightNodeRequest{wg: &wg}); loaded {
83+
req := inflight.(*inflightNodeRequest)
84+
req.wg.Wait()
85+
return req.result, req.err
86+
} else {
87+
req := inflight.(*inflightNodeRequest)
88+
slog.Info("Requesting GASP node", "outpoint", outpointStr, "metadata", metadata, "topic", r.topic)
89+
req.result, req.err = r.doNodeRequest(ctx, graphID, outpoint, metadata)
90+
91+
// Clean up inflight map
92+
r.inflightMap.Delete(outpointStr)
93+
return req.result, req.err
94+
}
95+
}
96+
97+
func (r *OverlayGASPRemote) doNodeRequest(ctx context.Context, graphID *transaction.Outpoint, outpoint *transaction.Outpoint, metadata bool) (*gasp.Node, error) {
98+
// Acquire network limiter
99+
select {
100+
case r.networkLimiter <- struct{}{}:
101+
case <-ctx.Done():
102+
return nil, ctx.Err()
103+
}
104+
defer func() { <-r.networkLimiter }()
54105
if j, err := json.Marshal(&gasp.NodeRequest{
55106
GraphID: graphID,
56107
Txid: &outpoint.Txid,
57108
OutputIndex: outpoint.Index,
58109
Metadata: metadata,
59110
}); err != nil {
60111
return nil, err
61-
} else if req, err := http.NewRequest("POST", r.EndpointUrl+"/requestForeignGASPNode", bytes.NewReader(j)); err != nil {
112+
} else if req, err := http.NewRequest("POST", r.endpointUrl+"/requestForeignGASPNode", bytes.NewReader(j)); err != nil {
62113
return nil, err
63114
} else {
64115
req.Header.Set("Content-Type", "application/json")
65-
req.Header.Set("X-BSV-Topic", r.Topic)
66-
if resp, err := r.HttpClient.Do(req); err != nil {
116+
req.Header.Set("X-BSV-Topic", r.topic)
117+
if resp, err := r.httpClient.Do(req); err != nil {
67118
return nil, err
68119
} else {
69120
defer func() { _ = resp.Body.Close() }()

pkg/core/gasp/gasp.go

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,13 @@ type GASP struct {
3939
LogPrefix string
4040
Unidirectional bool
4141
LogLevel slog.Level
42-
limiter chan struct{}
42+
limiter chan struct{} // Concurrency limiter controlled by Concurrency config
43+
44+
// Global deduplication cache for processed nodes across all UTXOs
45+
processedNodes sync.Map // map[transaction.Outpoint]struct{} - prevents duplicate processing
4346
}
4447

48+
4549
func NewGASP(params GASPParams) *GASP {
4650
gasp := &GASP{
4751
Storage: params.Storage,
@@ -50,6 +54,7 @@ func NewGASP(params GASPParams) *GASP {
5054
Unidirectional: params.Unidirectional,
5155
// Sequential: params.Sequential,
5256
}
57+
// Concurrency limiter controlled by Concurrency config
5358
if params.Concurrency > 1 {
5459
gasp.limiter = make(chan struct{}, params.Concurrency)
5560
} else {
@@ -83,7 +88,7 @@ func (g *GASP) Sync(ctx context.Context, host string, limit uint32) error {
8388
outpoint := fmt.Sprintf("%s.%d", utxo.Txid, utxo.OutputIndex)
8489
knownOutpoints[outpoint] = struct{}{}
8590
}
86-
sharedOutpoints := make(map[string]struct{})
91+
var sharedOutpoints sync.Map
8792

8893
var initialResponse *InitialResponse
8994
for {
@@ -104,14 +109,17 @@ func (g *GASP) Sync(ctx context.Context, host string, limit uint32) error {
104109
}
105110
outpoint := utxo.OutpointString()
106111
if _, exists := knownOutpoints[outpoint]; exists {
107-
sharedOutpoints[outpoint] = struct{}{}
112+
sharedOutpoints.Store(outpoint, struct{}{})
108113
delete(knownOutpoints, outpoint)
109-
} else if _, shared := sharedOutpoints[outpoint]; !shared {
114+
} else if _, shared := sharedOutpoints.Load(outpoint); !shared {
110115
ingestQueue = append(ingestQueue, utxo)
111116
}
112117
}
113118

119+
// Process all UTXOs from this batch with shared deduplication
114120
var wg sync.WaitGroup
121+
seenNodes := &sync.Map{} // Shared across all UTXOs in this batch
122+
115123
for _, utxo := range ingestQueue {
116124
wg.Add(1)
117125
g.limiter <- struct{}{}
@@ -127,15 +135,15 @@ func (g *GASP) Sync(ctx context.Context, host string, limit uint32) error {
127135
return
128136
}
129137
slog.Debug(fmt.Sprintf("%sReceived unspent graph node from remote: %v", g.LogPrefix, resolvedNode))
130-
if err = g.processIncomingNode(ctx, resolvedNode, nil, &sync.Map{}); err != nil {
138+
if err = g.processIncomingNode(ctx, resolvedNode, nil, seenNodes); err != nil {
131139
slog.Warn(fmt.Sprintf("%sError processing incoming node %s: %v", g.LogPrefix, outpoint, err))
132140
return
133141
}
134142
if err = g.CompleteGraph(ctx, resolvedNode.GraphID); err != nil {
135143
slog.Warn(fmt.Sprintf("%sError completing graph for %s: %v", g.LogPrefix, outpoint, err))
136144
return
137145
}
138-
sharedOutpoints[outpoint.String()] = struct{}{}
146+
sharedOutpoints.Store(outpoint.String(), struct{}{})
139147
}(utxo)
140148
}
141149
wg.Wait()
@@ -153,7 +161,7 @@ func (g *GASP) Sync(ctx context.Context, host string, limit uint32) error {
153161
for _, utxo := range localUTXOs {
154162
outpoint := fmt.Sprintf("%s.%d", utxo.Txid, utxo.OutputIndex)
155163
if utxo.Score >= initialResponse.Since {
156-
if _, shared := sharedOutpoints[outpoint]; !shared {
164+
if _, shared := sharedOutpoints.Load(outpoint); !shared {
157165
replyUTXOs = append(replyUTXOs, utxo)
158166
}
159167
}
@@ -280,16 +288,27 @@ func (g *GASP) processIncomingNode(ctx context.Context, node *Node, spentBy *tra
280288
if txid, err := g.computeTxID(node.RawTx); err != nil {
281289
return err
282290
} else {
283-
nodeId := (&transaction.Outpoint{
291+
nodeOutpoint := &transaction.Outpoint{
284292
Txid: *txid,
285293
Index: node.OutputIndex,
286-
}).String()
294+
}
295+
nodeId := nodeOutpoint.String()
296+
287297
slog.Debug(fmt.Sprintf("%sProcessing incoming node: %v, spentBy: %v", g.LogPrefix, node, spentBy))
298+
299+
// Global deduplication check
300+
if _, exists := g.processedNodes.LoadOrStore(*nodeOutpoint, struct{}{}); exists {
301+
slog.Debug(fmt.Sprintf("%sNode %s already processed globally, skipping", g.LogPrefix, nodeId))
302+
return nil
303+
}
304+
305+
// Per-graph cycle detection
288306
if _, ok := seenNodes.Load(nodeId); ok {
289-
slog.Debug(fmt.Sprintf("%sNode %s already processed, skipping.", g.LogPrefix, nodeId))
307+
slog.Debug(fmt.Sprintf("%sNode %s already seen in this graph, skipping.", g.LogPrefix, nodeId))
290308
return nil
291309
}
292310
seenNodes.Store(nodeId, struct{}{})
311+
293312
if err := g.Storage.AppendToGraph(ctx, node, spentBy); err != nil {
294313
return err
295314
} else if neededInputs, err := g.Storage.FindNeededInputs(ctx, node); err != nil {
@@ -300,18 +319,16 @@ func (g *GASP) processIncomingNode(ctx context.Context, node *Node, spentBy *tra
300319
errors := make(chan error)
301320
for outpointStr, data := range neededInputs.RequestedInputs {
302321
wg.Add(1)
303-
g.limiter <- struct{}{}
304322
go func(outpointStr string, data *NodeResponseData) {
305-
defer func() {
306-
<-g.limiter
307-
wg.Done()
308-
}()
323+
defer wg.Done()
324+
309325
slog.Info(fmt.Sprintf("%sRequesting new node for outpoint: %s, metadata: %v", g.LogPrefix, outpointStr, data.Metadata))
310326
if outpoint, err := transaction.OutpointFromString(outpointStr); err != nil {
311327
errors <- err
312-
} else if newNode, err := g.Remote.RequestNode(ctx, node.GraphID, outpoint, data.Metadata); err != nil {
328+
} else if newNode, err := g.Remote.RequestNode(ctx, outpoint, outpoint, data.Metadata); err != nil {
313329
errors <- err
314330
} else {
331+
315332
slog.Debug(fmt.Sprintf("%sReceived new node: %v", g.LogPrefix, newNode))
316333
// Create outpoint for the current node that is spending this input
317334
spendingOutpoint := &transaction.Outpoint{
@@ -365,12 +382,8 @@ func (g *GASP) processOutgoingNode(ctx context.Context, node *Node, seenNodes *s
365382
var wg sync.WaitGroup
366383
for outpointStr, data := range response.RequestedInputs {
367384
wg.Add(1)
368-
g.limiter <- struct{}{}
369385
go func(outpointStr string, data *NodeResponseData) {
370-
defer func() {
371-
<-g.limiter
372-
wg.Done()
373-
}()
386+
defer wg.Done()
374387
var outpoint *transaction.Outpoint
375388
var err error
376389
if outpoint, err = transaction.OutpointFromString(outpointStr); err == nil {
@@ -392,6 +405,8 @@ func (g *GASP) processOutgoingNode(ctx context.Context, node *Node, seenNodes *s
392405
return nil
393406
}
394407

408+
409+
395410
func (g *GASP) computeTxID(rawtx string) (*chainhash.Hash, error) {
396411
if tx, err := transaction.NewTransactionFromHex(rawtx); err != nil {
397412
return nil, err

0 commit comments

Comments
 (0)