@@ -27,9 +27,7 @@ import (
2727 snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
2828 "github.com/containerd/containerd/contrib/snapshotservice"
2929 "github.com/containerd/containerd/log"
30- "github.com/containerd/containerd/snapshots"
3130 "github.com/firecracker-microvm/firecracker-go-sdk/vsock"
32- "github.com/sirupsen/logrus"
3331 "golang.org/x/sync/errgroup"
3432 "google.golang.org/grpc"
3533
@@ -55,52 +53,46 @@ func Run(config config.Config) error {
5553
5654 group , ctx := errgroup .WithContext (ctx )
5755
58- cache := cache .NewSnapshotterCache ()
59-
6056 var (
6157 monitor * metrics.Monitor
6258 serviceDiscovery * discovery.ServiceDiscovery
6359 )
60+
6461 if config .Snapshotter .Metrics .Enable {
65- sdHost := config .Snapshotter .Metrics .Host
66- sdPort := config .Snapshotter .Metrics .ServiceDiscoveryPort
67- serviceDiscovery = discovery .NewServiceDiscovery (sdHost , sdPort , cache )
6862 var err error
6963 monitor , err = initMetricsProxyMonitor (config .Snapshotter .Metrics .PortRange )
7064 if err != nil {
71- log .G (ctx ).WithError (err ).Fatal ("failed creating metrics proxy monitor" )
72- return err
65+ return fmt .Errorf ("failed creating metrics proxy monitor: %w" , err )
7366 }
74- group .Go (func () error {
75- return serviceDiscovery .Serve ()
76- })
7767 group .Go (func () error {
7868 return monitor .Start ()
7969 })
8070 }
8171
82- snapshotter , err := initSnapshotter ( ctx , config , cache , monitor )
72+ cache , err := initCache ( config , monitor )
8373 if err != nil {
84- log .G (ctx ).WithFields (
85- logrus.Fields {"resolver" : config .Snapshotter .Proxy .Address .Resolver .Type },
86- ).WithError (err ).Fatal ("failed creating socket resolver" )
87- return err
74+ return fmt .Errorf ("failed initializing cache: %w" , err )
75+ }
76+
77+ if config .Snapshotter .Metrics .Enable {
78+ sdHost := config .Snapshotter .Metrics .Host
79+ sdPort := config .Snapshotter .Metrics .ServiceDiscoveryPort
80+ serviceDiscovery = discovery .NewServiceDiscovery (sdHost , sdPort , cache )
81+ group .Go (func () error {
82+ return serviceDiscovery .Serve ()
83+ })
8884 }
8985
86+ snapshotter := demux .NewSnapshotter (cache )
87+
9088 grpcServer := grpc .NewServer ()
9189 service := snapshotservice .FromSnapshotter (snapshotter )
9290 snapshotsapi .RegisterSnapshotsServer (grpcServer , service )
9391
9492 listenerConfig := config .Snapshotter .Listener
9593 listener , err := net .Listen (listenerConfig .Network , listenerConfig .Address )
9694 if err != nil {
97- log .G (ctx ).WithFields (
98- logrus.Fields {
99- "network" : listenerConfig .Network ,
100- "address" : listenerConfig .Address ,
101- },
102- ).WithError (err ).Fatal ("failed creating listener" )
103- return err
95+ return fmt .Errorf ("failed creating service listener{network: %s, address: %s}: %w" , listenerConfig .Network , listenerConfig .Address , err )
10496 }
10597
10698 group .Go (func () error {
@@ -138,8 +130,7 @@ func Run(config config.Config) error {
138130 })
139131
140132 if err := group .Wait (); err != nil {
141- log .G (ctx ).WithError (err ).Error ("demux snapshotter error" )
142- return err
133+ return fmt .Errorf ("demux snapshotter error: %w" , err )
143134 }
144135
145136 log .G (ctx ).Info ("done" )
@@ -159,13 +150,46 @@ func initResolver(config config.Config) (proxyaddress.Resolver, error) {
159150const base10 = 10
160151const bits32 = 32
161152
162- func initSnapshotter ( ctx context. Context , config config.Config , cache cache. Cache , monitor * metrics.Monitor ) (snapshots. Snapshotter , error ) {
153+ func initCache ( config config.Config , monitor * metrics.Monitor ) (* cache. RemoteSnapshotterCache , error ) {
163154 resolver , err := initResolver (config )
164155 if err != nil {
165156 return nil , err
166157 }
167158
168- newRemoteSnapshotterFunc := func (ctx context.Context , namespace string ) (* proxy.RemoteSnapshotter , error ) {
159+ dialTimeout , err := time .ParseDuration (config .Snapshotter .Dialer .Timeout )
160+ if err != nil {
161+ return nil , fmt .Errorf ("Error parsing dialer retry interval from config: %w" , err )
162+ }
163+ retryInterval , err := time .ParseDuration (config .Snapshotter .Dialer .RetryInterval )
164+ if err != nil {
165+ return nil , fmt .Errorf ("Error parsing dialer retry interval from config: %w" , err )
166+ }
167+
168+ vsockDial := func (ctx context.Context , host string , port uint64 ) (net.Conn , error ) {
169+ return vsock .DialContext (ctx , host , uint32 (port ), vsock .WithLogger (log .G (ctx )),
170+ vsock .WithAckMsgTimeout (2 * time .Second ),
171+ vsock .WithRetryInterval (retryInterval ),
172+ )
173+ }
174+
175+ dial := func (ctx context.Context , namespace string ) (net.Conn , error ) {
176+ r := resolver
177+ response , err := r .Get (namespace )
178+
179+ if err != nil {
180+ return nil , err
181+ }
182+ host := response .Address
183+ port , err := strconv .ParseUint (response .SnapshotterPort , base10 , bits32 )
184+ if err != nil {
185+ return nil , err
186+ }
187+
188+ return vsockDial (ctx , host , port )
189+ }
190+ dialer := proxy.Dialer {Dial : dial , Timeout : dialTimeout }
191+
192+ fetch := func (ctx context.Context , namespace string ) (* proxy.RemoteSnapshotter , error ) {
169193 r := resolver
170194 response , err := r .Get (namespace )
171195 if err != nil {
@@ -177,12 +201,8 @@ func initSnapshotter(ctx context.Context, config config.Config, cache cache.Cach
177201 return nil , err
178202 }
179203
180- // TODO: https://github.com/firecracker-microvm/firecracker-containerd/issues/689
181- snapshotterDialer := func (ctx context.Context , namespace string ) (net.Conn , error ) {
182- return vsock .DialContext (ctx , host , uint32 (port ), vsock .WithLogger (log .G (ctx )),
183- vsock .WithAckMsgTimeout (2 * time .Second ),
184- vsock .WithRetryInterval (200 * time .Millisecond ),
185- )
204+ dial := func (ctx context.Context , namespace string ) (net.Conn , error ) {
205+ return vsockDial (ctx , host , port )
186206 }
187207
188208 var metricsProxy * metrics.Proxy
@@ -193,10 +213,20 @@ func initSnapshotter(ctx context.Context, config config.Config, cache cache.Cach
193213 }
194214 }
195215
196- return proxy .NewRemoteSnapshotter (ctx , host , snapshotterDialer , metricsProxy )
216+ return proxy .NewRemoteSnapshotter (ctx , host , dial , metricsProxy )
217+ }
218+
219+ opts := make ([]cache.SnapshotterCacheOption , 0 )
220+
221+ if config .Snapshotter .Cache .EvictOnConnectionFailure {
222+ cachePollFrequency , err := time .ParseDuration (config .Snapshotter .Cache .PollConnectionFrequency )
223+ if err != nil {
224+ return nil , fmt .Errorf ("invalid cache evict poll connection frequency: %w" , err )
225+ }
226+ opts = append (opts , cache .EvictOnConnectionFailure (dialer , cachePollFrequency ))
197227 }
198228
199- return demux . NewSnapshotter ( cache , newRemoteSnapshotterFunc ), nil
229+ return cache . NewRemoteSnapshotterCache ( fetch , opts ... ), nil
200230}
201231
202232func initMetricsProxyMonitor (portRange string ) (* metrics.Monitor , error ) {
0 commit comments