Skip to content

Commit 24b104e

Browse files
feat: bulk ingestor WoC (#685)
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
1 parent 0a81550 commit 24b104e

19 files changed

+561
-144
lines changed

pkg/defs/chaintracks.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,7 @@ type ChaintracksServiceConfig struct {
2424
Chain BSVNetwork `mapstructure:"-"`
2525
LiveIngestors []LiveIngestorType `mapstructure:"live_ingestors"`
2626
CDNBulkIngestors []CDNBulkIngestorConfig `mapstructure:"cdn_bulk_ingestors"`
27-
28-
// TODO: Specify API key for WoC ingestor
27+
WocAPIKey string `mapstructure:"woc_api_key"`
2928
}
3029

3130
// Validate checks if the Chain field in ChaintracksServiceConfig holds a valid BSV network type.
@@ -68,6 +67,7 @@ func DefaultChaintracksServiceConfig() ChaintracksServiceConfig {
6867
SourceURL: BabbageBlockHeadersCDN,
6968
},
7069
},
70+
WocAPIKey: "",
7171
}
7272
}
7373

pkg/defs/woc_poll_ingestor.go

Lines changed: 0 additions & 22 deletions
This file was deleted.

pkg/services/chaintracks/bulk_manager.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,16 @@ func newBulkManager(logger *slog.Logger, bulkIngestors []NamedBulkIngestor) *bul
3232
}
3333
}
3434

