Skip to content

Commit 584a14d

Browse files
authored
Unique blockstores: Storage Market (#341)
* feat(storagemarket): use unique blockstores support each deal having it's own unique blockstore, seperate from other deals, and cleaning up all data on the provider side when the deal is finished * fix(diagrams): update diagrams * fix(lint): fix lint issues * fix(deps): update to tagged multistore * fix(deps): remove accidental old libp2p * feat(storagemarket): support blockstore support using existing blockstore if needed (for IPFS compatibility) * fix(dtutils): support nil for store support nil for store to avoid an error log
1 parent 1916619 commit 584a14d

File tree

3 files changed

+104
-46
lines changed

3 files changed

+104
-46
lines changed

pieceio/pieceio.go

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/ipld/go-car"
1212
"github.com/ipld/go-ipld-prime"
1313

14+
"github.com/filecoin-project/go-multistore"
1415
"github.com/filecoin-project/go-padreader"
1516
"github.com/filecoin-project/sector-storage/ffiwrapper"
1617
"github.com/filecoin-project/specs-actors/actors/abi"
@@ -36,25 +37,34 @@ type CarIO interface {
3637
}
3738

3839
type pieceIO struct {
39-
carIO CarIO
40-
bs blockstore.Blockstore
40+
carIO CarIO
41+
bs blockstore.Blockstore
42+
multiStore MultiStore
4143
}
4244

43-
func NewPieceIO(carIO CarIO, bs blockstore.Blockstore) PieceIO {
44-
return &pieceIO{carIO, bs}
45+
type MultiStore interface {
46+
Get(i multistore.StoreID) (*multistore.Store, error)
47+
}
48+
49+
func NewPieceIO(carIO CarIO, bs blockstore.Blockstore, multiStore MultiStore) PieceIO {
50+
return &pieceIO{carIO, bs, multiStore}
4551
}
4652

4753
type pieceIOWithStore struct {
4854
pieceIO
4955
store filestore.FileStore
5056
}
5157

52-
func NewPieceIOWithStore(carIO CarIO, store filestore.FileStore, bs blockstore.Blockstore) PieceIOWithStore {
53-
return &pieceIOWithStore{pieceIO{carIO, bs}, store}
58+
func NewPieceIOWithStore(carIO CarIO, store filestore.FileStore, bs blockstore.Blockstore, multiStore MultiStore) PieceIOWithStore {
59+
return &pieceIOWithStore{pieceIO{carIO, bs, multiStore}, store}
5460
}
5561

56-
func (pio *pieceIO) GeneratePieceCommitment(rt abi.RegisteredSealProof, payloadCid cid.Cid, selector ipld.Node) (cid.Cid, abi.UnpaddedPieceSize, error) {
57-
preparedCar, err := pio.carIO.PrepareCar(context.Background(), pio.bs, payloadCid, selector)
62+
func (pio *pieceIO) GeneratePieceCommitment(rt abi.RegisteredSealProof, payloadCid cid.Cid, selector ipld.Node, storeID *multistore.StoreID) (cid.Cid, abi.UnpaddedPieceSize, error) {
63+
bstore, err := pio.bstore(storeID)
64+
if err != nil {
65+
return cid.Undef, 0, err
66+
}
67+
preparedCar, err := pio.carIO.PrepareCar(context.Background(), bstore, payloadCid, selector)
5868
if err != nil {
5969
return cid.Undef, 0, err
6070
}
@@ -89,7 +99,11 @@ func (pio *pieceIO) GeneratePieceCommitment(rt abi.RegisteredSealProof, payloadC
8999
return commitment, paddedSize, nil
90100
}
91101

92-
func (pio *pieceIOWithStore) GeneratePieceCommitmentToFile(rt abi.RegisteredSealProof, payloadCid cid.Cid, selector ipld.Node, userOnNewCarBlocks ...car.OnNewCarBlockFunc) (cid.Cid, filestore.Path, abi.UnpaddedPieceSize, error) {
102+
func (pio *pieceIOWithStore) GeneratePieceCommitmentToFile(rt abi.RegisteredSealProof, payloadCid cid.Cid, selector ipld.Node, storeID *multistore.StoreID, userOnNewCarBlocks ...car.OnNewCarBlockFunc) (cid.Cid, filestore.Path, abi.UnpaddedPieceSize, error) {
103+
bstore, err := pio.bstore(storeID)
104+
if err != nil {
105+
return cid.Undef, "", 0, err
106+
}
93107
f, err := pio.store.CreateTemp()
94108
if err != nil {
95109
return cid.Undef, "", 0, err
@@ -98,7 +112,7 @@ func (pio *pieceIOWithStore) GeneratePieceCommitmentToFile(rt abi.RegisteredSeal
98112
f.Close()
99113
_ = pio.store.Delete(f.Path())
100114
}
101-
err = pio.carIO.WriteCar(context.Background(), pio.bs, payloadCid, selector, f, userOnNewCarBlocks...)
115+
err = pio.carIO.WriteCar(context.Background(), bstore, payloadCid, selector, f, userOnNewCarBlocks...)
102116
if err != nil {
103117
cleanup()
104118
return cid.Undef, "", 0, err
@@ -127,6 +141,21 @@ func GeneratePieceCommitment(rt abi.RegisteredSealProof, rd io.Reader, pieceSize
127141
return commitment, paddedSize, nil
128142
}
129143

130-
func (pio *pieceIO) ReadPiece(r io.Reader) (cid.Cid, error) {
131-
return pio.carIO.LoadCar(pio.bs, r)
144+
func (pio *pieceIO) ReadPiece(storeID *multistore.StoreID, r io.Reader) (cid.Cid, error) {
145+
bstore, err := pio.bstore(storeID)
146+
if err != nil {
147+
return cid.Undef, err
148+
}
149+
return pio.carIO.LoadCar(bstore, r)
150+
}
151+
152+
func (pio *pieceIO) bstore(storeID *multistore.StoreID) (blockstore.Blockstore, error) {
153+
if storeID == nil {
154+
return pio.bs, nil
155+
}
156+
store, err := pio.multiStore.Get(*storeID)
157+
if err != nil {
158+
return nil, err
159+
}
160+
return store.Bstore, nil
132161
}

pieceio/pieceio_test.go

Lines changed: 59 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,17 @@ import (
88
"testing"
99

1010
"github.com/ipfs/go-cid"
11+
"github.com/ipfs/go-datastore"
12+
dss "github.com/ipfs/go-datastore/sync"
13+
blockstore "github.com/ipfs/go-ipfs-blockstore"
1114
dag "github.com/ipfs/go-merkledag"
12-
dstest "github.com/ipfs/go-merkledag/test"
1315
basicnode "github.com/ipld/go-ipld-prime/node/basic"
1416
"github.com/ipld/go-ipld-prime/traversal/selector"
1517
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
1618
"github.com/stretchr/testify/mock"
1719
"github.com/stretchr/testify/require"
1820

21+
"github.com/filecoin-project/go-multistore"
1922
"github.com/filecoin-project/sector-storage/ffiwrapper"
2023
"github.com/filecoin-project/specs-actors/actors/abi"
2124

@@ -33,13 +36,18 @@ func Test_ThereAndBackAgain(t *testing.T) {
3336
store, err := filestore.NewLocalFileStore(tempDir)
3437
require.NoError(t, err)
3538

36-
sourceBserv := dstest.Bserv()
37-
sourceBs := sourceBserv.Blockstore()
39+
ds := dss.MutexWrap(datastore.NewMapDatastore())
40+
multiStore, err := multistore.NewMultiDstore(ds)
41+
require.NoError(t, err)
42+
bs := blockstore.NewBlockstore(ds)
3843

39-
pio := pieceio.NewPieceIOWithStore(cio, store, sourceBs)
44+
pio := pieceio.NewPieceIOWithStore(cio, store, bs, multiStore)
4045
require.NoError(t, err)
4146

42-
dserv := dag.NewDAGService(sourceBserv)
47+
storeID := multiStore.Next()
48+
mstore, err := multiStore.Get(storeID)
49+
require.NoError(t, err)
50+
dserv := mstore.DAG
4351
a := dag.NewRawNode([]byte("aaaa"))
4452
b := dag.NewRawNode([]byte("bbbb"))
4553
c := dag.NewRawNode([]byte("cccc"))
@@ -69,7 +77,7 @@ func Test_ThereAndBackAgain(t *testing.T) {
6977
ssb.ExploreIndex(1, ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge()))))
7078
}).Node()
7179

72-
pcid, tmpPath, _, err := pio.GeneratePieceCommitmentToFile(abi.RegisteredSealProof_StackedDrg2KiBV1, nd3.Cid(), node)
80+
pcid, tmpPath, _, err := pio.GeneratePieceCommitmentToFile(abi.RegisteredSealProof_StackedDrg2KiBV1, nd3.Cid(), node, &storeID)
7381
require.NoError(t, err)
7482
tmpFile, err := store.Open(tmpPath)
7583
require.NoError(t, err)
@@ -111,7 +119,7 @@ func Test_ThereAndBackAgain(t *testing.T) {
111119
reader = tmpFile
112120
}
113121

114-
id, err := pio.ReadPiece(reader)
122+
id, err := pio.ReadPiece(&storeID, reader)
115123
require.NoError(t, err)
116124
require.Equal(t, nd3.Cid(), id)
117125
}
@@ -123,11 +131,17 @@ func Test_StoreRestoreMemoryBuffer(t *testing.T) {
123131
store, err := filestore.NewLocalFileStore(tempDir)
124132
require.NoError(t, err)
125133

126-
sourceBserv := dstest.Bserv()
127-
sourceBs := sourceBserv.Blockstore()
128-
pio := pieceio.NewPieceIOWithStore(cio, store, sourceBs)
134+
ds := dss.MutexWrap(datastore.NewMapDatastore())
135+
multiStore, err := multistore.NewMultiDstore(ds)
136+
require.NoError(t, err)
137+
bs := blockstore.NewBlockstore(ds)
138+
139+
pio := pieceio.NewPieceIOWithStore(cio, store, bs, multiStore)
129140

130-
dserv := dag.NewDAGService(sourceBserv)
141+
storeID := multiStore.Next()
142+
mstore, err := multiStore.Get(storeID)
143+
require.NoError(t, err)
144+
dserv := mstore.DAG
131145
a := dag.NewRawNode([]byte("aaaa"))
132146
b := dag.NewRawNode([]byte("bbbb"))
133147
c := dag.NewRawNode([]byte("cccc"))
@@ -157,7 +171,7 @@ func Test_StoreRestoreMemoryBuffer(t *testing.T) {
157171
ssb.ExploreIndex(1, ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge()))))
158172
}).Node()
159173

160-
commitment, tmpPath, paddedSize, err := pio.GeneratePieceCommitmentToFile(abi.RegisteredSealProof_StackedDrg2KiBV1, nd3.Cid(), node)
174+
commitment, tmpPath, paddedSize, err := pio.GeneratePieceCommitmentToFile(abi.RegisteredSealProof_StackedDrg2KiBV1, nd3.Cid(), node, &storeID)
161175
require.NoError(t, err)
162176
tmpFile, err := store.Open(tmpPath)
163177
require.NoError(t, err)
@@ -188,11 +202,18 @@ func Test_PieceCommitmentEquivalenceMemoryFile(t *testing.T) {
188202
store, err := filestore.NewLocalFileStore(tempDir)
189203
require.NoError(t, err)
190204

191-
sourceBserv := dstest.Bserv()
192-
sourceBs := sourceBserv.Blockstore()
193-
pio := pieceio.NewPieceIOWithStore(cio, store, sourceBs)
205+
ds := dss.MutexWrap(datastore.NewMapDatastore())
206+
multiStore, err := multistore.NewMultiDstore(ds)
207+
require.NoError(t, err)
208+
bs := blockstore.NewBlockstore(ds)
209+
210+
pio := pieceio.NewPieceIOWithStore(cio, store, bs, multiStore)
211+
212+
storeID := multiStore.Next()
213+
mstore, err := multiStore.Get(storeID)
214+
require.NoError(t, err)
215+
dserv := mstore.DAG
194216

195-
dserv := dag.NewDAGService(sourceBserv)
196217
a := dag.NewRawNode([]byte("aaaa"))
197218
b := dag.NewRawNode([]byte("bbbb"))
198219
c := dag.NewRawNode([]byte("cccc"))
@@ -222,13 +243,13 @@ func Test_PieceCommitmentEquivalenceMemoryFile(t *testing.T) {
222243
ssb.ExploreIndex(1, ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge()))))
223244
}).Node()
224245

225-
fcommitment, tmpPath, fpaddedSize, ferr := pio.GeneratePieceCommitmentToFile(abi.RegisteredSealProof_StackedDrg2KiBV1, nd3.Cid(), node)
246+
fcommitment, tmpPath, fpaddedSize, ferr := pio.GeneratePieceCommitmentToFile(abi.RegisteredSealProof_StackedDrg2KiBV1, nd3.Cid(), node, &storeID)
226247
defer func() {
227248
deferErr := store.Delete(tmpPath)
228249
require.NoError(t, deferErr)
229250
}()
230251

231-
mcommitment, mpaddedSize, merr := pio.GeneratePieceCommitment(abi.RegisteredSealProof_StackedDrg2KiBV1, nd3.Cid(), node)
252+
mcommitment, mpaddedSize, merr := pio.GeneratePieceCommitment(abi.RegisteredSealProof_StackedDrg2KiBV1, nd3.Cid(), node, &storeID)
232253
require.Equal(t, fcommitment, mcommitment)
233254
require.Equal(t, fpaddedSize, mpaddedSize)
234255
require.Equal(t, ferr, merr)
@@ -237,9 +258,16 @@ func Test_PieceCommitmentEquivalenceMemoryFile(t *testing.T) {
237258
}
238259

239260
func Test_Failures(t *testing.T) {
240-
sourceBserv := dstest.Bserv()
241-
sourceBs := sourceBserv.Blockstore()
242-
dserv := dag.NewDAGService(sourceBserv)
261+
262+
ds := dss.MutexWrap(datastore.NewMapDatastore())
263+
multiStore, err := multistore.NewMultiDstore(ds)
264+
require.NoError(t, err)
265+
bs := blockstore.NewBlockstore(ds)
266+
267+
storeID := multiStore.Next()
268+
mstore, err := multiStore.Get(storeID)
269+
require.NoError(t, err)
270+
dserv := mstore.DAG
243271
a := dag.NewRawNode([]byte("aaaa"))
244272
b := dag.NewRawNode([]byte("bbbb"))
245273
c := dag.NewRawNode([]byte("cccc"))
@@ -272,8 +300,8 @@ func Test_Failures(t *testing.T) {
272300
t.Run("create temp file fails", func(t *testing.T) {
273301
fsmock := fsmocks.FileStore{}
274302
fsmock.On("CreateTemp").Return(nil, fmt.Errorf("Failed"))
275-
pio := pieceio.NewPieceIOWithStore(nil, &fsmock, sourceBs)
276-
_, _, _, err := pio.GeneratePieceCommitmentToFile(abi.RegisteredSealProof_StackedDrg2KiBV1, nd3.Cid(), node)
303+
pio := pieceio.NewPieceIOWithStore(nil, &fsmock, bs, multiStore)
304+
_, _, _, err := pio.GeneratePieceCommitmentToFile(abi.RegisteredSealProof_StackedDrg2KiBV1, nd3.Cid(), node, &storeID)
277305
require.Error(t, err)
278306
})
279307
t.Run("write CAR fails", func(t *testing.T) {
@@ -284,17 +312,17 @@ func Test_Failures(t *testing.T) {
284312
ciomock := pmocks.CarIO{}
285313
any := mock.Anything
286314
ciomock.On("WriteCar", any, any, any, any, any).Return(fmt.Errorf("failed to write car"))
287-
pio := pieceio.NewPieceIOWithStore(&ciomock, store, sourceBs)
288-
_, _, _, err = pio.GeneratePieceCommitmentToFile(abi.RegisteredSealProof_StackedDrg2KiBV1, nd3.Cid(), node)
315+
pio := pieceio.NewPieceIOWithStore(&ciomock, store, bs, multiStore)
316+
_, _, _, err = pio.GeneratePieceCommitmentToFile(abi.RegisteredSealProof_StackedDrg2KiBV1, nd3.Cid(), node, &storeID)
289317
require.Error(t, err)
290318
})
291319
t.Run("prepare CAR fails", func(t *testing.T) {
292320

293321
ciomock := pmocks.CarIO{}
294322
any := mock.Anything
295323
ciomock.On("PrepareCar", any, any, any, any).Return(nil, fmt.Errorf("failed to prepare car"))
296-
pio := pieceio.NewPieceIO(&ciomock, sourceBs)
297-
_, _, err := pio.GeneratePieceCommitment(abi.RegisteredSealProof_StackedDrg2KiBV1, nd3.Cid(), node)
324+
pio := pieceio.NewPieceIO(&ciomock, bs, multiStore)
325+
_, _, err := pio.GeneratePieceCommitment(abi.RegisteredSealProof_StackedDrg2KiBV1, nd3.Cid(), node, &storeID)
298326
require.Error(t, err)
299327
})
300328
t.Run("PreparedCard dump operation fails", func(t *testing.T) {
@@ -304,8 +332,8 @@ func Test_Failures(t *testing.T) {
304332
ciomock.On("PrepareCar", any, any, any, any).Return(&preparedCarMock, nil)
305333
preparedCarMock.On("Size").Return(uint64(1000))
306334
preparedCarMock.On("Dump", any).Return(fmt.Errorf("failed to write car"))
307-
pio := pieceio.NewPieceIO(&ciomock, sourceBs)
308-
_, _, err := pio.GeneratePieceCommitment(abi.RegisteredSealProof_StackedDrg2KiBV1, nd3.Cid(), node)
335+
pio := pieceio.NewPieceIO(&ciomock, bs, multiStore)
336+
_, _, err := pio.GeneratePieceCommitment(abi.RegisteredSealProof_StackedDrg2KiBV1, nd3.Cid(), node, &storeID)
309337
require.Error(t, err)
310338
})
311339
t.Run("seek fails", func(t *testing.T) {
@@ -331,8 +359,8 @@ func Test_Failures(t *testing.T) {
331359
mockfile.On("Path").Return(filestore.Path("mock")).Once()
332360
mockfile.On("Seek", mock.Anything, mock.Anything).Return(int64(0), fmt.Errorf("seek failed"))
333361

334-
pio := pieceio.NewPieceIOWithStore(cio, &fsmock, sourceBs)
335-
_, _, _, err := pio.GeneratePieceCommitmentToFile(abi.RegisteredSealProof_StackedDrg2KiBV1, nd3.Cid(), node)
362+
pio := pieceio.NewPieceIOWithStore(cio, &fsmock, bs, multiStore)
363+
_, _, _, err := pio.GeneratePieceCommitmentToFile(abi.RegisteredSealProof_StackedDrg2KiBV1, nd3.Cid(), node, &storeID)
336364
require.Error(t, err)
337365
})
338366
}

pieceio/types.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/ipld/go-car"
99
"github.com/ipld/go-ipld-prime"
1010

11+
"github.com/filecoin-project/go-multistore"
1112
"github.com/filecoin-project/specs-actors/actors/abi"
1213

1314
"github.com/filecoin-project/go-fil-markets/filestore"
@@ -23,11 +24,11 @@ type ReadStore interface {
2324

2425
// PieceIO converts between payloads and pieces
2526
type PieceIO interface {
26-
GeneratePieceCommitment(rt abi.RegisteredSealProof, payloadCid cid.Cid, selector ipld.Node) (cid.Cid, abi.UnpaddedPieceSize, error)
27-
ReadPiece(r io.Reader) (cid.Cid, error)
27+
GeneratePieceCommitment(rt abi.RegisteredSealProof, payloadCid cid.Cid, selector ipld.Node, storeID *multistore.StoreID) (cid.Cid, abi.UnpaddedPieceSize, error)
28+
ReadPiece(storeID *multistore.StoreID, r io.Reader) (cid.Cid, error)
2829
}
2930

3031
type PieceIOWithStore interface {
3132
PieceIO
32-
GeneratePieceCommitmentToFile(rt abi.RegisteredSealProof, payloadCid cid.Cid, selector ipld.Node, userOnNewCarBlocks ...car.OnNewCarBlockFunc) (cid.Cid, filestore.Path, abi.UnpaddedPieceSize, error)
33+
GeneratePieceCommitmentToFile(rt abi.RegisteredSealProof, payloadCid cid.Cid, selector ipld.Node, storeID *multistore.StoreID, userOnNewCarBlocks ...car.OnNewCarBlockFunc) (cid.Cid, filestore.Path, abi.UnpaddedPieceSize, error)
3334
}

0 commit comments

Comments
 (0)