Skip to content

Commit e0ea5cb

Browse files
committed
feat: add endpoint to query transfers table
1 parent c9d5a1f commit e0ea5cb

File tree

9 files changed

+410
-4
lines changed

9 files changed

+410
-4
lines changed

cmd/api.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ func RunApi(cmd *cobra.Command, args []string) {
9292
// token holder queries
9393
root.GET("/holders/:address", handlers.GetTokenHoldersByType)
9494

95+
// token transfers queries
96+
root.GET("/transfers", handlers.GetTokenTransfers)
97+
9598
// search
9699
root.GET("/search/:input", handlers.Search)
97100
}

internal/common/transfers.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package common
2+
3+
import (
4+
"math/big"
5+
"time"
6+
)
7+
8+
type TokenTransfer struct {
9+
TokenType string `json:"token_type" ch:"token_type"`
10+
ChainID *big.Int `json:"chain_id" ch:"chain_id"`
11+
TokenAddress string `json:"token_address" ch:"token_address"`
12+
FromAddress string `json:"from_address" ch:"from_address"`
13+
ToAddress string `json:"to_address" ch:"to_address"`
14+
BlockNumber *big.Int `json:"block_number" ch:"block_number"`
15+
BlockTimestamp time.Time `json:"block_timestamp" ch:"block_timestamp"`
16+
TransactionHash string `json:"transaction_hash" ch:"transaction_hash"`
17+
TokenID *big.Int `json:"token_id" ch:"token_id"`
18+
Amount *big.Int `json:"amount" ch:"amount"`
19+
LogIndex uint64 `json:"log_index" ch:"log_index"`
20+
Sign int8 `json:"sign" ch:"sign"`
21+
InsertTimestamp time.Time `json:"insert_timestamp" ch:"insert_timestamp"`
22+
}
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
package handlers
2+
3+
import (
4+
"fmt"
5+
"math/big"
6+
"strings"
7+
"time"
8+
9+
"github.com/gin-gonic/gin"
10+
"github.com/rs/zerolog/log"
11+
"github.com/thirdweb-dev/indexer/api"
12+
"github.com/thirdweb-dev/indexer/internal/common"
13+
"github.com/thirdweb-dev/indexer/internal/storage"
14+
)
15+
16+
// TransferModel return type for Swagger documentation
17+
type TransferModel struct {
18+
TokenType string `json:"token_type" ch:"token_type"`
19+
TokenAddress string `json:"token_address" ch:"token_address"`
20+
FromAddress string `json:"from_address" ch:"from_address"`
21+
ToAddress string `json:"to_address" ch:"to_address"`
22+
TokenId string `json:"token_id" ch:"token_id"`
23+
Amount string `json:"amount" ch:"amount"`
24+
BlockNumber string `json:"block_number" ch:"block_number"`
25+
BlockTimestamp string `json:"block_timestamp" ch:"block_timestamp"`
26+
TransactionHash string `json:"transaction_hash" ch:"transaction_hash"`
27+
LogIndex uint64 `json:"log_index" ch:"log_index"`
28+
}
29+
30+
// @Summary Get token transfers
31+
// @Description Retrieve token transfers by various filters
32+
// @Tags transfers
33+
// @Accept json
34+
// @Produce json
35+
// @Security BasicAuth
36+
// @Param chainId path string true "Chain ID"
37+
// @Param token_type query []string false "Token types (erc721, erc1155, erc20)"
38+
// @Param token_address query string false "Token contract address"
39+
// @Param wallet query string false "Wallet address"
40+
// @Param start_block query string false "Start block number"
41+
// @Param end_block query string false "End block number"
42+
// @Param start_timestamp query string false "Start timestamp (RFC3339 format)"
43+
// @Param end_timestamp query string false "End timestamp (RFC3339 format)"
44+
// @Param token_id query []string false "Token IDs"
45+
// @Param transaction_hash query string false "Transaction hash"
46+
// @Param page query int false "Page number for pagination"
47+
// @Param limit query int false "Number of items per page" default(20)
48+
// @Success 200 {object} api.QueryResponse{data=[]TransferModel}
49+
// @Failure 400 {object} api.Error
50+
// @Failure 401 {object} api.Error
51+
// @Failure 500 {object} api.Error
52+
// @Router /{chainId}/transfers [get]
53+
func GetTokenTransfers(c *gin.Context) {
54+
chainId, err := api.GetChainId(c)
55+
if err != nil {
56+
api.BadRequestErrorHandler(c, err)
57+
return
58+
}
59+
60+
tokenTypes, err := getTokenTypesFromReq(c)
61+
if err != nil {
62+
api.BadRequestErrorHandler(c, err)
63+
return
64+
}
65+
66+
walletAddress := strings.ToLower(c.Query("wallet_address"))
67+
if walletAddress != "" && !strings.HasPrefix(walletAddress, "0x") {
68+
api.BadRequestErrorHandler(c, fmt.Errorf("invalid wallet_address '%s'", walletAddress))
69+
return
70+
}
71+
72+
tokenAddress := strings.ToLower(c.Query("token_address"))
73+
if tokenAddress != "" && !strings.HasPrefix(tokenAddress, "0x") {
74+
api.BadRequestErrorHandler(c, fmt.Errorf("invalid token_address '%s'", tokenAddress))
75+
return
76+
}
77+
78+
transactionHash := strings.ToLower(c.Query("transaction_hash"))
79+
if transactionHash != "" && !strings.HasPrefix(transactionHash, "0x") {
80+
api.BadRequestErrorHandler(c, fmt.Errorf("invalid transaction_hash '%s'", transactionHash))
81+
return
82+
}
83+
84+
tokenIds, err := getTokenIdsFromReq(c)
85+
if err != nil {
86+
api.BadRequestErrorHandler(c, fmt.Errorf("invalid token_id: %s", err))
87+
return
88+
}
89+
90+
// Parse block number parameters
91+
var startBlockNumber, endBlockNumber *big.Int
92+
startBlockStr := c.Query("start_block")
93+
if startBlockStr != "" {
94+
startBlockNumber = new(big.Int)
95+
_, ok := startBlockNumber.SetString(startBlockStr, 10)
96+
if !ok {
97+
api.BadRequestErrorHandler(c, fmt.Errorf("invalid start_block '%s'", startBlockStr))
98+
return
99+
}
100+
}
101+
102+
endBlockStr := c.Query("end_block")
103+
if endBlockStr != "" {
104+
endBlockNumber = new(big.Int)
105+
_, ok := endBlockNumber.SetString(endBlockStr, 10)
106+
if !ok {
107+
api.BadRequestErrorHandler(c, fmt.Errorf("invalid end_block '%s'", endBlockStr))
108+
return
109+
}
110+
}
111+
112+
// Define query filter
113+
qf := storage.TransfersQueryFilter{
114+
ChainId: chainId,
115+
TokenTypes: tokenTypes,
116+
WalletAddress: walletAddress,
117+
TokenAddress: tokenAddress,
118+
TokenIds: tokenIds,
119+
TransactionHash: transactionHash,
120+
StartBlockNumber: startBlockNumber,
121+
EndBlockNumber: endBlockNumber,
122+
Page: api.ParseIntQueryParam(c.Query("page"), 0),
123+
Limit: api.ParseIntQueryParam(c.Query("limit"), 20),
124+
SortBy: c.Query("sort_by"),
125+
SortOrder: c.Query("sort_order"),
126+
}
127+
128+
// Define columns for query
129+
columns := []string{
130+
"token_type",
131+
"token_address",
132+
"from_address",
133+
"to_address",
134+
"token_id",
135+
"amount",
136+
"block_number",
137+
"block_timestamp",
138+
"transaction_hash",
139+
"log_index",
140+
}
141+
142+
queryResult := api.QueryResponse{
143+
Meta: api.Meta{
144+
ChainId: chainId.Uint64(),
145+
Page: qf.Page,
146+
Limit: qf.Limit,
147+
},
148+
}
149+
150+
mainStorage, err = getMainStorage()
151+
if err != nil {
152+
log.Error().Err(err).Msg("Error getting main storage")
153+
api.InternalErrorHandler(c)
154+
return
155+
}
156+
157+
transfersResult, err := mainStorage.GetTokenTransfers(qf, columns...)
158+
if err != nil {
159+
log.Error().Err(err).Msg("Error querying token transfers")
160+
api.InternalErrorHandler(c)
161+
return
162+
}
163+
164+
queryResult.Data = serializeTransfers(transfersResult.Data)
165+
sendJSONResponse(c, queryResult)
166+
}
167+
168+
func serializeTransfers(transfers []common.TokenTransfer) []TransferModel {
169+
transferModels := make([]TransferModel, len(transfers))
170+
for i, transfer := range transfers {
171+
transferModels[i] = serializeTransfer(transfer)
172+
}
173+
return transferModels
174+
}
175+
176+
func serializeTransfer(transfer common.TokenTransfer) TransferModel {
177+
return TransferModel{
178+
TokenType: transfer.TokenType,
179+
TokenAddress: transfer.TokenAddress,
180+
FromAddress: transfer.FromAddress,
181+
ToAddress: transfer.ToAddress,
182+
TokenId: transfer.TokenID.String(),
183+
Amount: transfer.Amount.String(),
184+
BlockNumber: transfer.BlockNumber.String(),
185+
BlockTimestamp: transfer.BlockTimestamp.Format(time.RFC3339),
186+
TransactionHash: transfer.TransactionHash,
187+
LogIndex: transfer.LogIndex,
188+
}
189+
}

