Skip to content

Commit 573eed8

Browse files
authored
Fluentd log aggregation (#259)
1 parent 8df86f2 commit 573eed8

File tree

9 files changed

+158
-249
lines changed

examples/pipelines/iris/cortex.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,6 @@
2424
- kind: api
2525
name: iris-type
2626
model: @dnn
27+
compute:
28+
max_replicas: 4
29+
min_replicas: 4

images/fluentd/Dockerfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
FROM fluent/fluentd-kubernetes-daemonset:v1.2.2-debian-cloudwatch
2+
RUN fluent-gem install fluent-plugin-grep
3+
RUN fluent-gem install fluent-plugin-route

manager/manifests/fluentd.yaml

Lines changed: 104 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,21 @@ metadata:
2020
labels:
2121
app: fluentd
2222
---
23-
24-
apiVersion: rbac.authorization.k8s.io/v1
25-
kind: Role
23+
apiVersion: rbac.authorization.k8s.io/v1beta1
24+
kind: ClusterRole
2625
metadata:
2726
name: fluentd
2827
namespace: $CORTEX_NAMESPACE
2928
rules:
3029
- apiGroups: [""]
31-
resources: [pods]
30+
resources:
31+
- namespaces
32+
- pods
33+
- pods/logs
3234
verbs: [get, list, watch]
3335
---
34-
35-
apiVersion: rbac.authorization.k8s.io/v1
36-
kind: RoleBinding
36+
apiVersion: rbac.authorization.k8s.io/v1beta1
37+
kind: ClusterRoleBinding
3738
metadata:
3839
name: fluentd
3940
namespace: $CORTEX_NAMESPACE
@@ -42,7 +43,7 @@ subjects:
4243
name: fluentd
4344
namespace: $CORTEX_NAMESPACE
4445
roleRef:
45-
kind: Role
46+
kind: ClusterRole
4647
name: fluentd
4748
apiGroup: rbac.authorization.k8s.io
4849
---
@@ -54,25 +55,111 @@ metadata:
5455
namespace: $CORTEX_NAMESPACE
5556
data:
5657
fluent.conf: |
58+
@include containers.conf
59+
5760
<match fluent.**>
5861
@type null
5962
</match>
63+
containers.conf: |
6064
<source>
6165
@type tail
62-
enable_stat_watcher false
66+
@id in_tail_container_logs
67+
@label @containers
6368
path /var/log/containers/**_${CORTEX_NAMESPACE}_**.log
6469
pos_file /var/log/fluentd-containers.log.pos
65-
time_format %Y-%m-%dT%H:%M:%S.%NZ
6670
tag *
67-
format json
6871
read_from_head true
72+
<parse>
73+
@type json
74+
time_format %Y-%m-%dT%H:%M:%S.%NZ
75+
</parse>
6976
</source>
70-
<match **>
71-
@type cloudwatch_logs
72-
log_group_name "#{ENV['LOG_GROUP_NAME']}"
73-
auto_create_stream true
74-
use_tag_as_stream true
75-
</match>
77+
78+
<label @containers>
79+
<filter **>
80+
@type kubernetes_metadata
81+
@id filter_kube_metadata
82+
</filter>
83+
84+
<match **>
85+
@type route
86+
<route **>
87+
copy
88+
@label @by_pod
89+
</route>
90+
<route **>
91+
copy
92+
@label @by_endpoint
93+
</route>
94+
</match>
95+
</label>
96+
97+
<label @by_pod>
98+
<filter **>
99+
@type record_transformer
100+
@id filter_containers_stream_transformer
101+
<record>
102+
stream_name ${tag_parts[3]}
103+
</record>
104+
remove_keys kubernetes,docker,stream
105+
</filter>
106+
<match **>
107+
@type cloudwatch_logs
108+
region "#{ENV['CORTEX_REGION']}"
109+
log_group_name "#{ENV['LOG_GROUP_NAME']}"
110+
log_stream_name_key stream_name
111+
remove_log_stream_name_key true
112+
auto_create_stream true
113+
<buffer>
114+
flush_interval 5
115+
chunk_limit_size 2m
116+
queued_chunks_limit_size 32
117+
retry_forever true
118+
</buffer>
119+
</match>
120+
</label>
121+
122+
<label @by_endpoint>
123+
<filter **>
124+
@type record_transformer
125+
enable_ruby
126+
<record>
127+
workload_type ${record.dig("kubernetes", "labels", "workloadType") || "unknown"}
128+
</record>
129+
remove_keys docker,stream
130+
</filter>
131+
132+
<filter **>
133+
@type grep
134+
regexp1 workload_type api
135+
</filter>
136+
<filter **>
137+
@type record_transformer
138+
enable_ruby
139+
<record>
140+
pod_name ${record.dig("kubernetes", "pod_name")}
141+
container_name ${record.dig("kubernetes", "container_name")}
142+
workload_id ${record.dig("kubernetes", "labels", "workloadID")}
143+
stream_name ${record.dig("kubernetes", "labels", "appName")}.${record.dig("kubernetes", "labels", "apiName")}
144+
</record>
145+
remove_keys kubernetes,api_name,app_name,workload_type
146+
</filter>
147+
<match **>
148+
@type cloudwatch_logs
149+
@id out_cloudwatch_logs_endpoints
150+
region "#{ENV['CORTEX_REGION']}"
151+
log_group_name "#{ENV['LOG_GROUP_NAME']}"
152+
log_stream_name_key stream_name
153+
remove_log_stream_name_key true
154+
auto_create_stream true
155+
<buffer>
156+
flush_interval 5
157+
chunk_limit_size 2m
158+
queued_chunks_limit_size 32
159+
retry_forever true
160+
</buffer>
161+
</match>
162+
</label>
76163
---
77164

78165
apiVersion: extensions/v1beta1

pkg/lib/aws/logs.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,6 @@ type FluentdLog struct {
4444
}
4545

4646
func (c *Client) GetLogs(prefix string, logGroup string) (string, error) {
47-
logGroupNamePrefix := "var.log.containers."
48-
4947
ignoreLogStreamNameRegexes := []*regexp.Regexp{
5048
regexp.MustCompile(`-exec-[0-9]+`),
5149
regexp.MustCompile(`_spark-init-`),
@@ -55,7 +53,7 @@ func (c *Client) GetLogs(prefix string, logGroup string) (string, error) {
5553
logStreamsOut, err := c.cloudWatchLogsClient.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{
5654
Limit: aws.Int64(50),
5755
LogGroupName: aws.String(logGroup),
58-
LogStreamNamePrefix: aws.String(logGroupNamePrefix + prefix),
56+
LogStreamNamePrefix: aws.String(prefix),
5957
})
6058
if err != nil {
6159
return "", errors.Wrap(err, "cloudwatch logs", prefix)

pkg/operator/endpoints/logs.go

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/cortexlabs/cortex/pkg/lib/errors"
2525
"github.com/cortexlabs/cortex/pkg/lib/slices"
26+
reslib "github.com/cortexlabs/cortex/pkg/operator/api/resource"
2627
"github.com/cortexlabs/cortex/pkg/operator/workloads"
2728
)
2829

@@ -46,8 +47,14 @@ func ReadLogs(w http.ResponseWriter, r *http.Request) {
4647
resourceName := getOptionalQParam("resourceName", r)
4748
resourceType := getOptionalQParam("resourceType", r)
4849

50+
podLabels := map[string]string{
51+
"appName": appName,
52+
"userFacing": "true",
53+
}
54+
4955
if workloadID != "" {
50-
readLogs(w, r, workloadID, appName, verbose)
56+
podLabels["workloadID"] = workloadID
57+
readLogs(w, r, podLabels, appName, verbose)
5158
return
5259
}
5360

@@ -61,7 +68,9 @@ func ReadLogs(w http.ResponseWriter, r *http.Request) {
6168
RespondError(w, errors.Wrap(workloads.ErrorNotFound(), appName, "latest workload ID", resourceID))
6269
return
6370
}
64-
readLogs(w, r, workloadID, appName, verbose)
71+
72+
podLabels["workloadID"] = workloadID
73+
readLogs(w, r, podLabels, appName, verbose)
6574
return
6675
}
6776

@@ -76,16 +85,25 @@ func ReadLogs(w http.ResponseWriter, r *http.Request) {
7685
RespondError(w, err)
7786
return
7887
}
79-
workloadID = resource.GetWorkloadID()
80-
readLogs(w, r, workloadID, appName, verbose)
88+
if resource.GetResourceType() == reslib.APIType {
89+
podLabels["apiName"] = resource.GetName()
90+
} else {
91+
podLabels["workloadID"] = resource.GetWorkloadID()
92+
}
93+
readLogs(w, r, podLabels, appName, verbose)
8194
return
8295
}
8396

8497
resource, err := ctx.VisibleResourceByName(resourceName)
8598

8699
if err == nil {
87100
workloadID = resource.GetWorkloadID()
88-
readLogs(w, r, workloadID, appName, verbose)
101+
if resource.GetResourceType() == reslib.APIType {
102+
podLabels["apiName"] = resource.GetName()
103+
} else {
104+
podLabels["workloadID"] = resource.GetWorkloadID()
105+
}
106+
readLogs(w, r, podLabels, appName, verbose)
89107
return
90108
}
91109

@@ -96,16 +114,16 @@ func ReadLogs(w http.ResponseWriter, r *http.Request) {
96114
}
97115
workloadIDs = slices.UniqueStrings(workloadIDs)
98116
if len(workloadIDs) == 1 {
99-
workloadID = workloadIDs[0]
100-
readLogs(w, r, workloadID, appName, verbose)
117+
podLabels["workloadID"] = workloadIDs[0]
118+
readLogs(w, r, podLabels, appName, verbose)
101119
return
102120
}
103121

104122
RespondError(w, err)
105123
return
106124
}
107125

108-
func readLogs(w http.ResponseWriter, r *http.Request, workloadID string, appName string, verbose bool) {
126+
func readLogs(w http.ResponseWriter, r *http.Request, podLabels map[string]string, appName string, verbose bool) {
109127
upgrader := websocket.Upgrader{}
110128
socket, err := upgrader.Upgrade(w, r, nil)
111129
if err != nil {
@@ -114,5 +132,5 @@ func readLogs(w http.ResponseWriter, r *http.Request, workloadID string, appName
114132
}
115133
defer socket.Close()
116134

117-
workloads.ReadLogs(appName, workloadID, verbose, socket)
135+
workloads.ReadLogs(appName, podLabels, verbose, socket)
118136
}

pkg/operator/workloads/cron.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,6 @@ func runCron() {
7070
errors.PrintError(err)
7171
}
7272

73-
if err := uploadLogPrefixesFromAPIPods(apiPods); err != nil {
74-
config.Telemetry.ReportError(err)
75-
errors.PrintError(err)
76-
}
77-
7873
failedPods, err := config.Kubernetes.ListPods(&kmeta.ListOptions{
7974
FieldSelector: "status.phase=Failed",
8075
})

0 commit comments

Comments
 (0)