Commit b61ad2b

Implement ClusterLogCollector using gcloud cli
1 parent 9455b34 commit b61ad2b
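
This change teaches the e2e suite to pull node-level logs (kubelet, containerd, cloud-init, kubeadm, pod logs) from GCP instances by shelling out to the gcloud CLI rather than opening SSH connections from Go. Per machine and per log file, the collector runs a command of the following shape; the zone and instance name shown here are placeholders, resolved at runtime from the GCPCluster failure domains and the GCPMachine name:

    gcloud compute ssh --zone "<zone>" --command "sudo journalctl --no-pager --output=short-precise -u kubelet.service" "<gcp-machine-name>"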

File tree

test/e2e/suite_test.go
util/log/logcollector.go

2 files changed: +172 −2 lines


test/e2e/suite_test.go

Lines changed: 3 additions & 2 deletions
@@ -34,6 +34,7 @@ import (
 	"k8s.io/klog/v2"
 	infrav1 "sigs.k8s.io/cluster-api-provider-gcp/api/v1beta1"
 	infrav1exp "sigs.k8s.io/cluster-api-provider-gcp/exp/api/v1beta1"
+	"sigs.k8s.io/cluster-api-provider-gcp/util/log"
 	capi_e2e "sigs.k8s.io/cluster-api/test/e2e"
 	"sigs.k8s.io/cluster-api/test/framework"
 	"sigs.k8s.io/cluster-api/test/framework/bootstrap"
@@ -174,7 +175,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
 	kubeconfigPath := parts[3]

 	e2eConfig = loadE2EConfig(configPath)
-	bootstrapClusterProxy = framework.NewClusterProxy("bootstrap", kubeconfigPath, initScheme())
+	bootstrapClusterProxy = framework.NewClusterProxy("bootstrap", kubeconfigPath, initScheme(), framework.WithMachineLogCollector(&log.MachineLogCollector{}))
 })

 // Using a SynchronizedAfterSuite for controlling how to delete resources shared across ParallelNodes (~ginkgo threads).
@@ -248,7 +249,7 @@ func setupBootstrapCluster(config *clusterctl.E2EConfig, scheme *runtime.Scheme,
 		Expect(kubeconfigPath).To(BeAnExistingFile(), "Failed to get the kubeconfig file for the bootstrap cluster")
 	}

-	clusterProxy := framework.NewClusterProxy("bootstrap", kubeconfigPath, scheme)
+	clusterProxy := framework.NewClusterProxy("bootstrap", kubeconfigPath, scheme, framework.WithMachineLogCollector(&log.MachineLogCollector{}))
 	Expect(clusterProxy).ToNot(BeNil(), "Failed to get a bootstrap cluster proxy")

 	return clusterProvider, clusterProxy
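
Registering the collector through framework.WithMachineLogCollector is what lets the CAPI e2e framework call back into this provider-specific path when it collects workload cluster logs. Outside the framework, the same collector can be driven directly; the sketch below is not part of this commit, and mgmtClient, machine, and outputPath are assumptions standing in for a management-cluster client, a CAPI Machine, and an artifacts directory:

    // collectlogs_sketch.go -- illustrative only, not part of this commit.
    package e2e

    import (
    	"context"

    	clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
    	"sigs.k8s.io/controller-runtime/pkg/client"

    	"sigs.k8s.io/cluster-api-provider-gcp/util/log"
    )

    // collectOneMachine writes kubelet.log, containerd.log, cloud-init.log, and the
    // other captured files for a single Machine under outputPath.
    func collectOneMachine(ctx context.Context, mgmtClient client.Client, machine *clusterv1.Machine, outputPath string) error {
    	collector := &log.MachineLogCollector{}
    	return collector.CollectMachineLog(ctx, mgmtClient, machine, outputPath)
    }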

util/log/logcollector.go

Lines changed: 169 additions & 0 deletions
@@ -0,0 +1,169 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package log provides a log collector for machine and cluster logs.
+package log
+
+import (
+	"context"
+	"io"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"sort"
+	"sync"
+
+	"github.com/pkg/errors"
+	kerrors "k8s.io/apimachinery/pkg/util/errors"
+	"k8s.io/klog/v2"
+	infrav1 "sigs.k8s.io/cluster-api-provider-gcp/api/v1beta1"
+	clusterv1beta1 "sigs.k8s.io/cluster-api/api/core/v1beta1"
+	clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
+	"sigs.k8s.io/cluster-api/test/framework"
+	"sigs.k8s.io/cluster-api/util"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// MachineLogCollector implements the ClusterLogCollector interface.
+type MachineLogCollector struct{}
+
+var _ framework.ClusterLogCollector = (*MachineLogCollector)(nil)
+
+// CollectMachinePoolLog collects log from machine pools.
+func (c *MachineLogCollector) CollectMachinePoolLog(_ context.Context, _ client.Client, _ *clusterv1.MachinePool, _ string) error {
+	return nil
+}
+
+// CollectMachineLog collects log from machines.
+func (c *MachineLogCollector) CollectMachineLog(ctx context.Context, ctrlClient client.Client, m *clusterv1.Machine, outputPath string) error {
+	gcpMachine := &infrav1.GCPMachine{}
+	if err := ctrlClient.Get(ctx, client.ObjectKey{Namespace: m.Namespace, Name: m.Spec.InfrastructureRef.Name}, gcpMachine); err != nil {
+		return errors.Wrapf(err, "getting GCPMachine %s", klog.KObj(m))
+	}
+
+	cluster, err := util.GetClusterFromMetadata(ctx, ctrlClient, m.ObjectMeta)
+	if err != nil {
+		return errors.Wrap(err, "failed to get cluster from metadata")
+	}
+
+	gcpCluster := &infrav1.GCPCluster{}
+	if err := ctrlClient.Get(ctx, client.ObjectKey{Namespace: gcpMachine.Namespace, Name: cluster.Spec.InfrastructureRef.Name}, gcpCluster); err != nil {
+		return errors.Wrapf(err, "getting GCPCluster %s", klog.KObj(gcpCluster))
+	}
+
+	zone := zoneWithFallback(m, gcpCluster.Status.FailureDomains)
+
+	captureLogs := func(hostFileName, command string) func() error {
+		return func() error {
+			f, err := createOutputFile(filepath.Join(outputPath, hostFileName))
+			if err != nil {
+				return err
+			}
+			defer f.Close()
+
+			if err := executeRemoteCommand(f, gcpMachine.Name, zone, command); err != nil {
+				return errors.Wrapf(err, "failed to run command %s for machine %s", command, klog.KObj(m))
+			}
+
+			return nil
+		}
+	}
+
+	return aggregateConcurrent(
+		captureLogs("kubelet.log",
+			"sudo journalctl --no-pager --output=short-precise -u kubelet.service"),
+		captureLogs("containerd.log",
+			"sudo journalctl --no-pager --output=short-precise -u containerd.service"),
+		captureLogs("cloud-init.log",
+			"sudo cat /var/log/cloud-init.log"),
+		captureLogs("cloud-init-output.log",
+			"sudo cat /var/log/cloud-init-output.log"),
+		captureLogs("kubeadm-service.log",
+			"sudo cat /var/log/kubeadm-service.log"),
+		captureLogs("pod-logs.tar.gz",
+			"sudo tar -czf - -C /var/log pods"),
+	)
+}
+
+// CollectInfrastructureLogs collects log from the infrastructure.
+func (c *MachineLogCollector) CollectInfrastructureLogs(_ context.Context, _ client.Client, _ *clusterv1.Cluster, _ string) error {
+	return nil
+}
+
+func createOutputFile(path string) (*os.File, error) {
+	if err := os.MkdirAll(filepath.Dir(path), 0o750); err != nil {
+		return nil, err
+	}
+
+	return os.Create(filepath.Clean(path))
+}
+
+func executeRemoteCommand(f io.StringWriter, instanceName, zone, command string) error {
+	cmd := exec.Command("gcloud", "compute", "ssh", "--zone", zone, "--command", command, instanceName)
+
+	cmd.Env = os.Environ()
+
+	outputBytes, err := cmd.CombinedOutput()
+	if err != nil {
+		return errors.Wrapf(err, "running command %q", command)
+	}
+
+	_, err = f.WriteString(string(outputBytes))
+	if err != nil {
+		return errors.Wrap(err, "writing output to file")
+	}
+
+	return nil
+}
+
+// aggregateConcurrent runs fns concurrently, returning aggregated errors.
+func aggregateConcurrent(funcs ...func() error) error {
+	// run all fns concurrently
+	ch := make(chan error, len(funcs))
+	var wg sync.WaitGroup
+	for _, f := range funcs {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			ch <- f()
+		}()
+	}
+	wg.Wait()
+	close(ch)
+	// collect up and return errors
+	errs := []error{}
+	for err := range ch {
+		if err != nil {
+			errs = append(errs, err)
+		}
+	}
+	return kerrors.NewAggregate(errs)
+}
+
+func zoneWithFallback(machine *clusterv1.Machine, gcpClusterFailureDomains clusterv1beta1.FailureDomains) string {
+	if machine.Spec.FailureDomain == "" {
+		fd := []string{}
+		for failureDomainName := range gcpClusterFailureDomains {
+			fd = append(fd, failureDomainName)
+		}
+		if len(fd) == 0 {
+			return ""
+		}
+		sort.Strings(fd)
+		return fd[0]
+	}
+	return machine.Spec.FailureDomain
+}
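
Each captureLogs closure writes one file; aggregateConcurrent fans them out as goroutines and folds their failures into a single aggregate error, so one unreachable log source does not hide the others. A hedged unit-test sketch (not part of this commit) of that behaviour, placed alongside the collector in package log:

    // logcollector_sketch_test.go -- illustrative only, not part of this commit.
    package log

    import (
    	"errors"
    	"strings"
    	"testing"
    )

    func TestAggregateConcurrentCollectsAllErrors(t *testing.T) {
    	err := aggregateConcurrent(
    		func() error { return nil },
    		func() error { return errors.New("kubelet.log: ssh failed") },
    		func() error { return errors.New("containerd.log: ssh failed") },
    	)
    	if err == nil {
    		t.Fatal("expected an aggregated error, got nil")
    	}
    	// Both failures should be visible in the aggregate; the nil result is dropped.
    	for _, want := range []string{"kubelet.log", "containerd.log"} {
    		if !strings.Contains(err.Error(), want) {
    			t.Errorf("aggregated error %q does not mention %q", err.Error(), want)
    		}
    	}
    }

The buffered channel in aggregateConcurrent is sized to len(funcs), so no goroutine blocks on send, and the channel is only drained after wg.Wait() confirms every capture has finished.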
