@@ -17,8 +17,6 @@ import (
1717 "github.com/filecoin-project/go-multistore"
1818 "github.com/filecoin-project/go-padreader"
1919 "github.com/filecoin-project/go-state-types/abi"
20-
21- "github.com/filecoin-project/go-fil-markets/filestore"
2220)
2321
2422type PreparedCar interface {
@@ -32,7 +30,7 @@ type CarIO interface {
3230
3331 // PrepareCar prepares a car so that its total size can be calculated without writing it to a file.
3432 // It can then be written with PreparedCar.Dump
35- PrepareCar (ctx context.Context , bs ReadStore , payloadCid cid.Cid , node ipld.Node ) (PreparedCar , error )
33+ PrepareCar (ctx context.Context , bs ReadStore , payloadCid cid.Cid , node ipld.Node , userOnNewCarBlocks ... car. OnNewCarBlockFunc ) (PreparedCar , error )
3634
3735 // LoadCar loads blocks into the a store from a given CAR file
3836 LoadCar (bs WriteStore , r io.Reader ) (cid.Cid , error )
@@ -52,40 +50,37 @@ func NewPieceIO(carIO CarIO, bs blockstore.Blockstore, multiStore MultiStore) Pi
5250 return & pieceIO {carIO , bs , multiStore }
5351}
5452
55- type pieceIOWithStore struct {
56- pieceIO
57- store filestore.FileStore
58- }
59-
60- func NewPieceIOWithStore (carIO CarIO , store filestore.FileStore , bs blockstore.Blockstore , multiStore MultiStore ) PieceIOWithStore {
61- return & pieceIOWithStore {pieceIO {carIO , bs , multiStore }, store }
62- }
63-
64- func (pio * pieceIO ) GeneratePieceCommitment (rt abi.RegisteredSealProof , payloadCid cid.Cid , selector ipld.Node , storeID * multistore.StoreID ) (cid.Cid , abi.UnpaddedPieceSize , error ) {
53+ func (pio * pieceIO ) GeneratePieceReader (payloadCid cid.Cid , selector ipld.Node , storeID * multistore.StoreID , userOnNewCarBlocks ... car.OnNewCarBlockFunc ) (io.ReadCloser , uint64 , error , <- chan error ) {
6554 bstore , err := pio .bstore (storeID )
6655 if err != nil {
67- return cid . Undef , 0 , err
56+ return nil , 0 , err , nil
6857 }
69- preparedCar , err := pio .carIO .PrepareCar (context .Background (), bstore , payloadCid , selector )
58+ preparedCar , err := pio .carIO .PrepareCar (context .Background (), bstore , payloadCid , selector , userOnNewCarBlocks ... )
7059 if err != nil {
71- return cid . Undef , 0 , err
60+ return nil , 0 , err , nil
7261 }
7362 pieceSize := uint64 (preparedCar .Size ())
7463 r , w , err := os .Pipe ()
7564 if err != nil {
76- return cid . Undef , 0 , err
65+ return nil , 0 , err , nil
7766 }
78- var stop sync.WaitGroup
79- stop .Add (1 )
80- var werr error
67+ writeErr := make (chan error , 1 )
8168 go func () {
82- defer stop .Done ()
83- werr = preparedCar .Dump (w )
69+ werr := preparedCar .Dump (w )
8470 err := w .Close ()
8571 if werr == nil && err != nil {
8672 werr = err
8773 }
74+ writeErr <- werr
8875 }()
76+ return r , pieceSize , nil , writeErr
77+ }
78+
79+ func (pio * pieceIO ) GeneratePieceCommitment (rt abi.RegisteredSealProof , payloadCid cid.Cid , selector ipld.Node , storeID * multistore.StoreID , userOnNewCarBlocks ... car.OnNewCarBlockFunc ) (cid.Cid , abi.UnpaddedPieceSize , error ) {
80+ r , pieceSize , err , writeErrChan := pio .GeneratePieceReader (payloadCid , selector , storeID , userOnNewCarBlocks ... )
81+ if err != nil {
82+ return cid .Undef , 0 , err
83+ }
8984 commitment , paddedSize , err := GeneratePieceCommitment (rt , r , pieceSize )
9085 closeErr := r .Close ()
9186 if err != nil {
@@ -94,46 +89,13 @@ func (pio *pieceIO) GeneratePieceCommitment(rt abi.RegisteredSealProof, payloadC
9489 if closeErr != nil {
9590 return cid .Undef , 0 , closeErr
9691 }
97- stop . Wait ()
92+ werr := <- writeErrChan
9893 if werr != nil {
9994 return cid .Undef , 0 , werr
10095 }
10196 return commitment , paddedSize , nil
10297}
10398
104- func (pio * pieceIOWithStore ) GeneratePieceCommitmentToFile (rt abi.RegisteredSealProof , payloadCid cid.Cid , selector ipld.Node , storeID * multistore.StoreID , userOnNewCarBlocks ... car.OnNewCarBlockFunc ) (cid.Cid , filestore.Path , abi.UnpaddedPieceSize , error ) {
105- bstore , err := pio .bstore (storeID )
106- if err != nil {
107- return cid .Undef , "" , 0 , err
108- }
109- f , err := pio .store .CreateTemp ()
110- if err != nil {
111- return cid .Undef , "" , 0 , err
112- }
113- cleanup := func () {
114- f .Close ()
115- _ = pio .store .Delete (f .Path ())
116- }
117- err = pio .carIO .WriteCar (context .Background (), bstore , payloadCid , selector , f , userOnNewCarBlocks ... )
118- if err != nil {
119- cleanup ()
120- return cid .Undef , "" , 0 , err
121- }
122- pieceSize := uint64 (f .Size ())
123- _ , err = f .Seek (0 , io .SeekStart )
124- if err != nil {
125- cleanup ()
126- return cid .Undef , "" , 0 , err
127- }
128- commitment , paddedSize , err := GeneratePieceCommitment (rt , f , pieceSize )
129- if err != nil {
130- cleanup ()
131- return cid .Undef , "" , 0 , err
132- }
133- _ = f .Close ()
134- return commitment , f .Path (), paddedSize , nil
135- }
136-
13799func GeneratePieceCommitment (rt abi.RegisteredSealProof , rd io.Reader , pieceSize uint64 ) (cid.Cid , abi.UnpaddedPieceSize , error ) {
138100 paddedReader , paddedSize := padreader .New (rd , pieceSize )
139101 commitment , err := GeneratePieceCIDFromFile (rt , paddedReader , paddedSize )
0 commit comments