diff --git a/internal/update/apt/service.go b/internal/update/apt/service.go index ef67caa6..3a6f7288 100644 --- a/internal/update/apt/service.go +++ b/internal/update/apt/service.go @@ -73,44 +73,50 @@ func (s *Service) ListUpgradablePackages(ctx context.Context, matcher func(updat // UpgradePackages upgrades the specified packages using the `apt-get upgrade` command. // It publishes events to subscribers during the upgrade process. // It returns an error if the upgrade is already in progress or if the upgrade command fails. -func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan update.Event, error) { +func (s *Service) UpgradePackages(ctx context.Context, names []string) (iter.Seq[update.Event], error) { if !s.lock.TryLock() { return nil, update.ErrOperationAlreadyInProgress } - eventsCh := make(chan update.Event, 100) - go func() { + return func(yield func(update.Event) bool) { defer s.lock.Unlock() - defer close(eventsCh) - eventsCh <- update.NewDataEvent(update.StartEvent, "Upgrade is starting") - stream := runUpgradeCommand(ctx, names) - for line, err := range stream { + if !yield(update.NewDataEvent(update.StartEvent, "Upgrade is starting")) { + return + } + for line, err := range runUpgradeCommand(ctx, names) { if err != nil { - eventsCh <- update.NewErrorEvent(fmt.Errorf("error running upgrade command: %w", err)) + _ = yield(update.NewErrorEvent(fmt.Errorf("error running upgrade command: %w", err))) + return + } + if !yield(update.NewDataEvent(update.UpgradeLineEvent, line)) { return } - eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line) } - eventsCh <- update.NewDataEvent(update.StartEvent, "apt cleaning cache is starting") + if !yield(update.NewDataEvent(update.StartEvent, "apt cleaning cache is starting")) { + return + } for line, err := range runAptCleanCommand(ctx) { if err != nil { - eventsCh <- update.NewErrorEvent(fmt.Errorf("error running apt clean command: %w", err)) + _ = yield(update.NewErrorEvent(fmt.Errorf("error running apt clean command: %w", err))) + return + } + if !yield(update.NewDataEvent(update.UpgradeLineEvent, line)) { return } - eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line) } - eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, "Stop and destroy docker containers and images ....") - streamCleanup := cleanupDockerContainers(ctx) - for line, err := range streamCleanup { + if !yield(update.NewDataEvent(update.UpgradeLineEvent, "Stop and destroy docker containers and images ....")) { + return + } + for line, err := range cleanupDockerContainers(ctx) { if err != nil { // TODO: maybe we should retun an error or a better feedback to the user? // currently, we just log the error and continue considenring not blocking slog.Warn("Error stopping and destroying docker containers", "error", err) - } else { - eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line) + } else if !yield(update.NewDataEvent(update.UpgradeLineEvent, line)) { + return } } @@ -118,25 +124,27 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan u // Tracking issue: https://github.com/arduino/arduino-app-cli/issues/600 // Currently, we need to launch `arduino-app-cli system init` to pull the latest docker images because // the version of the docker images are hardcoded in the (new downloaded) version of the arduino-app-cli. - eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, "Pulling the latest docker images ...") - streamDocker := pullDockerImages(ctx) - for line, err := range streamDocker { + if !yield(update.NewDataEvent(update.UpgradeLineEvent, "Pulling the latest docker images ...")) { + return + } + for line, err := range pullDockerImages(ctx) { if err != nil { - eventsCh <- update.NewErrorEvent(fmt.Errorf("error pulling docker images: %w", err)) + _ = yield(update.NewErrorEvent(fmt.Errorf("error pulling docker images: %w", err))) + return + } + if !yield(update.NewDataEvent(update.UpgradeLineEvent, line)) { return } - eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line) } - eventsCh <- update.NewDataEvent(update.RestartEvent, "Upgrade completed. Restarting ...") - - err := restartServices(ctx) - if err != nil { - eventsCh <- update.NewErrorEvent(fmt.Errorf("error restarting services after upgrade: %w", err)) + if !yield(update.NewDataEvent(update.RestartEvent, "Upgrade completed. Restarting ...")) { return } - }() - return eventsCh, nil + if err := restartServices(ctx); err != nil { + _ = yield(update.NewErrorEvent(fmt.Errorf("error restarting services after upgrade: %w", err))) + return + } + }, nil } // runDpkgConfigureCommand is need in case an upgrade was interrupted in the middle diff --git a/internal/update/arduino/arduino.go b/internal/update/arduino/arduino.go index 0c4e5d29..c3f2d699 100644 --- a/internal/update/arduino/arduino.go +++ b/internal/update/arduino/arduino.go @@ -19,6 +19,7 @@ import ( "context" "errors" "fmt" + "iter" "log/slog" "sync" @@ -125,40 +126,42 @@ func (a *ArduinoPlatformUpdater) ListUpgradablePackages(ctx context.Context, _ f } // UpgradePackages implements ServiceUpdater. -func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []string) (<-chan update.Event, error) { +func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []string) (iter.Seq[update.Event], error) { if !a.lock.TryLock() { return nil, update.ErrOperationAlreadyInProgress } - eventsCh := make(chan update.Event, 100) - downloadProgressCB := func(curr *rpc.DownloadProgress) { - data := helpers.ArduinoCLIDownloadProgressToString(curr) - slog.Debug("Download progress", slog.String("download_progress", data)) - eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, data) - } - taskProgressCB := func(msg *rpc.TaskProgress) { - data := helpers.ArduinoCLITaskProgressToString(msg) - slog.Debug("Task progress", slog.String("task_progress", data)) - eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, data) - } - - go func() { + return func(yield func(update.Event) bool) { defer a.lock.Unlock() - defer close(eventsCh) - eventsCh <- update.NewDataEvent(update.StartEvent, "Upgrade is starting") + downloadProgressCB := func(curr *rpc.DownloadProgress) { + data := helpers.ArduinoCLIDownloadProgressToString(curr) + slog.Debug("Download progress", slog.String("download_progress", data)) + // TODO: add termination + _ = yield(update.NewDataEvent(update.UpgradeLineEvent, data)) + } + taskProgressCB := func(msg *rpc.TaskProgress) { + data := helpers.ArduinoCLITaskProgressToString(msg) + slog.Debug("Task progress", slog.String("task_progress", data)) + // TODO: add termination + _ = yield(update.NewDataEvent(update.UpgradeLineEvent, data)) + } + + if !yield(update.NewDataEvent(update.StartEvent, "Upgrade is starting")) { + return + } logrus.SetLevel(logrus.ErrorLevel) // Reduce the log level of arduino-cli srv := commands.NewArduinoCoreServer() if err := setConfig(ctx, srv); err != nil { - eventsCh <- update.NewErrorEvent(fmt.Errorf("error setting config: %w", err)) + _ = yield(update.NewErrorEvent(fmt.Errorf("error setting config: %w", err))) return } var inst *rpc.Instance if resp, err := srv.Create(ctx, &rpc.CreateRequest{}); err != nil { - eventsCh <- update.NewErrorEvent(fmt.Errorf("error creating arduino-cli instance: %w", err)) + _ = yield(update.NewErrorEvent(fmt.Errorf("error creating arduino-cli instance: %w", err))) return } else { inst = resp.GetInstance() @@ -174,11 +177,11 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st { stream, _ := commands.UpdateIndexStreamResponseToCallbackFunction(ctx, downloadProgressCB) if err := srv.UpdateIndex(&rpc.UpdateIndexRequest{Instance: inst}, stream); err != nil { - eventsCh <- update.NewErrorEvent(fmt.Errorf("error updating index: %w", err)) + _ = yield(update.NewErrorEvent(fmt.Errorf("error updating index: %w", err))) return } if err := srv.Init(&rpc.InitRequest{Instance: inst}, commands.InitStreamResponseToCallbackFunction(ctx, nil)); err != nil { - eventsCh <- update.NewErrorEvent(fmt.Errorf("error initializing instance: %w", err)) + _ = yield(update.NewErrorEvent(fmt.Errorf("error initializing instance: %w", err))) return } } @@ -200,13 +203,13 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st ); err != nil { var alreadyPresent *cmderrors.PlatformAlreadyAtTheLatestVersionError if errors.As(err, &alreadyPresent) { - eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, alreadyPresent.Error()) + _ = yield(update.NewDataEvent(update.UpgradeLineEvent, alreadyPresent.Error())) return } var notFound *cmderrors.PlatformNotFoundError if !errors.As(err, ¬Found) { - eventsCh <- update.NewErrorEvent(fmt.Errorf("error upgrading platform: %w", err)) + _ = yield(update.NewErrorEvent(fmt.Errorf("error upgrading platform: %w", err))) return } // If the platform is not found, we will try to install it @@ -223,16 +226,17 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st ), ) if err != nil { - eventsCh <- update.NewErrorEvent(fmt.Errorf("error installing platform: %w", err)) + _ = yield(update.NewErrorEvent(fmt.Errorf("error installing platform: %w", err))) return } } else if respCB().GetPlatform() == nil { - eventsCh <- update.NewErrorEvent(fmt.Errorf("platform upgrade failed")) + _ = yield(update.NewErrorEvent(fmt.Errorf("platform upgrade failed"))) return } cbw := orchestrator.NewCallbackWriter(func(line string) { - eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line) + // TODO: add termination + _ = yield(update.NewDataEvent(update.UpgradeLineEvent, line)) }) err := srv.BurnBootloader( @@ -244,10 +248,8 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st commands.BurnBootloaderToServerStreams(ctx, cbw, cbw), ) if err != nil { - eventsCh <- update.NewErrorEvent(fmt.Errorf("error burning bootloader: %w", err)) + _ = yield(update.NewErrorEvent(fmt.Errorf("error burning bootloader: %w", err))) return } - }() - - return eventsCh, nil + }, nil } diff --git a/internal/update/update.go b/internal/update/update.go index 7e3bc99c..feffe7a1 100644 --- a/internal/update/update.go +++ b/internal/update/update.go @@ -18,6 +18,7 @@ package update import ( "context" "fmt" + "iter" "log/slog" "net/http" "strings" @@ -46,7 +47,7 @@ type UpgradablePackage struct { type ServiceUpdater interface { ListUpgradablePackages(ctx context.Context, matcher func(UpgradablePackage) bool) ([]UpgradablePackage, error) - UpgradePackages(ctx context.Context, names []string) (<-chan Event, error) + UpgradePackages(ctx context.Context, names []string) (iter.Seq[Event], error) } type Manager struct {