Skip to content

Commit 8c4b56e

Browse files
committed
Move CommPWriter to lib/
0 parents  commit 8c4b56e

File tree

2 files changed

+201
-0
lines changed

2 files changed

+201
-0
lines changed

writer.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package commp
2+
3+
import (
4+
"bytes"
5+
"math/bits"
6+
7+
"github.com/ipfs/go-cid"
8+
"golang.org/x/xerrors"
9+
10+
ffi "github.com/filecoin-project/filecoin-ffi"
11+
"github.com/filecoin-project/go-padreader"
12+
"github.com/filecoin-project/go-state-types/abi"
13+
14+
"github.com/filecoin-project/lotus/api"
15+
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
16+
"github.com/filecoin-project/lotus/extern/sector-storage/zerocomm"
17+
)
18+
19+
const commPBufPad = abi.PaddedPieceSize(8 << 20)
20+
const CommPBuf = abi.UnpaddedPieceSize(commPBufPad - (commPBufPad / 128)) // can't use .Unpadded() for const
21+
22+
type Writer struct {
23+
len int64
24+
buf [CommPBuf]byte
25+
leaves []cid.Cid
26+
}
27+
28+
func (w *Writer) Write(p []byte) (int, error) {
29+
n := len(p)
30+
for len(p) > 0 {
31+
buffered := int(w.len % int64(len(w.buf)))
32+
toBuffer := len(w.buf) - buffered
33+
if toBuffer > len(p) {
34+
toBuffer = len(p)
35+
}
36+
37+
copied := copy(w.buf[buffered:], p[:toBuffer])
38+
p = p[copied:]
39+
w.len += int64(copied)
40+
41+
if copied > 0 && w.len%int64(len(w.buf)) == 0 {
42+
leaf, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredSealProof_StackedDrg32GiBV1, bytes.NewReader(w.buf[:]), CommPBuf)
43+
if err != nil {
44+
return 0, err
45+
}
46+
w.leaves = append(w.leaves, leaf)
47+
}
48+
}
49+
return n, nil
50+
}
51+
52+
func (w *Writer) Sum() (api.DataCIDSize, error) {
53+
// process last non-zero leaf if exists
54+
lastLen := w.len % int64(len(w.buf))
55+
rawLen := w.len
56+
57+
// process remaining bit of data
58+
if lastLen != 0 {
59+
if len(w.leaves) != 0 {
60+
copy(w.buf[lastLen:], make([]byte, int(int64(CommPBuf)-lastLen)))
61+
lastLen = int64(CommPBuf)
62+
}
63+
64+
r, sz := padreader.New(bytes.NewReader(w.buf[:lastLen]), uint64(lastLen))
65+
p, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredSealProof_StackedDrg32GiBV1, r, sz)
66+
if err != nil {
67+
return api.DataCIDSize{}, err
68+
}
69+
70+
if sz < CommPBuf { // special case for pieces smaller than 16MiB
71+
return api.DataCIDSize{
72+
PayloadSize: w.len,
73+
PieceSize: sz.Padded(),
74+
PieceCID: p,
75+
}, nil
76+
}
77+
78+
w.leaves = append(w.leaves, p)
79+
}
80+
81+
// pad with zero pieces to power-of-two size
82+
fillerLeaves := (1 << (bits.Len(uint(len(w.leaves) - 1)))) - len(w.leaves)
83+
for i := 0; i < fillerLeaves; i++ {
84+
w.leaves = append(w.leaves, zerocomm.ZeroPieceCommitment(CommPBuf))
85+
}
86+
87+
if len(w.leaves) == 1 {
88+
return api.DataCIDSize{
89+
PayloadSize: rawLen,
90+
PieceSize: abi.PaddedPieceSize(len(w.leaves)) * commPBufPad,
91+
PieceCID: w.leaves[0],
92+
}, nil
93+
}
94+
95+
pieces := make([]abi.PieceInfo, len(w.leaves))
96+
for i, leaf := range w.leaves {
97+
pieces[i] = abi.PieceInfo{
98+
Size: commPBufPad,
99+
PieceCID: leaf,
100+
}
101+
}
102+
103+
p, err := ffi.GenerateUnsealedCID(abi.RegisteredSealProof_StackedDrg32GiBV1, pieces)
104+
if err != nil {
105+
return api.DataCIDSize{}, xerrors.Errorf("generating unsealed CID: %w", err)
106+
}
107+
108+
return api.DataCIDSize{
109+
PayloadSize: rawLen,
110+
PieceSize: abi.PaddedPieceSize(len(w.leaves)) * commPBufPad,
111+
PieceCID: p,
112+
}, nil
113+
}

