Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ linters:
enable:
- revive
- bodyclose
- exportloopref
- gocognit
- goconst
- gofmt
Expand All @@ -22,7 +21,6 @@ issues:
- path: _test\.go
linters:
- errcheck
- exportloopref
- gocognit
- goconst
- gosec
Expand Down
26 changes: 13 additions & 13 deletions cmd/bm/fluent_forward_go/bm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func Benchmark_Fluent_Forward_Go_SendOnly(b *testing.B) {
tagVar := "bar"

c := client.New(client.ConnectionOptions{
ConnectionTimeout: 3 * time.Second,
DialTimeout: 3 * time.Second,
})

err := c.Connect()
Expand Down Expand Up @@ -43,7 +43,7 @@ func Benchmark_Fluent_Forward_Go_SingleMessage(b *testing.B) {
tagVar := "bar"

c := client.New(client.ConnectionOptions{
ConnectionTimeout: 3 * time.Second,
DialTimeout: 3 * time.Second,
})

err := c.Connect()
Expand Down Expand Up @@ -71,8 +71,8 @@ func Benchmark_Fluent_Forward_Go_SingleMessageAck(b *testing.B) {
tagVar := "foo"

c := client.New(client.ConnectionOptions{
RequireAck: true,
ConnectionTimeout: 3 * time.Second,
RequireAck: true,
DialTimeout: 3 * time.Second,
})

err := c.Connect()
Expand Down Expand Up @@ -100,7 +100,7 @@ func Benchmark_Fluent_Forward_Go_Bytes(b *testing.B) {
tagVar := "foo"

c := client.New(client.ConnectionOptions{
ConnectionTimeout: 3 * time.Second,
DialTimeout: 3 * time.Second,
})

err := c.Connect()
Expand Down Expand Up @@ -130,8 +130,8 @@ func Benchmark_Fluent_Forward_Go_BytesAck(b *testing.B) {
tagVar := "foo"

c := client.New(client.ConnectionOptions{
RequireAck: true,
ConnectionTimeout: 3 * time.Second,
RequireAck: true,
DialTimeout: 3 * time.Second,
})

err := c.Connect()
Expand Down Expand Up @@ -162,7 +162,7 @@ func Benchmark_Fluent_Forward_Go_RawMessage(b *testing.B) {
tagVar := "foo"

c := client.New(client.ConnectionOptions{
ConnectionTimeout: 3 * time.Second,
DialTimeout: 3 * time.Second,
})

err := c.Connect()
Expand Down Expand Up @@ -193,8 +193,8 @@ func Benchmark_Fluent_Forward_Go_RawMessageAck(b *testing.B) {
tagVar := "foo"

c := client.New(client.ConnectionOptions{
RequireAck: true,
ConnectionTimeout: 3 * time.Second,
RequireAck: true,
DialTimeout: 3 * time.Second,
})

err := c.Connect()
Expand Down Expand Up @@ -226,7 +226,7 @@ func Benchmark_Fluent_Forward_Go_CompressedMessage(b *testing.B) {
tagVar := "foo"

c := client.New(client.ConnectionOptions{
ConnectionTimeout: 3 * time.Second,
DialTimeout: 3 * time.Second,
})

err := c.Connect()
Expand Down Expand Up @@ -280,8 +280,8 @@ func Benchmark_Fluent_Forward_Go_CompressedMessageAck(b *testing.B) {
tagVar := "foo"

c := client.New(client.ConnectionOptions{
RequireAck: true,
ConnectionTimeout: 3 * time.Second,
RequireAck: true,
DialTimeout: 3 * time.Second,
})

err := c.Connect()
Expand Down
89 changes: 69 additions & 20 deletions fluent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ import (
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate

const (
DefaultConnectionTimeout time.Duration = 60 * time.Second
DefaultDialTimeout time.Duration = 10 * time.Second
DefaultReadTimeout time.Duration = 30 * time.Second
DefaultWriteTimeout time.Duration = 30 * time.Second
)

// MessageClient implementations send MessagePack messages to a peer
Expand Down Expand Up @@ -71,23 +73,24 @@ type ConnectionFactory interface {

type Client struct {
ConnectionFactory
RequireAck bool
Timeout time.Duration
AuthInfo AuthInfo
Hostname string
session *Session
ackLock sync.Mutex
sessionLock sync.RWMutex
RequireAck bool
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
AuthInfo AuthInfo
Hostname string
session *Session
ackLock sync.Mutex
sessionLock sync.RWMutex
}

type ConnectionOptions struct {
Factory ConnectionFactory
RequireAck bool
ConnectionTimeout time.Duration
// TODO:
// ReadTimeout time.Duration
// WriteTimeout time.Duration
AuthInfo AuthInfo
Factory ConnectionFactory
RequireAck bool
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
AuthInfo AuthInfo
}

type AuthInfo struct {
Expand All @@ -110,15 +113,26 @@ func New(opts ConnectionOptions) *Client {
}
}

if opts.ConnectionTimeout == 0 {
opts.ConnectionTimeout = DefaultConnectionTimeout
// Set default timeouts if not provided
if opts.DialTimeout == 0 {
opts.DialTimeout = DefaultDialTimeout
}

if opts.ReadTimeout == 0 {
opts.ReadTimeout = DefaultReadTimeout
}

if opts.WriteTimeout == 0 {
opts.WriteTimeout = DefaultWriteTimeout
}

return &Client{
ConnectionFactory: factory,
AuthInfo: opts.AuthInfo,
RequireAck: opts.RequireAck,
Timeout: opts.ConnectionTimeout,
DialTimeout: opts.DialTimeout,
ReadTimeout: opts.ReadTimeout,
WriteTimeout: opts.WriteTimeout,
}
}

Expand Down Expand Up @@ -164,6 +178,13 @@ func (c *Client) Handshake() error {

var helo protocol.Helo

// apply read timeout for reading helo message
if c.ReadTimeout != 0 {
if err := c.session.Connection.SetReadDeadline(time.Now().Add(c.ReadTimeout)); err != nil {
return err
}
}

r := msgp.NewReader(c.session.Connection)
err := helo.DecodeMsg(r)

Expand All @@ -183,13 +204,27 @@ func (c *Client) Handshake() error {
return err
}

// apply write timeout for sending the ping message
if c.WriteTimeout != 0 {
if err := c.session.Connection.SetWriteDeadline(time.Now().Add(c.WriteTimeout)); err != nil {
return err
}
}

err = msgp.Encode(c.session.Connection, ping)
if err != nil {
return err
}

var pong protocol.Pong

// apply read timeout for receiving the pong message
if c.ReadTimeout != 0 {
if err := c.session.Connection.SetReadDeadline(time.Now().Add(c.ReadTimeout)); err != nil {
return err
}
}

err = pong.DecodeMsg(r)
if err != nil {
return err
Expand Down Expand Up @@ -246,8 +281,8 @@ func (c *Client) Reconnect() error {
}

func (c *Client) checkAck(chunk string) error {
if c.Timeout != 0 {
if err := c.session.Connection.SetReadDeadline(time.Now().Add(c.Timeout)); err != nil {
if c.ReadTimeout != 0 {
if err := c.session.Connection.SetReadDeadline(time.Now().Add(c.ReadTimeout)); err != nil {
return err
}
}
Expand Down Expand Up @@ -292,6 +327,13 @@ func (c *Client) Send(e protocol.ChunkEncoder) error {
defer c.ackLock.Unlock()
}

// apply write timeout for sending the message
if c.WriteTimeout != 0 {
if err := c.session.Connection.SetWriteDeadline(time.Now().Add(c.WriteTimeout)); err != nil {
return err
}
}

err = msgp.Encode(c.session.Connection, e)
if err != nil || !c.RequireAck {
return err
Expand All @@ -315,6 +357,13 @@ func (c *Client) SendRaw(m []byte) error {
return errors.New("session handshake not completed")
}

// apply write timeout for write
if c.WriteTimeout != 0 {
if err := c.session.Connection.SetWriteDeadline(time.Now().Add(c.WriteTimeout)); err != nil {
return err
}
}

_, err := c.session.Connection.Write(m)

return err
Expand Down
87 changes: 79 additions & 8 deletions fluent/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ var _ = Describe("Client", func() {
factory = &clientfakes.FakeConnectionFactory{}

opts := ConnectionOptions{
Factory: factory,
ConnectionTimeout: 2 * time.Second,
Factory: factory,
DialTimeout: 2 * time.Second,
}

client = New(opts)
Expand Down Expand Up @@ -184,6 +184,77 @@ var _ = Describe("Client", func() {
// TODO: We need a test that no message is sent
})

Context("When WriteDeadline is set and write takes too long", func() {
BeforeEach(func() {
opts := ConnectionOptions{
Factory: factory,
DialTimeout: 2 * time.Second,
WriteTimeout: 100 * time.Millisecond, // short timeout
RequireAck: false,
}
client = New(opts)
clientSide, serverSide = net.Pipe()
factory.NewReturns(clientSide, nil)
})

It("returns a write timeout error", func() {
go func() {
defer GinkgoRecover()
time.Sleep(200 * time.Millisecond) // this is longer than WriteTimeout
buf := make([]byte, 1024)
_, err := serverSide.Read(buf)
Expect(err).NotTo(HaveOccurred())
}()

err := client.Send(&msg)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("write pipe: i/o timeout"))
})
})

Context("When ReadDeadline is set and acknowledgment is delayed", func() {
BeforeEach(func() {
opts := ConnectionOptions{
Factory: factory,
DialTimeout: 2 * time.Second,
ReadTimeout: 100 * time.Millisecond, // short timeout
RequireAck: true, // waig for acknowledgment
}
client = New(opts)
clientSide, serverSide = net.Pipe()
factory.NewReturns(clientSide, nil)
})

It("returns a read timeout error", func() {
done := make(chan bool) // used to synchronize test
go func() {
defer GinkgoRecover()
defer func() {
done <- true
}()
err := client.Send(&msg)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("read pipe: i/o timeout"))
}()

// Server reads the message
rcvd := &protocol.MessageExt{}
err := rcvd.DecodeMsg(msgp.NewReader(serverSide))
Expect(err).NotTo(HaveOccurred())

// delay sending the ack (longer than ReadTimeout)
time.Sleep(200 * time.Millisecond)

// send the ack
chunk := rcvd.Options.Chunk
ack := &protocol.AckMessage{Ack: chunk}
err = ack.EncodeMsg(msgp.NewWriter(serverSide))
Expect(err).NotTo(HaveOccurred())

<-done
})
})

Context("RequireAck is true", func() {
var (
serverSide net.Conn
Expand Down Expand Up @@ -215,11 +286,11 @@ var _ = Describe("Client", func() {
Expect(err).ToNot(HaveOccurred())
}()

rcvd := &protocol.MessageExt{}
err := rcvd.DecodeMsg(serverReader)
msgext := &protocol.MessageExt{}
err := msgext.DecodeMsg(serverReader)
Expect(err).ToNot(HaveOccurred())

chunk := rcvd.Options.Chunk
chunk := msgext.Options.Chunk
Expect(chunk).ToNot(BeEmpty())
Expect(chunk).To(Equal(msg.Options.Chunk))

Expand All @@ -241,12 +312,12 @@ var _ = Describe("Client", func() {
Expect(err.Error()).To(ContainSubstring("Expected chunk"))
}()

rcvd := &protocol.MessageExt{}
msgext := &protocol.MessageExt{}
serverSide.SetReadDeadline(time.Now().Add(time.Second))
err := rcvd.DecodeMsg(serverReader)
err := msgext.DecodeMsg(serverReader)
Expect(err).ToNot(HaveOccurred())

chunk := rcvd.Options.Chunk
chunk := msgext.Options.Chunk
Expect(chunk).ToNot(BeEmpty())
Expect(chunk).To(Equal(msg.Options.Chunk))

Expand Down
Loading