35-
func (bm *bulkManager) SyncBulkStorage(ctx context.Context, presentHeight uint, initialRanges models.HeightRanges) (err error) {
35+
func (bm *bulkManager) SyncBulkStorage(ctx context.Context, presentHeight uint, initialRanges models.HeightRanges) error {
3636
bm.logger.Info("Starting bulk synchronization", slog.Any("present_height", presentHeight), slog.Any("initial_ranges", initialRanges))
3737

38+
missingRange := models.NewHeightRange(0, presentHeight)
3839
for _, ingestor := range bm.bulkIngestors {
39-
bulkChunks, downloader, err := ingestor.Ingestor.Synchronize(ctx, presentHeight, initialRanges)
40+
if missingRange.IsEmpty() {
41+
break
42+
}
43+
44+
bulkChunks, downloader, err := ingestor.Ingestor.Synchronize(ctx, presentHeight, missingRange)
4045
if err != nil {
4146
bm.logger.Error("Chaintracks service - error during bulk synchronization", slog.String("ingestor_name", ingestor.Name), slog.String("error", err.Error()))
4247
return fmt.Errorf("bulk synchronization failed for ingestor %s: %w", ingestor.Name, err)
@@ -46,7 +51,18 @@ func (bm *bulkManager) SyncBulkStorage(ctx context.Context, presentHeight uint,
4651
return fmt.Errorf("failed to process bulk chunks from ingestor %s: %w", ingestor.Name, err)
4752
}
4853

49-
// TODO: Implement DONE check and break if done
54+
providedRange := models.NewEmptyHeightRange()
55+
for _, chunk := range bulkChunks {
56+
providedRange, err = providedRange.Union(chunk.ToHeightRange())
57+
if err != nil {
58+
return fmt.Errorf("failed to compute provided height range from ingestor %s: %w", ingestor.Name, err)
59+
}
60+
}
61+
62+
missingRange, err = missingRange.Subtract(providedRange)
63+
if err != nil {
64+
return fmt.Errorf("failed to compute missing height range after ingestor %s: %w", ingestor.Name, err)
65+
}
5066
}
5167

5268
return nil

pkg/services/chaintracks/chaintracks_initializers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ type Initializers struct {
1818
func DefaultInitializers() Initializers {
1919
return Initializers{
2020
WOCLiveIngestorPollFactory: func(logger *slog.Logger, config defs.ChaintracksServiceConfig) LiveIngestor {
21-
return ingest.NewLiveIngestorWocPoll(logger, defs.WOCPollIngestorConfig{Chain: config.Chain})
21+
return ingest.NewLiveIngestorWocPoll(logger, config.Chain, ingest.IngestorWocPollOpts.WithAPIKey(config.WocAPIKey))
2222
},
2323
CDNBulkIngestorFactory: func(logger *slog.Logger, chain defs.BSVNetwork, config defs.CDNBulkIngestorConfig) BulkIngestor {
2424
return ingest.NewBulkIngestorCDN(logger, chain, config)

pkg/services/chaintracks/chaintracks_service_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func TestService_GetPresentHeight(t *testing.T) {
6161
// and:
6262
service, err := chaintracks.NewService(logging.NewTestLogger(t), config, chaintracks.Initializers{
6363
WOCLiveIngestorPollFactory: func(logger *slog.Logger, config defs.ChaintracksServiceConfig) chaintracks.LiveIngestor {
64-
return ingest.NewLiveIngestorWocPoll(logger, defs.WOCPollIngestorConfig{Chain: config.Chain}, ingest.WithRestyClient(mockWOC.HttpClient()))
64+
return ingest.NewLiveIngestorWocPoll(logger, config.Chain, ingest.IngestorWocPollOpts.WithRestyClient(mockWOC.HttpClient()))
6565
},
6666
})
6767
require.NoError(t, err)

pkg/services/chaintracks/chaintracks_storage.interface.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ type NamedLiveIngestor struct {
3636
// The Synchronize method ingests headers up to the given presentHeight for provided height ranges and returns insertion results.
3737
// TODO: refine return type from 'any' to a more specific type representing synchronization results.
3838
type BulkIngestor interface {
39-
Synchronize(ctx context.Context, presentHeight uint, ranges models.HeightRanges) ([]ingest.BulkHeaderFileInfo, ingest.BulkFileDownloader, error)
39+
Synchronize(ctx context.Context, presentHeight uint, rangeToFetch models.HeightRange) ([]ingest.BulkHeaderFileInfo, ingest.BulkFileDownloader, error)
4040
}
4141

4242
// NamedBulkIngestor associates a descriptive name with a BulkIngestor interface for organized bulk header synchronization tasks.

pkg/services/chaintracks/ingest/chaintracks_cdn_bulk_ingestor.go renamed to pkg/services/chaintracks/ingest/bulk_ingestor_cdn.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ type BulkFileDownloader = func(ctx context.Context, fileInfo BulkHeaderFileInfo)
3939

4040
// Synchronize retrieves available bulk header files for the configured BSV network and prepares chunks for ingestion.
4141
// It validates file metadata, checks network consistency, and returns a list of chunked header information for sync.
42-
func (b *BulkIngestorCDN) Synchronize(ctx context.Context, presentHeight uint, ranges models.HeightRanges) ([]BulkHeaderFileInfo, BulkFileDownloader, error) {
42+
func (b *BulkIngestorCDN) Synchronize(ctx context.Context, presentHeight uint, rangeToFetch models.HeightRange) ([]BulkHeaderFileInfo, BulkFileDownloader, error) {
4343
// TODO: PresentHeight and ranges are not used in TS implementation, consider using them for optimization
4444

4545
filesInfo, err := b.reader.FetchBulkHeaderFilesInfo(ctx, b.chain)
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
package ingest
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log/slog"
7+
"strings"
8+
"time"
9+
10+
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/defs"
11+
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/internal/logging"
12+
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/services/chaintracks/models"
13+
"github.com/go-softwarelab/common/pkg/must"
14+
"github.com/go-softwarelab/common/pkg/to"
15+
)
16+
17+
// BulkIngestorWOC provides logic to ingest and synchronize block headers from WhatsOnChain bulk endpoints.
18+
// Utilizes a wocClient to fetch block headers and block height resources from the WhatsOnChain API service.
19+
// Maintains a logger for structured logging and a chain identifier for selecting network-specific resources.
20+
// Designed for efficient bulk fetching of header file metadata and incremental synchronization of chain state.
21+
type BulkIngestorWOC struct {
22+
logger *slog.Logger
23+
chain defs.BSVNetwork
24+
wocClient *wocClient
25+
}
26+
27+
// NewBulkIngestorWOC creates a new BulkIngestorWOC for a given logger, network, and optional configuration options.
28+
// It sets up a dedicated WhatsOnChain bulk client for the specified BSV network and uses the provided logger.
29+
// Optional configuration options allow customization such as API key or overriding the default HTTP client factory.
30+
// Returns a pointer to the BulkIngestorWOC which can efficiently ingest and synchronize block header files.
31+
func NewBulkIngestorWOC(logger *slog.Logger, chain defs.BSVNetwork, opts ...func(options *BulkIngestorWocOptions)) *BulkIngestorWOC {
32+
logger = logging.Child(logger, "bulk_ingestor_woc")
33+
34+
options := to.OptionsWithDefault(DefaultBulkIngestorWocOptions(), opts...)
35+
36+
return &BulkIngestorWOC{
37+
logger: logger,
38+
chain: chain,
39+
wocClient: newWocClient(logger, chain, options.APIKey, options.RestyClientFactory.New()),
40+
}
41+
}
42+
43+
// Synchronize fetches available bulk header files and selects those overlapping the specified height range.
44+
// Synchronize returns metadata for the required files and a downloader for retrieving their data from WhatsOnChain.
45+
// Synchronize returns an error if fetching or parsing file metadata fails, or if no appropriate files are found.
46+
func (b *BulkIngestorWOC) Synchronize(ctx context.Context, presentHeight uint, rangeToFetch models.HeightRange) ([]BulkHeaderFileInfo, BulkFileDownloader, error) {
47+
allFiles, err := b.fetchBulkHeaderFilesInfo(ctx)
48+
if err != nil {
49+
return nil, nil, fmt.Errorf("failed to fetch bulk header files info: %w", err)
50+
}
51+
52+
if len(allFiles) == 0 {
53+
return nil, nil, fmt.Errorf("no bulk header files available from WhatsOnChain")
54+
}
55+
56+
neededFiles := make([]wocBulkFileInfo, 0)
57+
for _, file := range allFiles {
58+
if file.heightRange.Overlaps(rangeToFetch) {
59+
neededFiles = append(neededFiles, file)
60+
}
61+
}
62+
63+
result := make([]BulkHeaderFileInfo, 0, len(neededFiles))
64+
for _, file := range neededFiles {
65+
bulkFileInfo, err := b.toBulkHeaderFileInfo(ctx, &file)
66+
if err != nil {
67+
return nil, nil, fmt.Errorf("failed to convert to BulkHeaderFileInfo for file %s: %w", file.filename, err)
68+
}
69+
70+
result = append(result, *bulkFileInfo)
71+
}
72+
73+
return result, b.bulkFileDownloader(), nil
74+
75+
}
76+
77+
func (b *BulkIngestorWOC) toBulkHeaderFileInfo(ctx context.Context, file *wocBulkFileInfo) (*BulkHeaderFileInfo, error) {
78+
prevChainWork := prevChainWorkForGenesis
79+
prevHash := genesisAsPrevBlockHash
80+
if file.heightRange.MinHeight > 0 {
81+
prevBlock, err := b.wocClient.GetBlockByHeight(ctx, file.heightRange.MinHeight-1)
82+
if err != nil {
83+
return nil, fmt.Errorf("failed to get previous block at height %d: %w", file.heightRange.MinHeight-1, err)
84+
}
85+
86+
prevChainWork = prevBlock.Chainwork
87+
prevHash = prevBlock.Hash
88+
}
89+
90+
lastBlock, err := b.wocClient.GetBlockByHeight(ctx, file.heightRange.MaxHeight)
91+
if err != nil {
92+
return nil, fmt.Errorf("failed to get last block at height %d: %w", file.heightRange.MaxHeight, err)
93+
}
94+
95+
return &BulkHeaderFileInfo{
96+
FileName: fmt.Sprintf("%d_%d_headers.bin", file.heightRange.MinHeight, file.heightRange.MaxHeight),
97+
FirstHeight: file.heightRange.MinHeight,
98+
Count: must.ConvertToIntFromUnsigned(file.heightRange.MaxHeight) - must.ConvertToIntFromUnsigned(file.heightRange.MinHeight) + 1,
99+
Chain: b.chain,
100+
SourceURL: to.Ptr(file.url),
101+
102+
PrevChainWork: prevChainWork,
103+
PrevHash: prevHash,
104+
105+
LastChainWork: lastBlock.Chainwork,
106+
LastHash: &lastBlock.Hash,
107+
108+
// Not supported, we don't download the file at this point and WoC doesn't provide it in metadata
109+
FileHash: nil,
110+
}, nil
111+
}
112+
113+
func (b *BulkIngestorWOC) bulkFileDownloader() BulkFileDownloader {
114+
return func(ctx context.Context, fileInfo BulkHeaderFileInfo) (BulkFileData, error) {
115+
if fileInfo.SourceURL == nil {
116+
panic("SourceURL is nil in bulk file downloader")
117+
}
118+
119+
b.logger.Info("Downloading bulk header file", slog.String("file_name", fileInfo.FileName))
120+
121+
content, err := b.wocClient.DownloadHeaderFile(ctx, *fileInfo.SourceURL)
122+
if err != nil {
123+
return BulkFileData{}, fmt.Errorf("failed to download bulk header file %s: %w", fileInfo.FileName, err)
124+
}
125+
126+
return BulkFileData{
127+
Info: fileInfo,
128+
Data: content,
129+
AccessedAt: time.Now(),
130+
}, nil
131+
}
132+
}
133+
134+
type wocBulkFileInfo struct {
135+
heightRange models.HeightRange
136+
url string
137+
filename string
138+
}
139+
140+
func (b *BulkIngestorWOC) fetchBulkHeaderFilesInfo(ctx context.Context) ([]wocBulkFileInfo, error) {
141+
response, err := b.wocClient.GetHeadersResourceList(ctx)
142+
if err != nil {
143+
return nil, fmt.Errorf("failed to get headers resource list from WhatsOnChain: %w", err)
144+
}
145+
146+
result := make([]wocBulkFileInfo, 0, len(response.Files))
147+
for _, fileURL := range response.Files {
148+
filename, heightRange, err := b.parseURL(ctx, fileURL)
149+
if err != nil {
150+
return nil, fmt.Errorf("failed to parse height range from URL %s: %w", fileURL, err)
151+
}
152+
153+
result = append(result, wocBulkFileInfo{
154+
heightRange: heightRange,
155+
url: fileURL,
156+
filename: filename,
157+
})
158+
}
159+
160+
return result, nil
161+
}
162+
163+
// parseURL parses the height range from the given WhatsOnChain bulk header file URL.
164+
// "https://api.whatsonchain.com/v1/bsv/main/block/headers/0_10000_headers.bin",
165+
// "https://api.whatsonchain.com/v1/bsv/main/block/headers/10001_20000_headers.bin",
166+
// (...)
167+
// "https://api.whatsonchain.com/v1/bsv/main/block/headers/latest"
168+
// The latest endpoint - we don't know the max height by URL alone; the min height is previous max + 1
169+
// So we need to get the Content-Disposition header from the HEAD request to get the actual filename
170+
func (b *BulkIngestorWOC) parseURL(ctx context.Context, url string) (filename string, heightRange models.HeightRange, err error) {
171+
parts := strings.Split(url, "/block/headers/")
172+
if len(parts) != 2 {
173+
err = fmt.Errorf("invalid URL format: %s", url)
174+
return
175+
}
176+
filename = parts[1]
177+
178+
if filename == "latest" {
179+
filename, err = b.getLatestHeightRange(ctx, url)
180+
if err != nil {
181+
err = fmt.Errorf("failed to get latest height range from URL %s: %w", url, err)
182+
return
183+
}
184+
}
185+
186+
_, err = fmt.Sscanf(filename, "%d_%d_headers.bin", &heightRange.MinHeight, &heightRange.MaxHeight)
187+
if err != nil {
188+
err = fmt.Errorf("failed to parse height range from filename %s: %w", filename, err)
189+
return
190+
}
191+
192+
return
193+
}
194+
195+
// getLatestHeightRange performs a HEAD request to the given latest URL to retrieve the Content-Disposition header.
196+
// It extracts the filename from the header to determine the actual height range of the latest bulk header
197+
func (b *BulkIngestorWOC) getLatestHeightRange(ctx context.Context, latestURL string) (string, error) {
198+
contentHeader, err := b.wocClient.GetContentDispositionFilename(ctx, latestURL)
199+
if err != nil {
200+
return "", fmt.Errorf("failed to get Content-Disposition header from WhatsOnChain: %w", err)
201+
}
202+
203+
// example: Content-Disposition: attachment; filename=922001_923532_headers.bin
204+
var filename string
205+
if _, err = fmt.Sscanf(contentHeader, "attachment; filename=%s", &filename); err != nil {
206+
return "", fmt.Errorf("failed to parse filename from Content-Disposition header: %w", err)
207+
}
208+
209+
return filename, nil
210+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package ingest
2+
3+
import (
4+
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/services/internal/httpx"
5+
"github.com/go-resty/resty/v2"
6+
)
7+
8+
// BulkIngestorWocOptions provides configuration for bulk ingestion using a configurable HTTP client and optional API key.
9+
type BulkIngestorWocOptions struct {
10+
RestyClientFactory *httpx.RestyClientFactory
11+
APIKey string
12+
}
13+
14+
// DefaultBulkIngestorWocOptions returns the default BulkIngestorWocOptions with a configured RestyClientFactory.
15+
func DefaultBulkIngestorWocOptions() BulkIngestorWocOptions {
16+
return BulkIngestorWocOptions{
17+
RestyClientFactory: httpx.NewRestyClientFactory(),
18+
}
19+
}
20+
21+
// BulkIngestorWocOptionsBuilder provides builder methods to configure BulkIngestorWocOptions for bulk ingestion.
22+
type BulkIngestorWocOptionsBuilder struct{}
23+
24+
// BulkIngestorWocOpts provides option builder methods for customizing BulkIngestorWocOptions configuration.
25+
var BulkIngestorWocOpts BulkIngestorWocOptionsBuilder
26+
27+
// WithRestyClient sets a custom resty.Client to be used for HTTP requests in BulkIngestorWocOptions.
28+
// It overrides the default RestyClientFactory with one based on the provided client instance.
29+
func (BulkIngestorWocOptionsBuilder) WithRestyClient(client *resty.Client) func(*BulkIngestorWocOptions) {
30+
return func(options *BulkIngestorWocOptions) {
31+
options.RestyClientFactory = httpx.NewRestyClientFactoryWithBase(client)
32+
}
33+
}
34+
35+
// WithAPIKey sets the API key to be used with the BulkIngestorWocOptions instance.
36+
func (BulkIngestorWocOptionsBuilder) WithAPIKey(apiKey string) func(*BulkIngestorWocOptions) {
37+
return func(options *BulkIngestorWocOptions) {
38+
options.APIKey = apiKey
39+
}
40+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package ingest
2+
3+
import (
4+
"testing"
5+
6+
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/defs"
7+
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/internal/logging"
8+
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/services/chaintracks/models"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestBulkIngestorWOC_Synchronize(t *testing.T) {
13+
t.Skip("This test gets actual data from WOC - use this only for manual testing purposes")
14+
service := NewBulkIngestorWOC(logging.NewTestLogger(t), defs.NetworkMainnet)
15+
16+
presentHeight := uint(923537)
17+
rangeToLoad := presentHeight - 4000
18+
fileInfo, _, err := service.Synchronize(t.Context(), presentHeight, models.NewHeightRange(rangeToLoad, presentHeight))
19+
20+
require.NoError(t, err)
21+
t.Logf("Fetched file info: %+v", fileInfo)
22+
}

0 commit comments

Comments
 (0)