Skip to content

Commit 4f33510

Browse files
committed
Implement ClusterLogCollector using gcloud cli
1 parent 9455b34 commit 4f33510

File tree

2 files changed

+183
-2
lines changed

2 files changed

+183
-2
lines changed

test/e2e/suite_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"k8s.io/klog/v2"
3535
infrav1 "sigs.k8s.io/cluster-api-provider-gcp/api/v1beta1"
3636
infrav1exp "sigs.k8s.io/cluster-api-provider-gcp/exp/api/v1beta1"
37+
"sigs.k8s.io/cluster-api-provider-gcp/util/log"
3738
capi_e2e "sigs.k8s.io/cluster-api/test/e2e"
3839
"sigs.k8s.io/cluster-api/test/framework"
3940
"sigs.k8s.io/cluster-api/test/framework/bootstrap"
@@ -174,7 +175,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
174175
kubeconfigPath := parts[3]
175176

176177
e2eConfig = loadE2EConfig(configPath)
177-
bootstrapClusterProxy = framework.NewClusterProxy("bootstrap", kubeconfigPath, initScheme())
178+
bootstrapClusterProxy = framework.NewClusterProxy("bootstrap", kubeconfigPath, initScheme(), framework.WithMachineLogCollector(&log.MachineLogCollector{}))
178179
})
179180

180181
// Using a SynchronizedAfterSuite for controlling how to delete resources shared across ParallelNodes (~ginkgo threads).
@@ -248,7 +249,7 @@ func setupBootstrapCluster(config *clusterctl.E2EConfig, scheme *runtime.Scheme,
248249
Expect(kubeconfigPath).To(BeAnExistingFile(), "Failed to get the kubeconfig file for the bootstrap cluster")
249250
}
250251

251-
clusterProxy := framework.NewClusterProxy("bootstrap", kubeconfigPath, scheme)
252+
clusterProxy := framework.NewClusterProxy("bootstrap", kubeconfigPath, scheme, framework.WithMachineLogCollector(&log.MachineLogCollector{}))
252253
Expect(clusterProxy).ToNot(BeNil(), "Failed to get a bootstrap cluster proxy")
253254

254255
return clusterProvider, clusterProxy

util/log/logcollector.go

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package log provides a log collector for machine and cluster logs.
18+
package log
19+
20+
import (
21+
"context"
22+
"fmt"
23+
"io"
24+
"os"
25+
"os/exec"
26+
"path/filepath"
27+
"sort"
28+
"strings"
29+
"sync"
30+
31+
"github.com/pkg/errors"
32+
kerrors "k8s.io/apimachinery/pkg/util/errors"
33+
"k8s.io/klog/v2"
34+
infrav1 "sigs.k8s.io/cluster-api-provider-gcp/api/v1beta1"
35+
clusterv1beta1 "sigs.k8s.io/cluster-api/api/core/v1beta1"
36+
clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
37+
"sigs.k8s.io/cluster-api/test/framework"
38+
"sigs.k8s.io/cluster-api/util"
39+
"sigs.k8s.io/controller-runtime/pkg/client"
40+
)
41+
42+
// MachineLogCollector implements the ClusterLogCollector interface.
// It retrieves logs from GCE instances by running commands over SSH via
// the gcloud CLI (see executeRemoteCommand).
type MachineLogCollector struct{}

