Skip to content

Commit 8a07412

Browse files
authored
Merge pull request #1342 from yp969803/issue1341
fix: values is not updated in tcp_conn map #1341
2 parents 1844e9e + 87c6e6b commit 8a07412

File tree

3 files changed

+111
-15
lines changed

3 files changed

+111
-15
lines changed

pkg/controller/telemetry/metric.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -516,19 +516,19 @@ func (m *MetricController) Run(ctx context.Context, mapOfTcpInfo *ebpf.Map) {
516516
OutputAccesslog(data, tcpConns[data.conSrcDstInfo], accesslog)
517517
}
518518

519-
if data.state == TCP_CLOSTED {
520-
delete(tcpConns, data.conSrcDstInfo)
521-
}
522-
523519
m.mutex.Lock()
524520
if m.EnableWorkloadMetric.Load() {
525-
m.updateWorkloadMetricCache(data, workloadLabels, tcpConns)
521+
m.updateWorkloadMetricCache(data, workloadLabels, tcpConns[data.conSrcDstInfo])
526522
}
527-
m.updateServiceMetricCache(data, serviceLabels, tcpConns)
523+
m.updateServiceMetricCache(data, serviceLabels, tcpConns[data.conSrcDstInfo])
528524
if m.EnableConnectionMetric.Load() && data.duration > LONG_CONN_METRIC_THRESHOLD {
529525
m.updateConnectionMetricCache(data, tcpConns[data.conSrcDstInfo], connectionLabels)
530526
}
531527
m.mutex.Unlock()
528+
529+
if data.state == TCP_CLOSTED {
530+
delete(tcpConns, data.conSrcDstInfo)
531+
}
532532
}
533533
}
534534
}
@@ -577,6 +577,7 @@ func buildV4Metric(buf *bytes.Buffer, tcpConns map[connectionSrcDst]connMetric)
577577
cm.totalRetrans = connectData.statistics.Retransmits
578578
cm.packetLost = connectData.statistics.LostPackets
579579
cm.totalReports++
580+
tcpConns[data.conSrcDstInfo] = cm
580581
} else {
581582
tcpConns[data.conSrcDstInfo] = connMetric{
582583
receivedBytes: connectData.ReceivedBytes,
@@ -632,6 +633,7 @@ func buildV6Metric(buf *bytes.Buffer, tcpConns map[connectionSrcDst]connMetric)
632633
cm.totalRetrans = connectData.statistics.Retransmits
633634
cm.packetLost = connectData.statistics.LostPackets
634635
cm.totalReports++
636+
tcpConns[data.conSrcDstInfo] = cm
635637
} else {
636638
tcpConns[data.conSrcDstInfo] = connMetric{
637639
receivedBytes: connectData.ReceivedBytes,
@@ -840,10 +842,10 @@ func buildPrincipal(workload *workloadapi.Workload) string {
840842
return DEFAULT_UNKNOWN
841843
}
842844

843-
func (m *MetricController) updateWorkloadMetricCache(data requestMetric, labels workloadMetricLabels, tcpConns map[connectionSrcDst]connMetric) {
845+
func (m *MetricController) updateWorkloadMetricCache(data requestMetric, labels workloadMetricLabels, metric connMetric) {
844846
v, ok := m.workloadMetricCache[labels]
845847
if ok {
846-
if data.state == TCP_ESTABLISHED && tcpConns[data.conSrcDstInfo].totalReports == 1 {
848+
if data.state == TCP_ESTABLISHED && metric.totalReports == 1 {
847849
v.WorkloadConnOpened = v.WorkloadConnOpened + 1
848850
}
849851
if data.state == TCP_CLOSTED {
@@ -858,7 +860,7 @@ func (m *MetricController) updateWorkloadMetricCache(data requestMetric, labels
858860
v.WorkloadConnPacketLost = v.WorkloadConnPacketLost + float64(data.packetLost)
859861
} else {
860862
newWorkloadMetricInfo := workloadMetricInfo{}
861-
if data.state == TCP_ESTABLISHED && tcpConns[data.conSrcDstInfo].totalReports == 1 {
863+
if data.state == TCP_ESTABLISHED && metric.totalReports == 1 {
862864
newWorkloadMetricInfo.WorkloadConnOpened = 1
863865
}
864866
if data.state == TCP_CLOSTED {
@@ -875,10 +877,10 @@ func (m *MetricController) updateWorkloadMetricCache(data requestMetric, labels
875877
}
876878
}
877879

878-
func (m *MetricController) updateServiceMetricCache(data requestMetric, labels serviceMetricLabels, tcpConns map[connectionSrcDst]connMetric) {
880+
func (m *MetricController) updateServiceMetricCache(data requestMetric, labels serviceMetricLabels, metric connMetric) {
879881
v, ok := m.serviceMetricCache[labels]
880882
if ok {
881-
if data.state == TCP_ESTABLISHED && tcpConns[data.conSrcDstInfo].totalReports == 1 {
883+
if data.state == TCP_ESTABLISHED && metric.totalReports == 1 {
882884
v.ServiceConnOpened = v.ServiceConnOpened + 1
883885
}
884886
if data.state == TCP_CLOSTED {
@@ -891,7 +893,7 @@ func (m *MetricController) updateServiceMetricCache(data requestMetric, labels s
891893
v.ServiceConnSentBytes = v.ServiceConnSentBytes + float64(data.sentBytes)
892894
} else {
893895
newServiceMetricInfo := serviceMetricInfo{}
894-
if data.state == TCP_ESTABLISHED && tcpConns[data.conSrcDstInfo].totalReports == 1 {
896+
if data.state == TCP_ESTABLISHED && metric.totalReports == 1 {
895897
newServiceMetricInfo.ServiceConnOpened = 1
896898
}
897899
if data.state == TCP_CLOSTED {

pkg/controller/telemetry/metric_test.go

Lines changed: 96 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package telemetry
1818

1919
import (
20+
"bytes"
2021
"context"
2122
"net"
2223
"reflect"
@@ -306,7 +307,7 @@ func TestBuildMetricsToPrometheus(t *testing.T) {
306307
workloadMetricCache: map[workloadMetricLabels]*workloadMetricInfo{},
307308
serviceMetricCache: map[serviceMetricLabels]*serviceMetricInfo{},
308309
}
309-
m.updateWorkloadMetricCache(tt.args.data, tt.args.labels, tt.args.tcpConns)
310+
m.updateWorkloadMetricCache(tt.args.data, tt.args.labels, tt.args.tcpConns[tt.args.data.conSrcDstInfo])
310311
assert.Equal(t, m.workloadMetricCache[tt.args.labels].WorkloadConnClosed, tt.want[0])
311312
assert.Equal(t, m.workloadMetricCache[tt.args.labels].WorkloadConnOpened, tt.want[1])
312313
assert.Equal(t, m.workloadMetricCache[tt.args.labels].WorkloadConnReceivedBytes, tt.want[2])
@@ -440,7 +441,7 @@ func TestBuildServiceMetricsToPrometheus(t *testing.T) {
440441
workloadMetricCache: map[workloadMetricLabels]*workloadMetricInfo{},
441442
serviceMetricCache: map[serviceMetricLabels]*serviceMetricInfo{},
442443
}
443-
m.updateServiceMetricCache(tt.args.data, tt.args.labels, tt.args.tcpConns)
444+
m.updateServiceMetricCache(tt.args.data, tt.args.labels, tt.args.tcpConns[tt.args.data.conSrcDstInfo])
444445
assert.Equal(t, m.serviceMetricCache[tt.args.labels].ServiceConnClosed, tt.want[0])
445446
assert.Equal(t, m.serviceMetricCache[tt.args.labels].ServiceConnOpened, tt.want[1])
446447
assert.Equal(t, m.serviceMetricCache[tt.args.labels].ServiceConnReceivedBytes, tt.want[2])
@@ -1677,3 +1678,96 @@ func TestMetricController_updatePrometheusMetric(t *testing.T) {
16771678
})
16781679
}
16791680
}
1681+
1682+
func TestBuildV4Metric(t *testing.T) {
1683+
buff := bytes.NewBuffer([]byte{10, 244, 1, 13, 10, 244, 1, 12, 34, 208, 144, 31, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1684+
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 96, 46, 224, 144, 31, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1685+
0, 3, 0, 0, 0, 147, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 167, 122, 203, 84, 2, 0, 0, 0, 153, 163,
1686+
210, 202, 232, 184, 0, 0, 64, 30, 158, 31, 235, 184, 0, 0, 0, 0, 0, 0, 150, 158, 0, 0, 19, 0, 0, 0, 0, 0,
1687+
0, 0, 0, 0, 0, 0, 0, 0, 0, 0})
1688+
data := requestMetric{
1689+
conSrcDstInfo: connectionSrcDst{
1690+
src: [4]uint32{218231818, 0, 0, 0},
1691+
dst: [4]uint32{201454602, 0, 0, 0},
1692+
srcPort: 53282,
1693+
dstPort: 8080,
1694+
},
1695+
origDstAddr: [4]uint32{3761135626, 0, 0, 0},
1696+
origDstPort: 8080,
1697+
direction: 2,
1698+
receivedBytes: 147,
1699+
sentBytes: 3,
1700+
state: 1,
1701+
success: 1,
1702+
duration: 10012555943,
1703+
startTime: 203309974725529,
1704+
lastReportTime: 203319987281472,
1705+
srtt: 40598,
1706+
minRtt: 19,
1707+
totalRetrans: 0,
1708+
packetLost: 0,
1709+
}
1710+
1711+
tests := []struct {
1712+
name string
1713+
tcpConns map[connectionSrcDst]connMetric
1714+
newConnMetric connMetric
1715+
}{
1716+
{
1717+
name: "v4 metric test, tcpConn is empty",
1718+
tcpConns: map[connectionSrcDst]connMetric{},
1719+
newConnMetric: connMetric{
1720+
receivedBytes: 147,
1721+
sentBytes: 3,
1722+
packetLost: 0,
1723+
totalRetrans: 0,
1724+
totalReports: 1,
1725+
},
1726+
},
1727+
{
1728+
name: "v4 metric test, tcpConns in not empty",
1729+
tcpConns: map[connectionSrcDst]connMetric{
1730+
data.conSrcDstInfo: {
1731+
receivedBytes: 1,
1732+
sentBytes: 1,
1733+
packetLost: 0,
1734+
totalRetrans: 0,
1735+
totalReports: 1,
1736+
},
1737+
},
1738+
newConnMetric: connMetric{
1739+
receivedBytes: 147,
1740+
sentBytes: 3,
1741+
packetLost: 0,
1742+
totalRetrans: 0,
1743+
totalReports: 2,
1744+
},
1745+
},
1746+
}
1747+
1748+
for _, tt := range tests {
1749+
t.Run(tt.name, func(t *testing.T) {
1750+
copiedBytes := make([]byte, len(buff.Bytes()))
1751+
copy(copiedBytes, buff.Bytes())
1752+
1753+
copiedBuff := bytes.NewBuffer(copiedBytes)
1754+
prevTcpConns := make(map[connectionSrcDst]connMetric)
1755+
for k, v := range tt.tcpConns {
1756+
prevTcpConns[k] = v
1757+
}
1758+
1759+
got, err := buildV4Metric(copiedBuff, tt.tcpConns)
1760+
if err != nil {
1761+
t.Errorf("buildV4Metric() error = %v", err)
1762+
}
1763+
assert.Equal(t, tt.newConnMetric, tt.tcpConns[data.conSrcDstInfo])
1764+
1765+
temp := data
1766+
temp.sentBytes = temp.sentBytes - prevTcpConns[temp.conSrcDstInfo].sentBytes
1767+
temp.receivedBytes = temp.receivedBytes - prevTcpConns[temp.conSrcDstInfo].receivedBytes
1768+
temp.packetLost = temp.packetLost - prevTcpConns[temp.conSrcDstInfo].packetLost
1769+
temp.totalRetrans = temp.totalRetrans - prevTcpConns[temp.conSrcDstInfo].totalRetrans
1770+
assert.Equal(t, temp, got)
1771+
})
1772+
}
1773+
}

test/e2e/baseline_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -849,7 +849,7 @@ func buildL4Query(src, dst echo.Instance) prometheus.Query {
849849
"source_cluster": "Kubernetes",
850850
}
851851

852-
query.Metric = "kmesh_tcp_connections_opened_total"
852+
query.Metric = "kmesh_tcp_sent_bytes_total"
853853
query.Labels = labels
854854

855855
return query

0 commit comments

Comments
 (0)