@@ -27,9 +27,7 @@ import (
2727 snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
2828 "github.com/containerd/containerd/contrib/snapshotservice"
2929 "github.com/containerd/containerd/log"
30- "github.com/containerd/containerd/snapshots"
3130 "github.com/firecracker-microvm/firecracker-go-sdk/vsock"
32- "github.com/sirupsen/logrus"
3331 "golang.org/x/sync/errgroup"
3432 "google.golang.org/grpc"
3533
@@ -55,52 +53,46 @@ func Run(config config.Config) error {
5553
5654 group , ctx := errgroup .WithContext (ctx )
5755
58- cache := cache .NewSnapshotterCache ()
59-
6056 var (
6157 monitor * metrics.Monitor
6258 serviceDiscovery * discovery.ServiceDiscovery
6359 )
60+
6461 if config .Snapshotter .Metrics .Enable {
65- sdHost := config .Snapshotter .Metrics .Host
66- sdPort := config .Snapshotter .Metrics .ServiceDiscoveryPort
67- serviceDiscovery = discovery .NewServiceDiscovery (sdHost , sdPort , cache )
6862 var err error
6963 monitor , err = initMetricsProxyMonitor (config .Snapshotter .Metrics .PortRange )
7064 if err != nil {
71- log .G (ctx ).WithError (err ).Fatal ("failed creating metrics proxy monitor" )
72- return err
65+ return fmt .Errorf ("failed creating metrics proxy monitor: %w" , err )
7366 }
74- group .Go (func () error {
75- return serviceDiscovery .Serve ()
76- })
7767 group .Go (func () error {
7868 return monitor .Start ()
7969 })
8070 }
8171
82- snapshotter , err := initSnapshotter ( ctx , config , cache , monitor )
72+ cache , err := initCache ( config , monitor )
8373 if err != nil {
84- log .G (ctx ).WithFields (
85- logrus.Fields {"resolver" : config .Snapshotter .Proxy .Address .Resolver .Type },
86- ).WithError (err ).Fatal ("failed creating socket resolver" )
87- return err
74+ return fmt .Errorf ("failed initializing cache: %w" , err )
75+ }
76+
77+ if config .Snapshotter .Metrics .Enable {
78+ sdHost := config .Snapshotter .Metrics .Host
79+ sdPort := config .Snapshotter .Metrics .ServiceDiscoveryPort
80+ serviceDiscovery = discovery .NewServiceDiscovery (sdHost , sdPort , cache )
81+ group .Go (func () error {
82+ return serviceDiscovery .Serve ()
83+ })
8884 }
8985
86+ snapshotter := demux .NewSnapshotter (cache )
87+
9088 grpcServer := grpc .NewServer ()
9189 service := snapshotservice .FromSnapshotter (snapshotter )
9290 snapshotsapi .RegisterSnapshotsServer (grpcServer , service )
9391
9492 listenerConfig := config .Snapshotter .Listener
9593 listener , err := net .Listen (listenerConfig .Network , listenerConfig .Address )
9694 if err != nil {
97- log .G (ctx ).WithFields (
98- logrus.Fields {
99- "network" : listenerConfig .Network ,
100- "address" : listenerConfig .Address ,
101- },
102- ).WithError (err ).Fatal ("failed creating listener" )
103- return err
95+ return fmt .Errorf ("failed creating service listener{network: %s, address: %s}: %w" , listenerConfig .Network , listenerConfig .Address , err )
10496 }
10597
10698 group .Go (func () error {
@@ -138,8 +130,7 @@ func Run(config config.Config) error {
138130 })
139131
140132 if err := group .Wait (); err != nil {
141- log .G (ctx ).WithError (err ).Error ("demux snapshotter error" )
142- return err
133+ return fmt .Errorf ("demux snapshotter error: %w" , err )
143134 }
144135
145136 log .G (ctx ).Info ("done" )
@@ -159,15 +150,24 @@ func initResolver(config config.Config) (proxyaddress.Resolver, error) {
159150const base10 = 10
160151const bits32 = 32
161152
162- func initSnapshotter ( ctx context. Context , config config.Config , cache cache. Cache , monitor * metrics.Monitor ) (snapshots. Snapshotter , error ) {
153+ func initCache ( config config.Config , monitor * metrics.Monitor ) (* cache. RemoteSnapshotterCache , error ) {
163154 resolver , err := initResolver (config )
164155 if err != nil {
165156 return nil , err
166157 }
167158
168- newRemoteSnapshotterFunc := func (ctx context.Context , namespace string ) (* proxy.RemoteSnapshotter , error ) {
159+ // TODO: https://github.com/firecracker-microvm/firecracker-containerd/issues/689
160+ snapshotterDialer := func (ctx context.Context , host string , port uint64 ) (net.Conn , error ) {
161+ return vsock .DialContext (ctx , host , uint32 (port ), vsock .WithLogger (log .G (ctx )),
162+ vsock .WithAckMsgTimeout (2 * time .Second ),
163+ vsock .WithRetryInterval (500 * time .Millisecond ),
164+ )
165+ }
166+
167+ dial := func (ctx context.Context , namespace string ) (net.Conn , error ) {
169168 r := resolver
170169 response , err := r .Get (namespace )
170+
171171 if err != nil {
172172 return nil , err
173173 }
@@ -177,12 +177,23 @@ func initSnapshotter(ctx context.Context, config config.Config, cache cache.Cach
177177 return nil , err
178178 }
179179
180- // TODO: https://github.com/firecracker-microvm/firecracker-containerd/issues/689
181- snapshotterDialer := func (ctx context.Context , namespace string ) (net.Conn , error ) {
182- return vsock .DialContext (ctx , host , uint32 (port ), vsock .WithLogger (log .G (ctx )),
183- vsock .WithAckMsgTimeout (2 * time .Second ),
184- vsock .WithRetryInterval (200 * time .Millisecond ),
185- )
180+ return snapshotterDialer (ctx , host , port )
181+ }
182+
183+ fetch := func (ctx context.Context , namespace string ) (* proxy.RemoteSnapshotter , error ) {
184+ r := resolver
185+ response , err := r .Get (namespace )
186+ if err != nil {
187+ return nil , err
188+ }
189+ host := response .Address
190+ port , err := strconv .ParseUint (response .SnapshotterPort , base10 , bits32 )
191+ if err != nil {
192+ return nil , err
193+ }
194+
195+ dial := func (ctx context.Context , namespace string ) (net.Conn , error ) {
196+ return snapshotterDialer (ctx , host , port )
186197 }
187198
188199 var metricsProxy * metrics.Proxy
@@ -193,10 +204,20 @@ func initSnapshotter(ctx context.Context, config config.Config, cache cache.Cach
193204 }
194205 }
195206
196- return proxy .NewRemoteSnapshotter (ctx , host , snapshotterDialer , metricsProxy )
207+ return proxy .NewRemoteSnapshotter (ctx , host , dial , metricsProxy )
208+ }
209+
210+ opts := make ([]cache.SnapshotterCacheOption , 0 )
211+
212+ if config .Snapshotter .Cache .EvictOnConnectionFailure {
213+ cachePollFrequency , err := time .ParseDuration (config .Snapshotter .Cache .PollConnectionFrequency )
214+ if err != nil {
215+ return nil , fmt .Errorf ("invalid cache evict poll connection frequency: %w" , err )
216+ }
217+ opts = append (opts , cache .EvictOnConnectionFailure (dial , cachePollFrequency , nil ))
197218 }
198219
199- return demux . NewSnapshotter ( cache , newRemoteSnapshotterFunc ), nil
220+ return cache . NewRemoteSnapshotterCache ( fetch , opts ... ), nil
200221}
201222
202223func initMetricsProxyMonitor (portRange string ) (* metrics.Monitor , error ) {
0 commit comments