Skip to content

Commit f3da5fb

Browse files
committed
Implement ClusterLogCollector using gcloud cli
1 parent 9455b34 commit f3da5fb

File tree

2 files changed

+182
-2
lines changed

2 files changed

+182
-2
lines changed

test/e2e/suite_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"k8s.io/klog/v2"
3535
infrav1 "sigs.k8s.io/cluster-api-provider-gcp/api/v1beta1"
3636
infrav1exp "sigs.k8s.io/cluster-api-provider-gcp/exp/api/v1beta1"
37+
"sigs.k8s.io/cluster-api-provider-gcp/util/log"
3738
capi_e2e "sigs.k8s.io/cluster-api/test/e2e"
3839
"sigs.k8s.io/cluster-api/test/framework"
3940
"sigs.k8s.io/cluster-api/test/framework/bootstrap"
@@ -174,7 +175,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
174175
kubeconfigPath := parts[3]
175176

176177
e2eConfig = loadE2EConfig(configPath)
177-
bootstrapClusterProxy = framework.NewClusterProxy("bootstrap", kubeconfigPath, initScheme())
178+
bootstrapClusterProxy = framework.NewClusterProxy("bootstrap", kubeconfigPath, initScheme(), framework.WithMachineLogCollector(&log.MachineLogCollector{}))
178179
})
179180

180181
// Using a SynchronizedAfterSuite for controlling how to delete resources shared across ParallelNodes (~ginkgo threads).
@@ -248,7 +249,7 @@ func setupBootstrapCluster(config *clusterctl.E2EConfig, scheme *runtime.Scheme,
248249
Expect(kubeconfigPath).To(BeAnExistingFile(), "Failed to get the kubeconfig file for the bootstrap cluster")
249250
}
250251

251-
clusterProxy := framework.NewClusterProxy("bootstrap", kubeconfigPath, scheme)
252+
clusterProxy := framework.NewClusterProxy("bootstrap", kubeconfigPath, scheme, framework.WithMachineLogCollector(&log.MachineLogCollector{}))
252253
Expect(clusterProxy).ToNot(BeNil(), "Failed to get a bootstrap cluster proxy")
253254

254255
return clusterProvider, clusterProxy

