|
1 | 1 | package tasktesting |
2 | 2 |
|
3 | 3 | import ( |
| 4 | + "bufio" |
4 | 5 | "context" |
5 | 6 | "fmt" |
6 | | - "io" |
7 | | - "io/ioutil" |
8 | 7 | "log" |
9 | | - "time" |
10 | 8 |
|
11 | 9 | corev1 "k8s.io/api/core/v1" |
12 | 10 | metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
@@ -64,59 +62,32 @@ func streamContainerLogs( |
64 | 62 | return fmt.Errorf("error watching pods: %s", err) |
65 | 63 | } |
66 | 64 |
|
67 | | - containerState := "waiting" |
68 | | - var logStream io.ReadCloser |
69 | 65 | for { |
70 | | - select { |
71 | | - case ev := <-w.ResultChan(): |
72 | | - if cs, ok := containerFromEvent(ev, podName, containerName); ok { |
73 | | - if cs.State.Running != nil { |
74 | | - if containerState == "waiting" { |
75 | | - log.Printf("---------------------- Logs from %s -------------------------\n", containerName) |
76 | | - req := c.CoreV1().Pods(podNamespace).GetLogs(podName, &corev1.PodLogOptions{ |
77 | | - Follow: true, |
78 | | - Container: containerName, |
79 | | - }) |
80 | | - ls, err := req.Stream(context.Background()) |
81 | | - if err != nil { |
82 | | - return fmt.Errorf("could not create log stream for pod %s in namespace %s: %w", podName, podNamespace, err) |
83 | | - } |
84 | | - logStream = ls |
85 | | - defer logStream.Close() |
86 | | - } |
87 | | - containerState = "running" |
| 66 | + ev := <-w.ResultChan() |
| 67 | + if cs, ok := containerFromEvent(ev, podName, containerName); ok { |
| 68 | + if cs.State.Running != nil { |
| 69 | + log.Printf("---------------------- Logs from %s -------------------------\n", containerName) |
| 70 | + // Set up log stream using a new ctx so that it's not cancelled |
| 71 | + // when the task is done before all logs have been read. |
| 72 | + ls, err := c.CoreV1().Pods(podNamespace).GetLogs(podName, &corev1.PodLogOptions{ |
| 73 | + Follow: true, |
| 74 | + Container: containerName, |
| 75 | + }).Stream(context.Background()) |
| 76 | + if err != nil { |
| 77 | + return fmt.Errorf("could not create log stream for pod %s in namespace %s: %w", podName, podNamespace, err) |
88 | 78 | } |
89 | | - if containerState != "waiting" && cs.State.Terminated != nil { |
90 | | - // read reminder of the log stream |
91 | | - logs, err := ioutil.ReadAll(logStream) |
92 | | - if err != nil { |
93 | | - return fmt.Errorf("could not read log stream for pod %s in namespace %s: %w", podName, podNamespace, err) |
| 79 | + defer ls.Close() |
| 80 | + reader := bufio.NewScanner(ls) |
| 81 | + for reader.Scan() { |
| 82 | + select { |
| 83 | + case <-ctx.Done(): |
| 84 | + fmt.Println(reader.Text()) |
| 85 | + return nil |
| 86 | + default: |
| 87 | + fmt.Println(reader.Text()) |
94 | 88 | } |
95 | | - fmt.Println(string(logs)) |
96 | | - return nil |
97 | | - } |
98 | | - } |
99 | | - |
100 | | - default: |
101 | | - // if log stream has started, read some bytes |
102 | | - if logStream != nil { |
103 | | - buf := make([]byte, 100) |
104 | | - |
105 | | - numBytes, err := logStream.Read(buf) |
106 | | - if numBytes == 0 { |
107 | | - continue |
108 | | - } |
109 | | - if err == io.EOF { |
110 | | - log.Printf("logs for %s ended\n", containerName) |
111 | | - return nil |
112 | 89 | } |
113 | | - if err != nil { |
114 | | - return fmt.Errorf("error in reading log stream: %w", err) |
115 | | - } |
116 | | - |
117 | | - fmt.Print(string(buf[:numBytes])) |
118 | | - } else { |
119 | | - time.Sleep(time.Second) |
| 90 | + return reader.Err() |
120 | 91 | } |
121 | 92 | } |
122 | 93 | } |
|
0 commit comments