Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 37 additions & 29 deletions internal/update/apt/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,70 +73,78 @@ func (s *Service) ListUpgradablePackages(ctx context.Context, matcher func(updat
// UpgradePackages upgrades the specified packages using the `apt-get upgrade` command.
// It publishes events to subscribers during the upgrade process.
// It returns an error if the upgrade is already in progress or if the upgrade command fails.
func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan update.Event, error) {
func (s *Service) UpgradePackages(ctx context.Context, names []string) (iter.Seq[update.Event], error) {
if !s.lock.TryLock() {
return nil, update.ErrOperationAlreadyInProgress
}
eventsCh := make(chan update.Event, 100)

go func() {
return func(yield func(update.Event) bool) {
defer s.lock.Unlock()
defer close(eventsCh)

eventsCh <- update.NewDataEvent(update.StartEvent, "Upgrade is starting")
stream := runUpgradeCommand(ctx, names)
for line, err := range stream {
if !yield(update.NewDataEvent(update.StartEvent, "Upgrade is starting")) {
return
}
for line, err := range runUpgradeCommand(ctx, names) {
if err != nil {
eventsCh <- update.NewErrorEvent(fmt.Errorf("error running upgrade command: %w", err))
_ = yield(update.NewErrorEvent(fmt.Errorf("error running upgrade command: %w", err)))
return
}
if !yield(update.NewDataEvent(update.UpgradeLineEvent, line)) {
return
}
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
}

eventsCh <- update.NewDataEvent(update.StartEvent, "apt cleaning cache is starting")
if !yield(update.NewDataEvent(update.StartEvent, "apt cleaning cache is starting")) {
return
}
for line, err := range runAptCleanCommand(ctx) {
if err != nil {
eventsCh <- update.NewErrorEvent(fmt.Errorf("error running apt clean command: %w", err))
_ = yield(update.NewErrorEvent(fmt.Errorf("error running apt clean command: %w", err)))
return
}
if !yield(update.NewDataEvent(update.UpgradeLineEvent, line)) {
return
}
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
}

eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, "Stop and destroy docker containers and images ....")
streamCleanup := cleanupDockerContainers(ctx)
for line, err := range streamCleanup {
if !yield(update.NewDataEvent(update.UpgradeLineEvent, "Stop and destroy docker containers and images ....")) {
return
}
for line, err := range cleanupDockerContainers(ctx) {
if err != nil {
// TODO: maybe we should retun an error or a better feedback to the user?
// currently, we just log the error and continue considenring not blocking
slog.Warn("Error stopping and destroying docker containers", "error", err)
} else {
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
} else if !yield(update.NewDataEvent(update.UpgradeLineEvent, line)) {
return
}
}

// TODO: Remove this workaround once docker image versions are no longer hardcoded in arduino-app-cli.
// Tracking issue: https://github.com/arduino/arduino-app-cli/issues/600
// Currently, we need to launch `arduino-app-cli system init` to pull the latest docker images because
// the version of the docker images are hardcoded in the (new downloaded) version of the arduino-app-cli.
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, "Pulling the latest docker images ...")
streamDocker := pullDockerImages(ctx)
for line, err := range streamDocker {
if !yield(update.NewDataEvent(update.UpgradeLineEvent, "Pulling the latest docker images ...")) {
return
}
for line, err := range pullDockerImages(ctx) {
if err != nil {
eventsCh <- update.NewErrorEvent(fmt.Errorf("error pulling docker images: %w", err))
_ = yield(update.NewErrorEvent(fmt.Errorf("error pulling docker images: %w", err)))
return
}
if !yield(update.NewDataEvent(update.UpgradeLineEvent, line)) {
return
}
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
}
eventsCh <- update.NewDataEvent(update.RestartEvent, "Upgrade completed. Restarting ...")

err := restartServices(ctx)
if err != nil {
eventsCh <- update.NewErrorEvent(fmt.Errorf("error restarting services after upgrade: %w", err))
if !yield(update.NewDataEvent(update.RestartEvent, "Upgrade completed. Restarting ...")) {
return
}
}()

return eventsCh, nil
if err := restartServices(ctx); err != nil {
_ = yield(update.NewErrorEvent(fmt.Errorf("error restarting services after upgrade: %w", err)))
return
}
}, nil
}

// runDpkgConfigureCommand is need in case an upgrade was interrupted in the middle
Expand Down
60 changes: 31 additions & 29 deletions internal/update/arduino/arduino.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"errors"
"fmt"
"iter"
"log/slog"
"sync"

Expand Down Expand Up @@ -125,40 +126,42 @@ func (a *ArduinoPlatformUpdater) ListUpgradablePackages(ctx context.Context, _ f
}

// UpgradePackages implements ServiceUpdater.
func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []string) (<-chan update.Event, error) {
func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []string) (iter.Seq[update.Event], error) {
if !a.lock.TryLock() {
return nil, update.ErrOperationAlreadyInProgress
}
eventsCh := make(chan update.Event, 100)

downloadProgressCB := func(curr *rpc.DownloadProgress) {
data := helpers.ArduinoCLIDownloadProgressToString(curr)
slog.Debug("Download progress", slog.String("download_progress", data))
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, data)
}
taskProgressCB := func(msg *rpc.TaskProgress) {
data := helpers.ArduinoCLITaskProgressToString(msg)
slog.Debug("Task progress", slog.String("task_progress", data))
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, data)
}

go func() {
return func(yield func(update.Event) bool) {
defer a.lock.Unlock()
defer close(eventsCh)

eventsCh <- update.NewDataEvent(update.StartEvent, "Upgrade is starting")
downloadProgressCB := func(curr *rpc.DownloadProgress) {
data := helpers.ArduinoCLIDownloadProgressToString(curr)
slog.Debug("Download progress", slog.String("download_progress", data))
// TODO: add termination
_ = yield(update.NewDataEvent(update.UpgradeLineEvent, data))
}
taskProgressCB := func(msg *rpc.TaskProgress) {
data := helpers.ArduinoCLITaskProgressToString(msg)
slog.Debug("Task progress", slog.String("task_progress", data))
// TODO: add termination
_ = yield(update.NewDataEvent(update.UpgradeLineEvent, data))
}

if !yield(update.NewDataEvent(update.StartEvent, "Upgrade is starting")) {
return
}

logrus.SetLevel(logrus.ErrorLevel) // Reduce the log level of arduino-cli
srv := commands.NewArduinoCoreServer()

if err := setConfig(ctx, srv); err != nil {
eventsCh <- update.NewErrorEvent(fmt.Errorf("error setting config: %w", err))
_ = yield(update.NewErrorEvent(fmt.Errorf("error setting config: %w", err)))
return
}

var inst *rpc.Instance
if resp, err := srv.Create(ctx, &rpc.CreateRequest{}); err != nil {
eventsCh <- update.NewErrorEvent(fmt.Errorf("error creating arduino-cli instance: %w", err))
_ = yield(update.NewErrorEvent(fmt.Errorf("error creating arduino-cli instance: %w", err)))
return
} else {
inst = resp.GetInstance()
Expand All @@ -174,11 +177,11 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st
{
stream, _ := commands.UpdateIndexStreamResponseToCallbackFunction(ctx, downloadProgressCB)
if err := srv.UpdateIndex(&rpc.UpdateIndexRequest{Instance: inst}, stream); err != nil {
eventsCh <- update.NewErrorEvent(fmt.Errorf("error updating index: %w", err))
_ = yield(update.NewErrorEvent(fmt.Errorf("error updating index: %w", err)))
return
}
if err := srv.Init(&rpc.InitRequest{Instance: inst}, commands.InitStreamResponseToCallbackFunction(ctx, nil)); err != nil {
eventsCh <- update.NewErrorEvent(fmt.Errorf("error initializing instance: %w", err))
_ = yield(update.NewErrorEvent(fmt.Errorf("error initializing instance: %w", err)))
return
}
}
Expand All @@ -200,13 +203,13 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st
); err != nil {
var alreadyPresent *cmderrors.PlatformAlreadyAtTheLatestVersionError
if errors.As(err, &alreadyPresent) {
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, alreadyPresent.Error())
_ = yield(update.NewDataEvent(update.UpgradeLineEvent, alreadyPresent.Error()))
return
}

var notFound *cmderrors.PlatformNotFoundError
if !errors.As(err, &notFound) {
eventsCh <- update.NewErrorEvent(fmt.Errorf("error upgrading platform: %w", err))
_ = yield(update.NewErrorEvent(fmt.Errorf("error upgrading platform: %w", err)))
return
}
// If the platform is not found, we will try to install it
Expand All @@ -223,16 +226,17 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st
),
)
if err != nil {
eventsCh <- update.NewErrorEvent(fmt.Errorf("error installing platform: %w", err))
_ = yield(update.NewErrorEvent(fmt.Errorf("error installing platform: %w", err)))
return
}
} else if respCB().GetPlatform() == nil {
eventsCh <- update.NewErrorEvent(fmt.Errorf("platform upgrade failed"))
_ = yield(update.NewErrorEvent(fmt.Errorf("platform upgrade failed")))
return
}

cbw := orchestrator.NewCallbackWriter(func(line string) {
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
// TODO: add termination
_ = yield(update.NewDataEvent(update.UpgradeLineEvent, line))
})

err := srv.BurnBootloader(
Expand All @@ -244,10 +248,8 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st
commands.BurnBootloaderToServerStreams(ctx, cbw, cbw),
)
if err != nil {
eventsCh <- update.NewErrorEvent(fmt.Errorf("error burning bootloader: %w", err))
_ = yield(update.NewErrorEvent(fmt.Errorf("error burning bootloader: %w", err)))
return
}
}()

return eventsCh, nil
}, nil
}
3 changes: 2 additions & 1 deletion internal/update/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package update
import (
"context"
"fmt"
"iter"
"log/slog"
"net/http"
"strings"
Expand Down Expand Up @@ -46,7 +47,7 @@ type UpgradablePackage struct {

type ServiceUpdater interface {
ListUpgradablePackages(ctx context.Context, matcher func(UpgradablePackage) bool) ([]UpgradablePackage, error)
UpgradePackages(ctx context.Context, names []string) (<-chan Event, error)
UpgradePackages(ctx context.Context, names []string) (iter.Seq[Event], error)
}

type Manager struct {
Expand Down