diff --git a/.golangci.yml b/.golangci.yml index adbf999..4405039 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -6,7 +6,6 @@ linters: enable: - revive - bodyclose - - exportloopref - gocognit - goconst - gofmt @@ -22,7 +21,6 @@ issues: - path: _test\.go linters: - errcheck - - exportloopref - gocognit - goconst - gosec diff --git a/cmd/bm/fluent_forward_go/bm_test.go b/cmd/bm/fluent_forward_go/bm_test.go index eaedb81..3a2fea9 100644 --- a/cmd/bm/fluent_forward_go/bm_test.go +++ b/cmd/bm/fluent_forward_go/bm_test.go @@ -15,7 +15,7 @@ func Benchmark_Fluent_Forward_Go_SendOnly(b *testing.B) { tagVar := "bar" c := client.New(client.ConnectionOptions{ - ConnectionTimeout: 3 * time.Second, + DialTimeout: 3 * time.Second, }) err := c.Connect() @@ -43,7 +43,7 @@ func Benchmark_Fluent_Forward_Go_SingleMessage(b *testing.B) { tagVar := "bar" c := client.New(client.ConnectionOptions{ - ConnectionTimeout: 3 * time.Second, + DialTimeout: 3 * time.Second, }) err := c.Connect() @@ -71,8 +71,8 @@ func Benchmark_Fluent_Forward_Go_SingleMessageAck(b *testing.B) { tagVar := "foo" c := client.New(client.ConnectionOptions{ - RequireAck: true, - ConnectionTimeout: 3 * time.Second, + RequireAck: true, + DialTimeout: 3 * time.Second, }) err := c.Connect() @@ -100,7 +100,7 @@ func Benchmark_Fluent_Forward_Go_Bytes(b *testing.B) { tagVar := "foo" c := client.New(client.ConnectionOptions{ - ConnectionTimeout: 3 * time.Second, + DialTimeout: 3 * time.Second, }) err := c.Connect() @@ -130,8 +130,8 @@ func Benchmark_Fluent_Forward_Go_BytesAck(b *testing.B) { tagVar := "foo" c := client.New(client.ConnectionOptions{ - RequireAck: true, - ConnectionTimeout: 3 * time.Second, + RequireAck: true, + DialTimeout: 3 * time.Second, }) err := c.Connect() @@ -162,7 +162,7 @@ func Benchmark_Fluent_Forward_Go_RawMessage(b *testing.B) { tagVar := "foo" c := client.New(client.ConnectionOptions{ - ConnectionTimeout: 3 * time.Second, + DialTimeout: 3 * time.Second, }) err := c.Connect() @@ -193,8 +193,8 @@ func Benchmark_Fluent_Forward_Go_RawMessageAck(b *testing.B) { tagVar := "foo" c := client.New(client.ConnectionOptions{ - RequireAck: true, - ConnectionTimeout: 3 * time.Second, + RequireAck: true, + DialTimeout: 3 * time.Second, }) err := c.Connect() @@ -226,7 +226,7 @@ func Benchmark_Fluent_Forward_Go_CompressedMessage(b *testing.B) { tagVar := "foo" c := client.New(client.ConnectionOptions{ - ConnectionTimeout: 3 * time.Second, + DialTimeout: 3 * time.Second, }) err := c.Connect() @@ -280,8 +280,8 @@ func Benchmark_Fluent_Forward_Go_CompressedMessageAck(b *testing.B) { tagVar := "foo" c := client.New(client.ConnectionOptions{ - RequireAck: true, - ConnectionTimeout: 3 * time.Second, + RequireAck: true, + DialTimeout: 3 * time.Second, }) err := c.Connect() diff --git a/fluent/client/client.go b/fluent/client/client.go index b32db7c..565010b 100644 --- a/fluent/client/client.go +++ b/fluent/client/client.go @@ -41,7 +41,9 @@ import ( //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate const ( - DefaultConnectionTimeout time.Duration = 60 * time.Second + DefaultDialTimeout time.Duration = 10 * time.Second + DefaultReadTimeout time.Duration = 30 * time.Second + DefaultWriteTimeout time.Duration = 30 * time.Second ) // MessageClient implementations send MessagePack messages to a peer @@ -71,23 +73,24 @@ type ConnectionFactory interface { type Client struct { ConnectionFactory - RequireAck bool - Timeout time.Duration - AuthInfo AuthInfo - Hostname string - session *Session - ackLock sync.Mutex - sessionLock sync.RWMutex + RequireAck bool + DialTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration + AuthInfo AuthInfo + Hostname string + session *Session + ackLock sync.Mutex + sessionLock sync.RWMutex } type ConnectionOptions struct { - Factory ConnectionFactory - RequireAck bool - ConnectionTimeout time.Duration - // TODO: - // ReadTimeout time.Duration - // WriteTimeout time.Duration - AuthInfo AuthInfo + Factory ConnectionFactory + RequireAck bool + DialTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration + AuthInfo AuthInfo } type AuthInfo struct { @@ -110,15 +113,26 @@ func New(opts ConnectionOptions) *Client { } } - if opts.ConnectionTimeout == 0 { - opts.ConnectionTimeout = DefaultConnectionTimeout + // Set default timeouts if not provided + if opts.DialTimeout == 0 { + opts.DialTimeout = DefaultDialTimeout + } + + if opts.ReadTimeout == 0 { + opts.ReadTimeout = DefaultReadTimeout + } + + if opts.WriteTimeout == 0 { + opts.WriteTimeout = DefaultWriteTimeout } return &Client{ ConnectionFactory: factory, AuthInfo: opts.AuthInfo, RequireAck: opts.RequireAck, - Timeout: opts.ConnectionTimeout, + DialTimeout: opts.DialTimeout, + ReadTimeout: opts.ReadTimeout, + WriteTimeout: opts.WriteTimeout, } } @@ -164,6 +178,13 @@ func (c *Client) Handshake() error { var helo protocol.Helo + // apply read timeout for reading helo message + if c.ReadTimeout != 0 { + if err := c.session.Connection.SetReadDeadline(time.Now().Add(c.ReadTimeout)); err != nil { + return err + } + } + r := msgp.NewReader(c.session.Connection) err := helo.DecodeMsg(r) @@ -183,6 +204,13 @@ func (c *Client) Handshake() error { return err } + // apply write timeout for sending the ping message + if c.WriteTimeout != 0 { + if err := c.session.Connection.SetWriteDeadline(time.Now().Add(c.WriteTimeout)); err != nil { + return err + } + } + err = msgp.Encode(c.session.Connection, ping) if err != nil { return err @@ -190,6 +218,13 @@ func (c *Client) Handshake() error { var pong protocol.Pong + // apply read timeout for receiving the pong message + if c.ReadTimeout != 0 { + if err := c.session.Connection.SetReadDeadline(time.Now().Add(c.ReadTimeout)); err != nil { + return err + } + } + err = pong.DecodeMsg(r) if err != nil { return err @@ -246,8 +281,8 @@ func (c *Client) Reconnect() error { } func (c *Client) checkAck(chunk string) error { - if c.Timeout != 0 { - if err := c.session.Connection.SetReadDeadline(time.Now().Add(c.Timeout)); err != nil { + if c.ReadTimeout != 0 { + if err := c.session.Connection.SetReadDeadline(time.Now().Add(c.ReadTimeout)); err != nil { return err } } @@ -292,6 +327,13 @@ func (c *Client) Send(e protocol.ChunkEncoder) error { defer c.ackLock.Unlock() } + // apply write timeout for sending the message + if c.WriteTimeout != 0 { + if err := c.session.Connection.SetWriteDeadline(time.Now().Add(c.WriteTimeout)); err != nil { + return err + } + } + err = msgp.Encode(c.session.Connection, e) if err != nil || !c.RequireAck { return err @@ -315,6 +357,13 @@ func (c *Client) SendRaw(m []byte) error { return errors.New("session handshake not completed") } + // apply write timeout for write + if c.WriteTimeout != 0 { + if err := c.session.Connection.SetWriteDeadline(time.Now().Add(c.WriteTimeout)); err != nil { + return err + } + } + _, err := c.session.Connection.Write(m) return err diff --git a/fluent/client/client_test.go b/fluent/client/client_test.go index d68c2ba..abdd3c3 100644 --- a/fluent/client/client_test.go +++ b/fluent/client/client_test.go @@ -50,8 +50,8 @@ var _ = Describe("Client", func() { factory = &clientfakes.FakeConnectionFactory{} opts := ConnectionOptions{ - Factory: factory, - ConnectionTimeout: 2 * time.Second, + Factory: factory, + DialTimeout: 2 * time.Second, } client = New(opts) @@ -184,6 +184,77 @@ var _ = Describe("Client", func() { // TODO: We need a test that no message is sent }) + Context("When WriteDeadline is set and write takes too long", func() { + BeforeEach(func() { + opts := ConnectionOptions{ + Factory: factory, + DialTimeout: 2 * time.Second, + WriteTimeout: 100 * time.Millisecond, // short timeout + RequireAck: false, + } + client = New(opts) + clientSide, serverSide = net.Pipe() + factory.NewReturns(clientSide, nil) + }) + + It("returns a write timeout error", func() { + go func() { + defer GinkgoRecover() + time.Sleep(200 * time.Millisecond) // this is longer than WriteTimeout + buf := make([]byte, 1024) + _, err := serverSide.Read(buf) + Expect(err).NotTo(HaveOccurred()) + }() + + err := client.Send(&msg) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("write pipe: i/o timeout")) + }) + }) + + Context("When ReadDeadline is set and acknowledgment is delayed", func() { + BeforeEach(func() { + opts := ConnectionOptions{ + Factory: factory, + DialTimeout: 2 * time.Second, + ReadTimeout: 100 * time.Millisecond, // short timeout + RequireAck: true, // waig for acknowledgment + } + client = New(opts) + clientSide, serverSide = net.Pipe() + factory.NewReturns(clientSide, nil) + }) + + It("returns a read timeout error", func() { + done := make(chan bool) // used to synchronize test + go func() { + defer GinkgoRecover() + defer func() { + done <- true + }() + err := client.Send(&msg) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("read pipe: i/o timeout")) + }() + + // Server reads the message + rcvd := &protocol.MessageExt{} + err := rcvd.DecodeMsg(msgp.NewReader(serverSide)) + Expect(err).NotTo(HaveOccurred()) + + // delay sending the ack (longer than ReadTimeout) + time.Sleep(200 * time.Millisecond) + + // send the ack + chunk := rcvd.Options.Chunk + ack := &protocol.AckMessage{Ack: chunk} + err = ack.EncodeMsg(msgp.NewWriter(serverSide)) + Expect(err).NotTo(HaveOccurred()) + + <-done + }) + }) + Context("RequireAck is true", func() { var ( serverSide net.Conn @@ -215,11 +286,11 @@ var _ = Describe("Client", func() { Expect(err).ToNot(HaveOccurred()) }() - rcvd := &protocol.MessageExt{} - err := rcvd.DecodeMsg(serverReader) + msgext := &protocol.MessageExt{} + err := msgext.DecodeMsg(serverReader) Expect(err).ToNot(HaveOccurred()) - chunk := rcvd.Options.Chunk + chunk := msgext.Options.Chunk Expect(chunk).ToNot(BeEmpty()) Expect(chunk).To(Equal(msg.Options.Chunk)) @@ -241,12 +312,12 @@ var _ = Describe("Client", func() { Expect(err.Error()).To(ContainSubstring("Expected chunk")) }() - rcvd := &protocol.MessageExt{} + msgext := &protocol.MessageExt{} serverSide.SetReadDeadline(time.Now().Add(time.Second)) - err := rcvd.DecodeMsg(serverReader) + err := msgext.DecodeMsg(serverReader) Expect(err).ToNot(HaveOccurred()) - chunk := rcvd.Options.Chunk + chunk := msgext.Options.Chunk Expect(chunk).ToNot(BeEmpty()) Expect(chunk).To(Equal(msg.Options.Chunk)) diff --git a/fluent/protocol/forward_message.go b/fluent/protocol/forward_message.go index 94bed3a..3a2231c 100644 --- a/fluent/protocol/forward_message.go +++ b/fluent/protocol/forward_message.go @@ -24,7 +24,9 @@ SOFTWARE. package protocol -import "github.com/tinylib/msgp/msgp" +import ( + "github.com/tinylib/msgp/msgp" +) //go:generate msgp @@ -76,7 +78,7 @@ func (fm *ForwardMessage) EncodeMsg(dc *msgp.Writer) error { size = 3 } - err := dc.WriteArrayHeader(uint32(size)) + err := dc.WriteArrayHeader(uint32(size)) //#nosec:G115 if err != nil { return msgp.WrapError(err, "Array Header") } diff --git a/fluent/protocol/transport.go b/fluent/protocol/transport.go index 306872f..7231c49 100644 --- a/fluent/protocol/transport.go +++ b/fluent/protocol/transport.go @@ -105,8 +105,8 @@ func (et *EventTime) MarshalBinaryTo(b []byte) error { // b[0] = 0xD7 // b[1] = 0x00 - binary.BigEndian.PutUint32(b, uint32(utc.Unix())) - binary.BigEndian.PutUint32(b[4:], uint32(utc.Nanosecond())) + binary.BigEndian.PutUint32(b, uint32(utc.Unix())) //#nosec:G115 + binary.BigEndian.PutUint32(b[4:], uint32(utc.Nanosecond())) //#nosec:G115 return nil }