Skip to content

Commit 82e54bb

Browse files
author
George Vine
committed
BUG/MEDIUM: log: fix blocking on syslog target communication errors
1 parent 4d10854 commit 82e54bb

File tree

3 files changed

+82
-4
lines changed

3 files changed

+82
-4
lines changed

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ require (
3131
github.com/maruel/panicparse/v2 v2.3.1
3232
github.com/nathanaelle/syslog5424/v2 v2.0.5
3333
github.com/rs/cors v1.10.1
34+
github.com/rubyist/circuitbreaker v2.2.1+incompatible
3435
github.com/shirou/gopsutil v3.21.11+incompatible
3536
github.com/sirupsen/logrus v1.9.3
3637
github.com/stretchr/testify v1.8.4
@@ -51,7 +52,9 @@ require (
5152
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0 // indirect
5253
github.com/aws/aws-sdk-go-v2/service/sts v1.27.0 // indirect
5354
github.com/aws/smithy-go v1.20.0 // indirect
55+
github.com/cenk/backoff v2.2.1+incompatible // indirect
5456
github.com/davecgh/go-spew v1.1.1 // indirect
57+
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
5558
github.com/go-ole/go-ole v1.3.0 // indirect
5659
github.com/go-openapi/analysis v0.22.2 // indirect
5760
github.com/go-openapi/jsonpointer v0.20.2 // indirect
@@ -72,6 +75,7 @@ require (
7275
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
7376
github.com/oklog/ulid v1.3.1 // indirect
7477
github.com/perimeterx/marshmallow v1.1.5 // indirect
78+
github.com/peterbourgon/g2s v0.0.0-20170223122336-d4e7ad98afea // indirect
7579
github.com/pkg/errors v0.9.1 // indirect
7680
github.com/pmezard/go-difflib v1.0.0 // indirect
7781
github.com/rogpeppe/go-internal v1.12.0 // indirect

go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.27.0 h1:cjTRjh700H36MQ8M0LnDn33W3Jmw
3232
github.com/aws/aws-sdk-go-v2/service/sts v1.27.0/go.mod h1:nXfOBMWPokIbOY+Gi7a1psWMSvskUCemZzI+SMB7Akc=
3333
github.com/aws/smithy-go v1.20.0 h1:6+kZsCXZwKxZS9RfISnPc4EXlHoyAkm2hPuM8X2BrrQ=
3434
github.com/aws/smithy-go v1.20.0/go.mod h1:uo5RKksAl4PzhqaAbjd4rLgFoq5koTsQKYuGe7dklGc=
35+
github.com/cenk/backoff v2.2.1+incompatible h1:djdFT7f4gF2ttuzRKPbMOWgZajgesItGLwG5FTQKmmE=
36+
github.com/cenk/backoff v2.2.1+incompatible/go.mod h1:7FtoeaSnHoZnmZzz47cM35Y9nSW7tNyaidugnHTaFDE=
3537
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
3638
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3739
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@@ -140,6 +142,8 @@ github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
140142
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
141143
github.com/perimeterx/marshmallow v1.1.5 h1:a2LALqQ1BlHM8PZblsDdidgv1mWi1DgC2UmX50IvK2s=
142144
github.com/perimeterx/marshmallow v1.1.5/go.mod h1:dsXbUu8CRzfYP5a87xpp0xq9S3u0Vchtcl8we9tYaXw=
145+
github.com/peterbourgon/g2s v0.0.0-20170223122336-d4e7ad98afea h1:sKwxy1H95npauwu8vtF95vG/syrL0p8fSZo/XlDg5gk=
146+
github.com/peterbourgon/g2s v0.0.0-20170223122336-d4e7ad98afea/go.mod h1:1VcHEd3ro4QMoHfiNl/j7Jkln9+KQuorp0PItHMJYNg=
143147
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
144148
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
145149
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -151,6 +155,8 @@ github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU
151155
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
152156
github.com/rs/cors v1.10.1 h1:L0uuZVXIKlI1SShY2nhFfo44TYvDPQ1w4oFkUJNfhyo=
153157
github.com/rs/cors v1.10.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
158+
github.com/rubyist/circuitbreaker v2.2.1+incompatible h1:KUKd/pV8Geg77+8LNDwdow6rVCAYOp8+kHUyFvL6Mhk=
159+
github.com/rubyist/circuitbreaker v2.2.1+incompatible/go.mod h1:Ycs3JgJADPuzJDwffe12k6BZT8hxVi6lFK+gWYJLN4A=
154160
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
155161
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
156162
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=

log/rfc5424.go

Lines changed: 72 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,21 @@ package log
1818
import (
1919
"fmt"
2020
"strings"
21+
"time"
2122

2223
"github.com/nathanaelle/syslog5424/v2"
24+
circuit "github.com/rubyist/circuitbreaker"
2325
"github.com/sirupsen/logrus"
2426
)
2527

2628
// RFC5424Hook is a logrus hook that forwards log entries to a syslog target
// through the syslog5424 package, tagging every message with msgID.
type RFC5424Hook struct {
2729
syslog *syslog5424.Syslog
2830
sender *syslog5424.Sender
2931
msgID string
32+
33+
// Use a circuit breaker to pause sending messages to the syslog target
34+
// in the presence of connection errors.
35+
cb *circuit.Breaker
3036
}
3137

3238
func (r RFC5424Hook) Levels() []logrus.Level {
@@ -58,7 +64,13 @@ func (r RFC5424Hook) Fire(entry *logrus.Entry) (err error) {
5864

5965
msg := strings.Join(messages, " ")
6066

61-
r.syslog.Channel(sev).Msgid(r.msgID).Log(msg)
67+
// Do not perform any action unless the circuit breaker is either closed (reset), or is ready to retry.
68+
if r.cb.Ready() {
69+
r.syslog.Channel(sev).Msgid(r.msgID).Log(msg)
70+
// Register any call as successful to enable automatic resets.
71+
// Failures are registered asynchronously by the goroutine that consumes errors from the corresponding channel.
72+
r.cb.Success()
73+
}
6274

6375
return
6476
}
@@ -74,7 +86,9 @@ func NewRFC5424Hook(opts Target) (logrus.Hook, error) {
7486
return nil, err
7587
}
7688

77-
slConn, _, err := syslog5424.Dial(opts.SyslogProto, opts.SyslogAddr)
89+
// syslog5424.Dial() returns an error channel, which needs to be drained
90+
// in order to avoid blocking.
91+
slConn, errCh, err := syslog5424.Dial(opts.SyslogProto, opts.SyslogAddr)
7892
if err != nil {
7993
return nil, err
8094
}
@@ -84,10 +98,64 @@ func NewRFC5424Hook(opts Target) (logrus.Hook, error) {
8498
return nil, err
8599
}
86100

87-
return &RFC5424Hook{syslog: syslogServer, sender: slConn, msgID: opts.SyslogMsgID}, nil
101+
r := &RFC5424Hook{
102+
syslog: syslogServer, sender: slConn, msgID: opts.SyslogMsgID,
103+
// We can change the circuit breaker settings as desired - including making
104+
// them configurable and/or dynamically adjustable based on runtime conditions.
105+
//
106+
// Please note, however, that a 3-failure threshold breaker with default settings
107+
// was found to work well with varying load and different states of a log target.
108+
// Specifically, the breaker will remain tripped when sending messages to the target
109+
// that is consistently failing, and will reset quickly when delivery begins to succeed.
110+
cb: circuit.NewThresholdBreaker(3),
111+
}
112+
113+
// A signal channel that is used to stop the goroutine reporting on circuit breaker state changes.
114+
doneCh := make(chan struct{})
115+
116+
// Consume errors from errCh until it is closed.
117+
go func() {
118+
for {
119+
err, ok := <-errCh
120+
if err != nil {
121+
r.cb.Fail() // Register a failure with the circuit breaker.
122+
}
123+
if !ok {
124+
close(doneCh)
125+
return
126+
}
127+
}
128+
}()
129+
130+
// Report on circuit breaker state changes.
131+
cbStateCh := r.cb.Subscribe()
132+
go func() {
133+
for {
134+
select {
135+
case e, ok := <-cbStateCh:
136+
if !ok {
137+
return
138+
}
139+
var state string
140+
switch e {
141+
case circuit.BreakerTripped:
142+
state = "too many connection errors, log delivery is stopped until this improves"
143+
case circuit.BreakerReset:
144+
state = "resuming log delivery"
145+
default:
146+
continue
147+
}
148+
fmt.Println(time.Now().Format(time.RFC3339), "syslog target", opts.SyslogAddr, "("+opts.SyslogTag+"):", state)
149+
case <-doneCh:
150+
return
151+
}
152+
}
153+
}()
154+
155+
return r, nil
88156
}
89157

90158
// Close ends the underlying syslog sender and always returns nil.
func (r RFC5424Hook) Close() error {
91-
r.sender.End()
159+
r.sender.End() // This will also close errCh returned by syslog5424.Dial() in NewRFC5424Hook(), causing related goroutines to exit.
92160
return nil
93161
}

0 commit comments

Comments
 (0)