Skip to content

Commit 075d0ed

Browse files
feat: support multiple bulk ingestor types and add methods to get chain tip (#686)
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
1 parent 24b104e commit 075d0ed

File tree

8 files changed

+143
-25
lines changed

8 files changed

+143
-25
lines changed

pkg/defs/chaintracks.go

Lines changed: 60 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ func ParseLiveIngestorType(str string) (LiveIngestorType, error) {
2121

2222
// ChaintracksServiceConfig holds configuration for Chaintracks service, including the BSV network selection.
2323
type ChaintracksServiceConfig struct {
24-
Chain BSVNetwork `mapstructure:"-"`
25-
LiveIngestors []LiveIngestorType `mapstructure:"live_ingestors"`
26-
CDNBulkIngestors []CDNBulkIngestorConfig `mapstructure:"cdn_bulk_ingestors"`
27-
WocAPIKey string `mapstructure:"woc_api_key"`
24+
Chain BSVNetwork `mapstructure:"-"`
25+
LiveIngestors []LiveIngestorType `mapstructure:"live_ingestors"`
26+
BulkIngestors []BulkIngestorConfig `mapstructure:"bulk_ingestors"`
27+
WocAPIKey string `mapstructure:"woc_api_key"`
2828
}
2929

3030
// Validate checks if the Chain field in ChaintracksServiceConfig holds a valid BSV network type.
@@ -46,9 +46,9 @@ func (c *ChaintracksServiceConfig) Validate() error {
4646
}
4747
}
4848

49-
for i := range c.CDNBulkIngestors {
50-
if err := c.CDNBulkIngestors[i].Validate(); err != nil {
51-
return fmt.Errorf("invalid CDN bulk ingestor config at index %d: %w", i, err)
49+
for i := range c.BulkIngestors {
50+
if err := c.BulkIngestors[i].Validate(); err != nil {
51+
return fmt.Errorf("invalid bulk ingestor config at index %d: %w", i, err)
5252
}
5353
}
5454

@@ -62,15 +62,66 @@ func DefaultChaintracksServiceConfig() ChaintracksServiceConfig {
6262
LiveIngestors: []LiveIngestorType{
6363
LiveIngestorTypeWocPoll,
6464
},
65-
CDNBulkIngestors: []CDNBulkIngestorConfig{
65+
BulkIngestors: []BulkIngestorConfig{
6666
{
67-
SourceURL: BabbageBlockHeadersCDN,
67+
Type: ChaintracksCDN,
68+
CDNConfig: &CDNBulkIngestorConfig{
69+
SourceURL: BabbageBlockHeadersCDN,
70+
},
71+
},
72+
{
73+
Type: WhatsOnChainCDN,
6874
},
6975
},
7076
WocAPIKey: "",
7177
}
7278
}
7379

80+
// BulkIngestorType defines the type of bulk ingestor used to retrieve blockchain data from different sources.
81+
type BulkIngestorType string
82+
83+
// Supported BulkIngestorType values.
84+
const (
85+
ChaintracksCDN BulkIngestorType = "chaintracks_cdn"
86+
WhatsOnChainCDN BulkIngestorType = "whats_on_chain_cdn"
87+
)
88+
89+
// BulkIngestorConfig defines configuration for a bulk ingestor used to fetch blockchain data such as block headers.
90+
type BulkIngestorConfig struct {
91+
Type BulkIngestorType `mapstructure:"type"`
92+
93+
// CDNConfig is required if Type is ChaintracksCDN
94+
CDNConfig *CDNBulkIngestorConfig `mapstructure:"cdn_config"`
95+
}
96+
97+
// Validate checks if the BulkIngestorConfig is properly configured for the specified Type and returns an error if not valid.
98+
func (b *BulkIngestorConfig) Validate() error {
99+
switch b.Type {
100+
case ChaintracksCDN:
101+
if b.CDNConfig == nil {
102+
return fmt.Errorf("cdn_config is required for chaintracks_cdn bulk ingestor")
103+
}
104+
if err := b.CDNConfig.Validate(); err != nil {
105+
return fmt.Errorf("invalid cdn_config: %w", err)
106+
}
107+
case WhatsOnChainCDN:
108+
// No additional config required for WhatsOnChainCDN
109+
default:
110+
return fmt.Errorf("invalid bulk ingestor type: %s", b.Type)
111+
}
112+
return nil
113+
}
114+
115+
// String returns a human-readable representation of the BulkIngestorConfig, including type and CDN source URL when applicable.
116+
func (b *BulkIngestorConfig) String() string {
117+
detailed := ""
118+
if b.Type == ChaintracksCDN && b.CDNConfig != nil {
119+
detailed = fmt.Sprintf(" (source_url=%s)", b.CDNConfig.SourceURL)
120+
}
121+
122+
return fmt.Sprintf("%s%s", b.Type, detailed)
123+
}
124+
74125
// CDNBulkIngestorConfig holds configuration options for a bulk ingestor that fetches block headers from a CDN source.
75126
type CDNBulkIngestorConfig struct {
76127
SourceURL string `mapstructure:"source_url"`

pkg/services/chaintracks/chaintracks_initializers.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
type Initializers struct {
1313
WOCLiveIngestorPollFactory func(logger *slog.Logger, config defs.ChaintracksServiceConfig) LiveIngestor
1414
CDNBulkIngestorFactory func(logger *slog.Logger, chain defs.BSVNetwork, config defs.CDNBulkIngestorConfig) BulkIngestor
15+
WOCBulkIngestorFactory func(logger *slog.Logger, chain defs.BSVNetwork, apiKey string) BulkIngestor
1516
}
1617

1718
// DefaultInitializers returns an Initializers struct with the default WOCLiveIngestorPollFactory implementation.
@@ -23,6 +24,9 @@ func DefaultInitializers() Initializers {
2324
CDNBulkIngestorFactory: func(logger *slog.Logger, chain defs.BSVNetwork, config defs.CDNBulkIngestorConfig) BulkIngestor {
2425
return ingest.NewBulkIngestorCDN(logger, chain, config)
2526
},
27+
WOCBulkIngestorFactory: func(logger *slog.Logger, chain defs.BSVNetwork, apiKey string) BulkIngestor {
28+
return ingest.NewBulkIngestorWOC(logger, chain, ingest.BulkIngestorWocOpts.WithAPIKey(apiKey))
29+
},
2630
}
2731
}
2832

@@ -33,6 +37,12 @@ func createInitializers(inits ...Initializers) Initializers {
3337
if in.WOCLiveIngestorPollFactory != nil {
3438
finalInits.WOCLiveIngestorPollFactory = in.WOCLiveIngestorPollFactory
3539
}
40+
if in.CDNBulkIngestorFactory != nil {
41+
finalInits.CDNBulkIngestorFactory = in.CDNBulkIngestorFactory
42+
}
43+
if in.WOCBulkIngestorFactory != nil {
44+
finalInits.WOCBulkIngestorFactory = in.WOCBulkIngestorFactory
45+
}
3646
}
3747

3848
return finalInits

pkg/services/chaintracks/chaintracks_service.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,28 @@ func (s *Service) GetInfo(ctx context.Context) (*models.InfoResponse, error) {
205205
}, nil
206206
}
207207

208+
// FindChainTipHeader retrieves the current chain tip block header from storage or returns an error if not found.
209+
func (s *Service) FindChainTipHeader(ctx context.Context) (*wdk.ChainBlockHeader, error) {
210+
tipHeader, err := s.storage.Query(ctx).GetActiveTipLiveHeader()
211+
if err != nil {
212+
return nil, fmt.Errorf("failed to get active tip live header: %w", err)
213+
}
214+
if tipHeader == nil {
215+
return nil, fmt.Errorf("no active tip live header found: %w", wdk.ErrNotFoundError)
216+
}
217+
return &tipHeader.ChainBlockHeader, nil
218+
}
219+
220+
// FindChainTipHash retrieves the hash of the current chain tip.
221+
// Returns the hash as a string or an error if the chain tip header cannot be found.
222+
func (s *Service) FindChainTipHash(ctx context.Context) (string, error) {
223+
if tipHeader, err := s.FindChainTipHeader(ctx); err != nil {
224+
return "", fmt.Errorf("failed to find chain tip header: %w", err)
225+
} else {
226+
return tipHeader.Hash, nil
227+
}
228+
}
229+
208230
func (s *Service) getMissingBlockHeader(ctx context.Context, hash string) *wdk.ChainBlockHeader {
209231
for _, liveIngestor := range s.liveIngestors {
210232
header, err := liveIngestor.Ingestor.GetHeaderByHash(ctx, hash)

pkg/services/chaintracks/chaintracks_service_test.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,25 @@ func TestService_Lifecycle(t *testing.T) {
3737
assert.Equal(t, defs.NetworkMainnet, info.Chain)
3838
assert.Equal(t, "gorm-sqlite-inmemory", info.Storage)
3939
assert.Equal(t, []string{"woc_poll"}, info.LiveIngestors)
40-
assert.Equal(t, []string{"https://cdn.projectbabbage.com/blockheaders"}, info.BulkIngestors)
41-
//assert.Equal(t, int(info.HeightLive)-1, int(info.HeightBulk)) // TODO: Uncomment it when WoC Bulk ingestor is implemented (Babbage CDN ingestor doesn't provide the latest data)
40+
assert.Equal(t, []string{"chaintracks_cdn (source_url=https://cdn.projectbabbage.com/blockheaders)", "whats_on_chain_cdn"}, info.BulkIngestors)
4241
t.Logf("Bulk height: %d", info.HeightBulk) // TODO: When we use live data (not mocked), this value is not constant; will be changed to assert.Equal later
4342
t.Logf("Live height: %d", info.HeightLive)
4443

44+
// when:
45+
height, err := service.GetPresentHeight(t.Context())
46+
47+
// then:
48+
require.NoError(t, err, "get present height should not return error")
49+
require.Greater(t, height, uint(900000), "present height should be greater than 900000")
50+
51+
// when:
52+
tipHeader, err := service.FindChainTipHeader(t.Context())
53+
54+
// then:
55+
require.NoError(t, err, "find chain tip header should not return error")
56+
require.NotNil(t, tipHeader, "tip header should not be nil")
57+
require.Equal(t, height, tipHeader.Height, "tip header height should be equal to present height")
58+
4559
// when:
4660
service.Destroy()
4761

pkg/services/chaintracks/create_ingestors.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,26 @@ func createLiveIngestors(logger *slog.Logger, config defs.ChaintracksServiceConf
4040
}
4141

4242
func createBulkIngestors(logger *slog.Logger, config defs.ChaintracksServiceConfig, initializers Initializers) []NamedBulkIngestor {
43-
logger.Info("Chaintracks service - creating bulk ingestors", slog.Any("configured_sources", config.CDNBulkIngestors))
44-
45-
ingestors := make([]NamedBulkIngestor, 0, len(config.CDNBulkIngestors))
46-
for _, cdnConfig := range config.CDNBulkIngestors {
47-
ingestor := initializers.CDNBulkIngestorFactory(logger, config.Chain, cdnConfig)
48-
ingestors = append(ingestors, NamedBulkIngestor{
49-
Name: cdnConfig.SourceURL,
50-
Ingestor: ingestor,
51-
})
43+
logger.Info("Chaintracks service - creating bulk ingestors", slog.Any("configured_sources", config.BulkIngestors))
44+
45+
ingestors := make([]NamedBulkIngestor, 0, len(config.BulkIngestors))
46+
for _, ingestorConfig := range config.BulkIngestors {
47+
switch ingestorConfig.Type {
48+
case defs.ChaintracksCDN:
49+
ingestor := initializers.CDNBulkIngestorFactory(logger, config.Chain, *ingestorConfig.CDNConfig)
50+
ingestors = append(ingestors, NamedBulkIngestor{
51+
Name: ingestorConfig.String(),
52+
Ingestor: ingestor,
53+
})
54+
case defs.WhatsOnChainCDN:
55+
ingestor := initializers.WOCBulkIngestorFactory(logger, config.Chain, config.WocAPIKey)
56+
ingestors = append(ingestors, NamedBulkIngestor{
57+
Name: ingestorConfig.String(),
58+
Ingestor: ingestor,
59+
})
60+
default:
61+
logger.Warn("Chaintracks service - unsupported bulk ingestor type, skipping", slog.String("ingestor_type", string(ingestorConfig.Type)))
62+
}
5263
}
5364

5465
return ingestors

pkg/services/chaintracks/ingest/bulk_ingestor_woc.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,16 @@ func (b *BulkIngestorWOC) bulkFileDownloader() BulkFileDownloader {
123123
return BulkFileData{}, fmt.Errorf("failed to download bulk header file %s: %w", fileInfo.FileName, err)
124124
}
125125

126-
return BulkFileData{
126+
fileData := BulkFileData{
127127
Info: fileInfo,
128128
Data: content,
129129
AccessedAt: time.Now(),
130-
}, nil
130+
}
131+
132+
// NOTE: We compute and set the FileHash here since WoC does not provide it in metadata
133+
fileData.Info.FileHash = fileData.Hash()
134+
135+
return fileData, nil
131136
}
132137
}
133138

pkg/services/chaintracks/ingest/cdn_data.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (b *BulkFileData) Validate() error {
6666
return fmt.Errorf("bulk file data is empty")
6767
}
6868

69-
dataHash := crypto.Sha256(b.Data)
69+
dataHash := b.Hash()
7070
if !bytes.Equal(dataHash, b.Info.FileHash) {
7171
base64Expected := base64.StdEncoding.EncodeToString(b.Info.FileHash)
7272
base64Got := base64.StdEncoding.EncodeToString(dataHash)
@@ -84,6 +84,11 @@ func (b *BulkFileData) Validate() error {
8484
return nil
8585
}
8686

87+
// Hash computes and returns the SHA-256 hash of the bulk file data bytes as a byte slice.
88+
func (b *BulkFileData) Hash() []byte {
89+
return crypto.Sha256(b.Data)
90+
}
91+
8792
// Len returns the number of block headers contained in the bulk file data.
8893
func (b *BulkFileData) Len() int {
8994
return len(b.Data) / 80

pkg/services/chaintracks/models/height_range.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ func (h *HeightRanges) Validate() error {
2929
return fmt.Errorf("bulk storage must start with genesis header")
3030
}
3131

32-
if !h.Live.IsEmpty() && h.Bulk.MaxHeight+1 != h.Live.MinHeight {
32+
if !h.Live.IsEmpty() && h.Bulk.MaxHeight+1 < h.Live.MinHeight {
3333
gap := must.ConvertToIntFromUnsigned(h.Live.MinHeight) - must.ConvertToIntFromUnsigned(h.Bulk.MaxHeight) - 1
34-
return fmt.Errorf("there is a gap (%d) or overlap between bulk and live header storage, bulk max height: %d, live min height: %d", gap, h.Bulk.MaxHeight, h.Live.MinHeight)
34+
return fmt.Errorf("there is a gap (%d) between bulk and live header storage, bulk max height: %d, live min height: %d", gap, h.Bulk.MaxHeight, h.Live.MinHeight)
3535
}
3636
}
3737

0 commit comments

Comments
 (0)