writer_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package commp
2+
3+
import (
4+
"bytes"
5+
"crypto/rand"
6+
"fmt"
7+
"io"
8+
"io/ioutil"
9+
"testing"
10+
11+
"github.com/stretchr/testify/require"
12+
13+
"github.com/filecoin-project/go-padreader"
14+
"github.com/filecoin-project/go-state-types/abi"
15+
16+
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
17+
"github.com/filecoin-project/lotus/extern/sector-storage/zerocomm"
18+
)
19+
20+
func TestWriterZero(t *testing.T) {
21+
for i, s := range []struct {
22+
writes []int
23+
expect abi.PaddedPieceSize
24+
}{
25+
{writes: []int{200}, expect: 256},
26+
{writes: []int{200, 200}, expect: 512},
27+
28+
{writes: []int{int(CommPBuf)}, expect: commPBufPad},
29+
{writes: []int{int(CommPBuf) * 2}, expect: 2 * commPBufPad},
30+
{writes: []int{int(CommPBuf), int(CommPBuf), int(CommPBuf)}, expect: 4 * commPBufPad},
31+
{writes: []int{int(CommPBuf), int(CommPBuf), int(CommPBuf), int(CommPBuf), int(CommPBuf), int(CommPBuf), int(CommPBuf), int(CommPBuf), int(CommPBuf)}, expect: 16 * commPBufPad},
32+
33+
{writes: []int{200, int(CommPBuf)}, expect: 2 * commPBufPad},
34+
} {
35+
s := s
36+
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
37+
w := &Writer{}
38+
var rawSum int64
39+
for _, write := range s.writes {
40+
rawSum += int64(write)
41+
_, err := w.Write(make([]byte, write))
42+
require.NoError(t, err)
43+
}
44+
45+
p, err := w.Sum()
46+
require.NoError(t, err)
47+
require.Equal(t, rawSum, p.PayloadSize)
48+
require.Equal(t, s.expect, p.PieceSize)
49+
require.Equal(t, zerocomm.ZeroPieceCommitment(s.expect.Unpadded()).String(), p.PieceCID.String())
50+
})
51+
}
52+
}
53+
54+
func TestWriterData(t *testing.T) {
55+
dataLen := float64(CommPBuf) * 6.78
56+
data, _ := ioutil.ReadAll(io.LimitReader(rand.Reader, int64(dataLen)))
57+
58+
pr, sz := padreader.New(bytes.NewReader(data), uint64(dataLen))
59+
exp, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredSealProof_StackedDrg32GiBV1, pr, sz)
60+
require.NoError(t, err)
61+
62+
w := &Writer{}
63+
_, err = io.Copy(w, bytes.NewReader(data))
64+
require.NoError(t, err)
65+
66+
res, err := w.Sum()
67+
require.NoError(t, err)
68+
69+
require.Equal(t, exp.String(), res.PieceCID.String())
70+
}
71+
72+
func BenchmarkWriterZero(b *testing.B) {
73+
buf := make([]byte, int(CommPBuf)*b.N)
74+
b.SetBytes(int64(CommPBuf))
75+
b.ResetTimer()
76+
77+
w := &Writer{}
78+
79+
_, err := w.Write(buf)
80+
require.NoError(b, err)
81+
o, err := w.Sum()
82+
83+
b.StopTimer()
84+
85+
require.NoError(b, err)
86+
require.Equal(b, zerocomm.ZeroPieceCommitment(o.PieceSize.Unpadded()).String(), o.PieceCID.String())
87+
require.Equal(b, int64(CommPBuf)*int64(b.N), o.PayloadSize)
88+
}

0 commit comments

Comments
 (0)