Skip to content

Commit c75bbf1

Browse files
committed
added example for bulk upsert data with apache arrow data
1 parent 76492e8 commit c75bbf1

File tree

5 files changed

+7183
-2
lines changed

5 files changed

+7183
-2
lines changed
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"context"
6+
_ "embed"
7+
"fmt"
8+
"github.com/apache/arrow/go/v18/arrow"
9+
"github.com/apache/arrow/go/v18/arrow/csv"
10+
"github.com/apache/arrow/go/v18/arrow/ipc"
11+
"github.com/ydb-platform/ydb-go-sdk/v3"
12+
"github.com/ydb-platform/ydb-go-sdk/v3/table"
13+
"os"
14+
"path"
15+
)
16+
17+
var (
18+
//go:embed moscow.csv
19+
moscowWeather []byte
20+
21+
//go:embed schema.sql
22+
schema string
23+
)
24+
25+
func main() {
26+
ctx, cancel := context.WithCancel(context.Background())
27+
defer cancel()
28+
29+
db, err := ydb.Open(ctx, os.Getenv("YDB_CONNECTION_STRING"))
30+
if err != nil {
31+
panic(err)
32+
}
33+
34+
defer db.Close(ctx)
35+
36+
err = db.Query().Exec(ctx, schema)
37+
if err != nil {
38+
panic(err)
39+
}
40+
41+
schema := arrow.NewSchema(
42+
[]arrow.Field{
43+
{Name: "ID", Type: arrow.PrimitiveTypes.Uint64},
44+
{Name: "Date", Type: arrow.BinaryTypes.String},
45+
{Name: "MaxTemperatureF", Type: arrow.PrimitiveTypes.Int64},
46+
{Name: "MeanTemperatureF", Type: arrow.PrimitiveTypes.Int64},
47+
{Name: "MinTemperatureF", Type: arrow.PrimitiveTypes.Int64},
48+
{Name: "MaxDewPointF", Type: arrow.PrimitiveTypes.Int64},
49+
{Name: "MeanDewPointF", Type: arrow.PrimitiveTypes.Int64},
50+
{Name: "MinDewpointF", Type: arrow.PrimitiveTypes.Int64},
51+
{Name: "MaxHumidity", Type: arrow.PrimitiveTypes.Int64},
52+
{Name: "MeanHumidity", Type: arrow.PrimitiveTypes.Int64},
53+
{Name: "MinHumidity", Type: arrow.PrimitiveTypes.Int64},
54+
{Name: "MaxSeaLevelPressureIn", Type: arrow.PrimitiveTypes.Float64},
55+
{Name: "MeanSeaLevelPressureIn", Type: arrow.PrimitiveTypes.Float64},
56+
{Name: "MinSeaLevelPressureIn", Type: arrow.PrimitiveTypes.Float64},
57+
{Name: "MaxVisibilityMiles", Type: arrow.PrimitiveTypes.Int64},
58+
{Name: "MeanVisibilityMiles", Type: arrow.PrimitiveTypes.Int64},
59+
{Name: "MinVisibilityMiles", Type: arrow.PrimitiveTypes.Int64},
60+
{Name: "MaxWindSpeedMPH", Type: arrow.PrimitiveTypes.Int64},
61+
{Name: "MeanWindSpeedMPH", Type: arrow.PrimitiveTypes.Int64},
62+
{Name: "MaxGustSpeedMPH", Type: arrow.PrimitiveTypes.Int64},
63+
{Name: "PrecipitationIn", Type: arrow.PrimitiveTypes.Float64},
64+
{Name: "CloudCover", Type: arrow.PrimitiveTypes.Int64},
65+
{Name: "Events", Type: arrow.BinaryTypes.String},
66+
{Name: "WindDirDegrees", Type: arrow.BinaryTypes.String},
67+
{Name: "city", Type: arrow.BinaryTypes.String},
68+
{Name: "season", Type: arrow.BinaryTypes.String},
69+
},
70+
nil,
71+
)
72+
r := csv.NewReader(
73+
bytes.NewBuffer(moscowWeather),
74+
schema,
75+
csv.WithComma(','),
76+
csv.WithLazyQuotes(true),
77+
csv.WithHeader(false),
78+
csv.WithNullReader(true, ""),
79+
)
80+
defer r.Release()
81+
82+
s, err := schemaBytes(schema)
83+
if err != nil {
84+
panic(err)
85+
}
86+
87+
var data bytes.Buffer
88+
writer := ipc.NewWriter(&data, ipc.WithSchema(schema))
89+
90+
for r.Next() {
91+
err = writer.Write(r.Record())
92+
if err != nil {
93+
panic(err)
94+
}
95+
96+
d := bytes.TrimPrefix(data.Bytes(), s)
97+
98+
err = db.Table().BulkUpsert(ctx, path.Join(db.Name(), "moscow_weather"),
99+
table.BulkUpsertDataArrow(d, table.WithArrowSchema(s)),
100+
)
101+
if err != nil {
102+
panic(err)
103+
}
104+
105+
data.Reset()
106+
}
107+
108+
if err = r.Err(); err != nil {
109+
panic(err)
110+
}
111+
}
112+
113+
// https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
114+
var arrowIPCEndOfStreamMark = []byte{0xFF, 0xFF, 0xFF, 0xFF, 0, 0, 0, 0}
115+
116+
func schemaBytes(s *arrow.Schema) (schema []byte, err error) {
117+
buf := &bytes.Buffer{}
118+
writer := ipc.NewWriter(buf, ipc.WithSchema(s))
119+
120+
err = writer.Close()
121+
if err != nil {
122+
return nil, fmt.Errorf("failed to save arrow schema: %w", err)
123+
}
124+
125+
schema = bytes.Clone(buf.Bytes())
126+
schema = bytes.TrimSuffix(schema, arrowIPCEndOfStreamMark)
127+
128+
return schema, nil
129+
}

0 commit comments

Comments
 (0)