// Compile-time assertion that *MachineLogCollector satisfies
// framework.ClusterLogCollector.
var _ framework.ClusterLogCollector = (*MachineLogCollector)(nil)
46+
47+
// CollectMachinePoolLog collects log from machine pools.
// It is currently a no-op: machine-pool log collection is not implemented,
// and returning nil keeps the e2e framework's log collection from failing.
func (c *MachineLogCollector) CollectMachinePoolLog(_ context.Context, _ client.Client, _ *clusterv1.MachinePool, _ string) error {
	return nil
}
51+
52+
// CollectMachineLog collects log from machines.
53+
func (c *MachineLogCollector) CollectMachineLog(ctx context.Context, ctrlClient client.Client, m *clusterv1.Machine, outputPath string) error {
54+
gcpMachine := &infrav1.GCPMachine{}
55+
if err := ctrlClient.Get(ctx, client.ObjectKey{Namespace: m.Namespace, Name: m.Spec.InfrastructureRef.Name}, gcpMachine); err != nil {
56+
return errors.Wrapf(err, "getting GCPMachine %s", klog.KObj(m))
57+
}
58+
59+
cluster, err := util.GetClusterFromMetadata(ctx, ctrlClient, m.ObjectMeta)
60+
if err != nil {
61+
return errors.Wrap(err, "failed to get cluster from metadata")
62+
}
63+
64+
gcpCluster := &infrav1.GCPCluster{}
65+
if err := ctrlClient.Get(ctx, client.ObjectKey{Namespace: gcpMachine.Namespace, Name: cluster.Spec.InfrastructureRef.Name}, gcpCluster); err != nil {
66+
return errors.Wrapf(err, "getting GCPCluster %s", klog.KObj(gcpCluster))
67+
}
68+
69+
zone := zoneWithFallback(m, gcpCluster.Status.FailureDomains)
70+
71+
captureLogs := func(hostFileName, command string) func() error {
72+
return func() error {
73+
f, err := createOutputFile(filepath.Join(outputPath, hostFileName))
74+
if err != nil {
75+
return err
76+
}
77+
defer f.Close()
78+
79+
if err := executeRemoteCommand(f, gcpMachine.Name, zone, command); err != nil {
80+
return errors.Wrapf(err, "failed to run command %s for machine %s", command, klog.KObj(m))
81+
}
82+
83+
return nil
84+
}
85+
}
86+
87+
return aggregateConcurrent(
88+
captureLogs("kubelet.log",
89+
"sudo journalctl --no-pager --output=short-precise -u kubelet.service"),
90+
captureLogs("containerd.log",
91+
"sudo journalctl --no-pager --output=short-precise -u containerd.service"),
92+
captureLogs("cloud-init.log",
93+
"sudo cat /var/log/cloud-init.log"),
94+
captureLogs("cloud-init-output.log",
95+
"sudo cat /var/log/cloud-init-output.log"),
96+
captureLogs("kubeadm-service.log",
97+
"sudo cat /var/log/kubeadm-service.log"),
98+
captureLogs("pod-logs.tar.gz",
99+
"sudo tar -czf - -C /var/log pods"),
100+
)
101+
}
102+
103+
// CollectInfrastructureLogs collects log from the infrastructure.
// It is currently a no-op: infrastructure-level log collection is not
// implemented, and returning nil keeps the e2e framework's log collection
// from failing.
func (c *MachineLogCollector) CollectInfrastructureLogs(_ context.Context, _ client.Client, _ *clusterv1.Cluster, _ string) error {
	return nil
}
107+
108+
func createOutputFile(path string) (*os.File, error) {
109+
if err := os.MkdirAll(filepath.Dir(path), 0o750); err != nil {
110+
return nil, err
111+
}
112+
113+
return os.Create(filepath.Clean(path))
114+
}
115+
116+
func executeRemoteCommand(f io.StringWriter, instanceName, zone, command string) error {
117+
cmd := exec.Command("gcloud", "compute", "ssh", "--zone", zone, "--command", fmt.Sprintf("%q", command), instanceName)
118+
119+
commandString := cmd.Path + " " + strings.Join(cmd.Args, " ")
120+
121+
cmd.Env = os.Environ()
122+
123+
errs := []error{}
124+
outputBytes, err := cmd.CombinedOutput()
125+
if err != nil {
126+
errs = append(errs, err)
127+
}
128+
129+
if outputBytes != nil {
130+
// Always write the output to the file, if any (so we get the error message)
131+
if _, err := f.WriteString(string(outputBytes)); err != nil {
132+
errs = append(errs, err)
133+
}
134+
}
135+
136+
if err := kerrors.NewAggregate(errs); err != nil {
137+
return errors.Wrapf(err, "running command %q", commandString)
138+
}
139+
140+
return nil
141+
}
142+
143+
// aggregateConcurrent runs fns concurrently, returning aggregated errors.
144+
func aggregateConcurrent(funcs ...func() error) error {
145+
// run all fns concurrently
146+
ch := make(chan error, len(funcs))
147+
var wg sync.WaitGroup
148+
for _, f := range funcs {
149+
wg.Add(1)
150+
go func() {
151+
defer wg.Done()
152+
ch <- f()
153+
}()
154+
}
155+
wg.Wait()
156+
close(ch)
157+
// collect up and return errors
158+
errs := []error{}
159+
for err := range ch {
160+
if err != nil {
161+
errs = append(errs, err)
162+
}
163+
}
164+
return kerrors.NewAggregate(errs)
165+
}
166+
167+
func zoneWithFallback(machine *clusterv1.Machine, gcpClusterFailureDomains clusterv1beta1.FailureDomains) string {
168+
if machine.Spec.FailureDomain == "" {
169+
fd := []string{}
170+
for failureDomainName := range gcpClusterFailureDomains {
171+
fd = append(fd, failureDomainName)
172+
}
173+
if len(fd) == 0 {
174+
return ""
175+
}
176+
sort.Strings(fd)
177+
return fd[0]
178+
}
179+
return machine.Spec.FailureDomain
180+
}

0 commit comments

Comments
 (0)