Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/agent/clientset.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,14 @@ func (cs *ClientSet) sync() {
}

func (cs *ClientSet) connectOnce() error {
// Skip establishing new connections if draining
select {
case <-cs.drainCh:
klog.V(2).InfoS("Skipping connectOnce - agent is draining")
return nil
default:
}

serverCount := cs.determineServerCount()

// If not in syncForever mode, we only connect if we have fewer connections than the server count.
Expand Down
56 changes: 51 additions & 5 deletions pkg/server/backend_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,25 @@ type Backend struct {
// cached from conn.Context()
id string
idents header.Identifiers

// draining indicates if this backend is draining and should not accept new connections
draining bool
// mu protects the draining field
mu sync.RWMutex
}

// IsDraining reports whether the backend's draining flag is currently set.
// The flag is read while holding the read side of b.mu, so the call is safe
// to make concurrently with SetDraining.
func (b *Backend) IsDraining() bool {
	b.mu.RLock()
	state := b.draining
	b.mu.RUnlock()
	return state
}

// SetDraining flips the backend's draining flag to true while holding the
// write side of b.mu. Calling it more than once leaves the flag set; this
// method never clears it.
func (b *Backend) SetDraining() {
	b.mu.Lock()
	b.draining = true
	b.mu.Unlock()
}

func (b *Backend) Send(p *client.Packet) error {
Expand Down Expand Up @@ -346,9 +365,36 @@ func (s *DefaultBackendStorage) GetRandomBackend() (*Backend, error) {
if len(s.backends) == 0 {
return nil, &ErrNotFound{}
}
agentID := s.agentIDs[s.random.Intn(len(s.agentIDs))]
klog.V(3).InfoS("Pick agent as backend", "agentID", agentID)
// always return the first connection to an agent, because the agent
// will close later connections if there are multiple.
return s.backends[agentID][0], nil

var firstDrainingBackend *Backend

// Start at a random agent and check each agent in sequence
startIdx := s.random.Intn(len(s.agentIDs))
for i := 0; i < len(s.agentIDs); i++ {
// Wrap around using modulo
currentIdx := (startIdx + i) % len(s.agentIDs)
agentID := s.agentIDs[currentIdx]
// always return the first connection to an agent, because the agent
// will close later connections if there are multiple.
backend := s.backends[agentID][0]

if !backend.IsDraining() {
klog.V(3).InfoS("Pick agent as backend", "agentID", agentID)
return backend, nil
}

// Keep track of first draining backend as fallback
if firstDrainingBackend == nil {
firstDrainingBackend = backend
}
}

// All agents are draining, use one as fallback
if firstDrainingBackend != nil {
agentID := firstDrainingBackend.id
klog.V(2).InfoS("No non-draining backends available, using draining backend as fallback", "agentID", agentID)
return firstDrainingBackend, nil
}

return nil, &ErrNotFound{}
}
21 changes: 19 additions & 2 deletions pkg/server/desthost_backend_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,25 @@ func (dibm *DestHostBackendManager) Backend(ctx context.Context) (*Backend, erro
if destHost != "" {
bes, exist := dibm.backends[destHost]
if exist && len(bes) > 0 {
klog.V(5).InfoS("Get the backend through the DestHostBackendManager", "destHost", destHost)
return dibm.backends[destHost][0], nil
var firstDrainingBackend *Backend

// Find a non-draining backend for this destination host
for _, backend := range bes {
if !backend.IsDraining() {
klog.V(5).InfoS("Get the backend through the DestHostBackendManager", "destHost", destHost)
return backend, nil
}
// Keep track of first draining backend as fallback
if firstDrainingBackend == nil {
firstDrainingBackend = backend
}
}

// All backends for this destination are draining, use one as fallback
if firstDrainingBackend != nil {
klog.V(4).InfoS("All backends for destination host are draining, using one as fallback", "destHost", destHost)
return firstDrainingBackend, nil
}
}
}
return nil, &ErrNotFound{}
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,8 @@ func (s *ProxyServer) serveRecvBackend(backend *Backend, agentID string, recvCh

case client.PacketType_DRAIN:
klog.V(2).InfoS("agent is draining", "agentID", agentID)
backend.SetDraining()
klog.V(2).InfoS("marked backend as draining, will not route new requests to this agent", "agentID", agentID)
default:
klog.V(5).InfoS("Ignoring unrecognized packet from backend", "packet", pkt, "agentID", agentID)
}
Expand Down