internal/storage/clickhouse.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1396,6 +1396,109 @@ func (c *ClickHouseConnector) getTableName(chainId *big.Int, defaultTable string
13961396
return defaultTable
13971397
}
13981398

1399+
/*
1400+
type TokenTransfer struct {
1401+
TokenType string `json:"token_type"`
1402+
ChainID *big.Int `json:"chain_id"`
1403+
TokenAddress string `json:"token_address"`
1404+
FromAddress string `json:"from_address"`
1405+
ToAddress string `json:"to_address"`
1406+
BlockNumber *big.Int `json:"block_number"`
1407+
BlockTimestamp time.Time `json:"block_timestamp"`
1408+
TransactionHash string `json:"transaction_hash"`
1409+
TokenID *big.Int `json:"token_id"`
1410+
Amount *big.Int `json:"amount"`
1411+
LogIndex uint64 `json:"log_index"`
1412+
Sign int8 `json:"sign"`
1413+
InsertTimestamp time.Time `json:"insert_timestamp"`
1414+
}
1415+
1416+
*/
1417+
1418+
func (c *ClickHouseConnector) GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error) {
1419+
columns := "token_type, chain_id, token_address, from_address, to_address, block_number, block_timestamp, transaction_hash, token_id, amount, log_index, sign, insert_timestamp"
1420+
if len(fields) > 0 {
1421+
columns = strings.Join(fields, ", ")
1422+
}
1423+
query := fmt.Sprintf("SELECT %s FROM %s.token_transfers WHERE chain_id = ?", columns, c.cfg.Database)
1424+
1425+
if len(qf.TokenTypes) > 0 {
1426+
tokenTypesStr := ""
1427+
tokenTypesLen := len(qf.TokenTypes)
1428+
for i := 0; i < tokenTypesLen-1; i++ {
1429+
tokenTypesStr += fmt.Sprintf("'%s',", qf.TokenTypes[i])
1430+
}
1431+
tokenTypesStr += fmt.Sprintf("'%s'", qf.TokenTypes[tokenTypesLen-1])
1432+
query += fmt.Sprintf(" AND token_type in (%s)", tokenTypesStr)
1433+
}
1434+
1435+
if qf.WalletAddress != "" {
1436+
query += fmt.Sprintf(" AND (from_address = '%s' OR to_address = '%s')", qf.WalletAddress, qf.WalletAddress)
1437+
}
1438+
if qf.TokenAddress != "" {
1439+
query += fmt.Sprintf(" AND token_address = '%s'", qf.TokenAddress)
1440+
}
1441+
if qf.TransactionHash != "" {
1442+
query += fmt.Sprintf(" AND transaction_hash = '%s'", qf.TransactionHash)
1443+
}
1444+
1445+
if len(qf.TokenIds) > 0 {
1446+
tokenIdsStr := ""
1447+
tokenIdsLen := len(qf.TokenIds)
1448+
for i := 0; i < tokenIdsLen-1; i++ {
1449+
tokenIdsStr += fmt.Sprintf("%s,", qf.TokenIds[i].String())
1450+
}
1451+
tokenIdsStr += qf.TokenIds[tokenIdsLen-1].String()
1452+
query += fmt.Sprintf(" AND token_id in (%s)", tokenIdsStr)
1453+
}
1454+
1455+
if qf.StartBlockNumber != nil {
1456+
query += fmt.Sprintf(" AND block_number >= %s", qf.StartBlockNumber.String())
1457+
}
1458+
if qf.EndBlockNumber != nil {
1459+
query += fmt.Sprintf(" AND block_number <= %s", qf.EndBlockNumber.String())
1460+
}
1461+
1462+
if len(qf.GroupBy) > 0 {
1463+
query += fmt.Sprintf(" GROUP BY %s", strings.Join(qf.GroupBy, ", "))
1464+
}
1465+
1466+
// Add ORDER BY clause
1467+
if qf.SortBy != "" {
1468+
query += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder)
1469+
}
1470+
1471+
// Add limit clause
1472+
if qf.Page > 0 && qf.Limit > 0 {
1473+
offset := (qf.Page - 1) * qf.Limit
1474+
query += fmt.Sprintf(" LIMIT %d OFFSET %d", qf.Limit, offset)
1475+
} else if qf.Limit > 0 {
1476+
query += fmt.Sprintf(" LIMIT %d", qf.Limit)
1477+
}
1478+
fmt.Println("QUERRRYRYYRY", query)
1479+
fmt.Println("qf", qf.ChainId)
1480+
rows, err := c.conn.Query(context.Background(), query, qf.ChainId)
1481+
if err != nil {
1482+
return QueryResult[common.TokenTransfer]{}, err
1483+
}
1484+
defer rows.Close()
1485+
1486+
queryResult := QueryResult[common.TokenTransfer]{
1487+
Data: []common.TokenTransfer{},
1488+
}
1489+
1490+
for rows.Next() {
1491+
var tt common.TokenTransfer
1492+
err := rows.ScanStruct(&tt)
1493+
if err != nil {
1494+
return QueryResult[common.TokenTransfer]{}, err
1495+
}
1496+
queryResult.Data = append(queryResult.Data, tt)
1497+
}
1498+
1499+
return queryResult, nil
1500+
}
1501+
13991502
func (c *ClickHouseConnector) GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error) {
14001503
columns := "chain_id, token_type, address, owner, token_id, balance"
14011504
if len(fields) > 0 {

internal/storage/connector.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,23 @@ type QueryFilter struct {
2525
ForceConsistentData bool
2626
}
2727

28+
type TransfersQueryFilter struct {
29+
ChainId *big.Int
30+
TokenTypes []string
31+
TokenAddress string
32+
WalletAddress string
33+
TokenIds []*big.Int
34+
TransactionHash string
35+
StartBlockNumber *big.Int
36+
EndBlockNumber *big.Int
37+
GroupBy []string
38+
SortBy string
39+
SortOrder string // "ASC" or "DESC"
40+
Page int
41+
Limit int
42+
Offset int
43+
}
44+
2845
type BalancesQueryFilter struct {
2946
ChainId *big.Int
3047
TokenTypes []string
@@ -83,6 +100,7 @@ type IMainStorage interface {
83100
DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error
84101

85102
GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error)
103+
GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error)
86104
}
87105

88106
func NewStorageConnector(cfg *config.StorageConfig) (IStorage, error) {

0 commit comments

Comments
 (0)