util/log/logcollector.go

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package log provides a log collector for machine and cluster logs.
18+
package log
19+
20+
import (
21+
"context"
22+
"io"
23+
"os"
24+
"os/exec"
25+
"path/filepath"
26+
"sort"
27+
"strings"
28+
"sync"
29+
30+
"github.com/pkg/errors"
31+
kerrors "k8s.io/apimachinery/pkg/util/errors"
32+
"k8s.io/klog/v2"
33+
infrav1 "sigs.k8s.io/cluster-api-provider-gcp/api/v1beta1"
34+
clusterv1beta1 "sigs.k8s.io/cluster-api/api/core/v1beta1"
35+
clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
36+
"sigs.k8s.io/cluster-api/test/framework"
37+
"sigs.k8s.io/cluster-api/util"
38+
"sigs.k8s.io/controller-runtime/pkg/client"
39+
)
40+
41+
// MachineLogCollector implements framework.ClusterLogCollector for GCP
// machines. It carries no state; CollectMachineLog gathers logs by running
// commands on the instance through the gcloud CLI, while the machine pool and
// infrastructure collectors are currently no-ops.
42+
type MachineLogCollector struct{}
43+
44+
// Compile-time assertion that *MachineLogCollector satisfies
// framework.ClusterLogCollector.
var _ framework.ClusterLogCollector = (*MachineLogCollector)(nil)
45+
46+
// CollectMachinePoolLog is a no-op: log collection for machine pools is not
// implemented. All arguments are ignored and nil is always returned.
47+
func (c *MachineLogCollector) CollectMachinePoolLog(_ context.Context, _ client.Client, _ *clusterv1.MachinePool, _ string) error {
48+
return nil
49+
}
50+
51+
// CollectMachineLog collects log from machines.
52+
func (c *MachineLogCollector) CollectMachineLog(ctx context.Context, ctrlClient client.Client, m *clusterv1.Machine, outputPath string) error {
53+
gcpMachine := &infrav1.GCPMachine{}
54+
if err := ctrlClient.Get(ctx, client.ObjectKey{Namespace: m.Namespace, Name: m.Spec.InfrastructureRef.Name}, gcpMachine); err != nil {
55+
return errors.Wrapf(err, "getting GCPMachine %s", klog.KObj(m))
56+
}
57+
58+
cluster, err := util.GetClusterFromMetadata(ctx, ctrlClient, m.ObjectMeta)
59+
if err != nil {
60+
return errors.Wrap(err, "failed to get cluster from metadata")
61+
}
62+
63+
gcpCluster := &infrav1.GCPCluster{}
64+
if err := ctrlClient.Get(ctx, client.ObjectKey{Namespace: gcpMachine.Namespace, Name: cluster.Spec.InfrastructureRef.Name}, gcpCluster); err != nil {
65+
return errors.Wrapf(err, "getting GCPCluster %s", klog.KObj(gcpCluster))
66+
}
67+
68+
zone := zoneWithFallback(m, gcpCluster.Status.FailureDomains)
69+
70+
captureLogs := func(hostFileName, command string) func() error {
71+
return func() error {
72+
f, err := createOutputFile(filepath.Join(outputPath, hostFileName))
73+
if err != nil {
74+
return err
75+
}
76+
defer f.Close()
77+
78+
if err := executeRemoteCommand(f, gcpMachine.Name, zone, command); err != nil {
79+
return errors.Wrapf(err, "failed to run command %s for machine %s", command, klog.KObj(m))
80+
}
81+
82+
return nil
83+
}
84+
}
85+
86+
return aggregateConcurrent(
87+
captureLogs("kubelet.log",
88+
"sudo journalctl --no-pager --output=short-precise -u kubelet.service"),
89+
captureLogs("containerd.log",
90+
"sudo journalctl --no-pager --output=short-precise -u containerd.service"),
91+
captureLogs("cloud-init.log",
92+
"sudo cat /var/log/cloud-init.log"),
93+
captureLogs("cloud-init-output.log",
94+
"sudo cat /var/log/cloud-init-output.log"),
95+
captureLogs("kubeadm-service.log",
96+
"sudo cat /var/log/kubeadm-service.log"),
97+
captureLogs("pod-logs.tar.gz",
98+
"sudo tar -czf - -C /var/log pods"),
99+
)
100+
}
101+
102+
// CollectInfrastructureLogs is a no-op: infrastructure-level log collection is
// not implemented. All arguments are ignored and nil is always returned.
103+
func (c *MachineLogCollector) CollectInfrastructureLogs(_ context.Context, _ client.Client, _ *clusterv1.Cluster, _ string) error {
104+
return nil
105+
}
106+
107+
// createOutputFile makes sure the directory that will contain path exists
// (creating missing parents with mode 0o750) and then creates the file,
// truncating it if it already exists. The caller owns the returned file and
// is responsible for closing it.
func createOutputFile(path string) (*os.File, error) {
	parentDir := filepath.Dir(path)
	if mkdirErr := os.MkdirAll(parentDir, 0o750); mkdirErr != nil {
		return nil, mkdirErr
	}

	file, createErr := os.Create(filepath.Clean(path))
	if createErr != nil {
		return nil, createErr
	}
	return file, nil
}
114+
115+
func executeRemoteCommand(f io.StringWriter, instanceName, zone, command string) error {
116+
cmd := exec.Command("gcloud", "compute", "ssh", "--zone", zone, "--command", command, instanceName)
117+
118+
commandString := cmd.Path + " " + strings.Join(cmd.Args, " ")
119+
120+
cmd.Env = os.Environ()
121+
122+
errs := []error{}
123+
outputBytes, err := cmd.CombinedOutput()
124+
if err != nil {
125+
errs = append(errs, err)
126+
}
127+
128+
if outputBytes != nil {
129+
// Always write the output to the file, if any (so we get the error message)
130+
if _, err := f.WriteString(string(outputBytes)); err != nil {
131+
errs = append(errs, err)
132+
}
133+
}
134+
135+
if err := kerrors.NewAggregate(errs); err != nil {
136+
return errors.Wrapf(err, "running command %q", commandString)
137+
}
138+
139+
return nil
140+
}
141+
142+
// aggregateConcurrent runs fns concurrently, returning aggregated errors.
143+
func aggregateConcurrent(funcs ...func() error) error {
144+
// run all fns concurrently
145+
ch := make(chan error, len(funcs))
146+
var wg sync.WaitGroup
147+
for _, f := range funcs {
148+
wg.Add(1)
149+
go func() {
150+
defer wg.Done()
151+
ch <- f()
152+
}()
153+
}
154+
wg.Wait()
155+
close(ch)
156+
// collect up and return errors
157+
errs := []error{}
158+
for err := range ch {
159+
if err != nil {
160+
errs = append(errs, err)
161+
}
162+
}
163+
return kerrors.NewAggregate(errs)
164+
}
165+
166+
func zoneWithFallback(machine *clusterv1.Machine, gcpClusterFailureDomains clusterv1beta1.FailureDomains) string {
167+
if machine.Spec.FailureDomain == "" {
168+
fd := []string{}
169+
for failureDomainName := range gcpClusterFailureDomains {
170+
fd = append(fd, failureDomainName)
171+
}
172+
if len(fd) == 0 {
173+
return ""
174+
}
175+
sort.Strings(fd)
176+
return fd[0]
177+
}
178+
return machine.Spec.FailureDomain
179+
}

0 commit comments

Comments
 (0)