Skip to content

Commit ca49b86

Browse files
authored
Fix high memory and CPU usage (#34)
- Close all loggers and streams when an error occurs, to prevent the program from getting stuck in a memory/CPU-consuming loop. - Retry on read errors and make the number of retries configurable through an environment variable; the non-retriable errors, EOF and "file already closed", are not retried.
1 parent 25228f4 commit ca49b86

File tree

4 files changed

+35
-9
lines changed

4 files changed

+35
-9
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
PLUGIN_NAME=splunk/docker-logging-plugin
2-
PLUGIN_TAG=2.0.0-rc1
2+
PLUGIN_TAG=2.0.0-rc2
33
PLUGIN_DIR=./splunk-logging-plugin
44

55
all: clean docker rootfs create

driver.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ type logPair struct {
5151
info logger.Info
5252
}
5353

54+
func (lf *logPair) Close() {
55+
lf.stream.Close()
56+
lf.splunkl.Close()
57+
lf.jsonl.Close()
58+
}
59+
5460
func newDriver() *driver {
5561
return &driver{
5662
logs: make(map[string]*logPair),
@@ -108,7 +114,9 @@ func (d *driver) StartLogging(file string, logCtx logger.Info) error {
108114

109115
// start to process the logs generated by docker
110116
logrus.Debug("Start processing messages")
111-
mg := &messageProcessor{}
117+
mg := &messageProcessor{
118+
retryNumber: getAdvancedOptionInt(envVarReadFifoErrorRetryNumber, defaultReadFifoErrorRetryNumber),
119+
}
112120
go mg.process(lf)
113121
return nil
114122
}
@@ -118,7 +126,7 @@ func (d *driver) StopLogging(file string) error {
118126
d.mu.Lock()
119127
lf, ok := d.logs[file]
120128
if ok {
121-
lf.stream.Close()
129+
lf.Close()
122130
delete(d.logs, file)
123131
}
124132
d.mu.Unlock()

message_processor.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"bytes"
2121
"encoding/binary"
2222
"io"
23+
"os"
24+
"strings"
2325
"time"
2426
"unicode/utf8"
2527

@@ -30,6 +32,7 @@ import (
3032
)
3133

3234
type messageProcessor struct {
35+
retryNumber int
3336
}
3437

3538
func (mg messageProcessor) process(lf *logPair) {
@@ -49,21 +52,32 @@ func (mg messageProcessor) consumeLog(lf *logPair) {
4952
// create a protobuf reader for the log stream
5053
dec := protoio.NewUint32DelimitedReader(lf.stream, binary.BigEndian, 1e6)
5154
defer dec.Close()
55+
defer lf.Close()
5256
// a temp buffer for each log entry
5357
var buf logdriver.LogEntry
58+
curRetryNumber := 0
5459
for {
55-
// reads a message from the log stream and put it in a buffer until the EOF
56-
// if there is any other error, recreate the stream reader
60+
// reads a message from the log stream and put it in a buffer
5761
if err := dec.ReadMsg(&buf); err != nil {
58-
if err == io.EOF {
59-
logrus.WithField("id", lf.info.ContainerID).WithError(err).Debug("shutting down log logger")
60-
lf.stream.Close()
62+
// exit the loop if reader reaches EOF or the fifo is closed by the writer
63+
if err == io.EOF || err == os.ErrClosed || strings.Contains(err.Error(), "file already closed") {
64+
logrus.WithField("id", lf.info.ContainerID).WithError(err).Info("shutting down loggers")
6165
return
6266
}
6367

64-
logrus.WithField("id", lf.info.ContainerID).WithError(err).Debug("Ignoring error")
68+
// exit the loop if retry number reaches the specified number
69+
if mg.retryNumber != -1 && curRetryNumber > mg.retryNumber {
70+
logrus.WithField("id", lf.info.ContainerID).WithField("curRetryNumber", curRetryNumber).WithField("retryNumber", mg.retryNumber).WithError(err).Error("Stop retrying. Shutting down loggers")
71+
return
72+
}
73+
74+
// if there is any other error, retry for robustness. If retryNumber is -1, retry forever
75+
curRetryNumber++
76+
logrus.WithField("id", lf.info.ContainerID).WithField("curRetryNumber", curRetryNumber).WithField("retryNumber", mg.retryNumber).WithError(err).Error("Encountered error and retrying")
77+
time.Sleep(500 * time.Millisecond)
6578
dec = protoio.NewUint32DelimitedReader(lf.stream, binary.BigEndian, 1e6)
6679
}
80+
curRetryNumber = 0
6781

6882
if mg.shouldSendMessage(buf.Line) {
6983
// Append to temp buffer

splunk_logger.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ const (
7272
defaultPartialMsgBufferHoldDuration = 100 * time.Millisecond
7373
// Maximum buffer size for partial logging
7474
defaultPartialMsgBufferMaximum = 1024 * 1024
75+
// Number of retries if an error occurs while reading logs from the Docker-provided FIFO
76+
// -1 means retry forever
77+
defaultReadFifoErrorRetryNumber = 3
7578
)
7679

7780
const (
@@ -81,6 +84,7 @@ const (
8184
envVarStreamChannelSize = "SPLUNK_LOGGING_DRIVER_CHANNEL_SIZE"
8285
envVarPartialMsgBufferHoldDuration = "SPLUNK_LOGGING_DRIVER_TEMP_MESSAGES_HOLD_DURATION"
8386
envVarPartialMsgBufferMaximum = "SPLUNK_LOGGING_DRIVER_TEMP_MESSAGES_BUFFER_SIZE"
87+
envVarReadFifoErrorRetryNumber = "SPLUNK_LOGGING_DRIVER_FIFO_ERROR_RETRY_TIME"
8488
)
8589

8690
type splunkLoggerInterface interface {

0 commit comments

Comments
 (0)