Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions internal/command/command_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,16 @@ func (cs *CommandService) UpdateDataPlaneStatus(
cs.subscribeClientMutex.Unlock()
return nil, errors.New("command service client is not initialized")
}
response, updateError := cs.commandServiceClient.UpdateDataPlaneStatus(ctx, request)

grpcCtx, cancel := context.WithTimeout(ctx, cs.agentConfig.Client.Grpc.ResponseTimeout)
defer cancel()

response, updateError := cs.commandServiceClient.UpdateDataPlaneStatus(grpcCtx, request)
cs.subscribeClientMutex.Unlock()

validatedError := grpc.ValidateGrpcError(updateError)
if validatedError != nil {
slog.ErrorContext(ctx, "Failed to send update data plane status", "error", validatedError)
slog.ErrorContext(grpcCtx, "Failed to send update data plane status", "error", validatedError)

return nil, validatedError
}
Expand Down Expand Up @@ -384,13 +388,16 @@ func (cs *CommandService) dataPlaneHealthCallback(
return nil, errors.New("command service client is not initialized")
}

response, updateError := cs.commandServiceClient.UpdateDataPlaneHealth(ctx, request)
grpcCtx, cancel := context.WithTimeout(ctx, cs.agentConfig.Client.Grpc.ResponseTimeout)
defer cancel()

response, updateError := cs.commandServiceClient.UpdateDataPlaneHealth(grpcCtx, request)
cs.subscribeClientMutex.Unlock()

validatedError := grpc.ValidateGrpcError(updateError)

if validatedError != nil {
slog.ErrorContext(ctx, "Failed to send update data plane health", "error", validatedError)
slog.ErrorContext(grpcCtx, "Failed to send update data plane health", "error", validatedError)

return nil, validatedError
}
Expand Down Expand Up @@ -558,13 +565,16 @@ func (cs *CommandService) connectCallback(
request *mpi.CreateConnectionRequest,
) func() (*mpi.CreateConnectionResponse, error) {
return func() (*mpi.CreateConnectionResponse, error) {
grpcCtx, cancel := context.WithTimeout(ctx, cs.agentConfig.Client.Grpc.ResponseTimeout)
defer cancel()

cs.subscribeClientMutex.Lock()
response, connectErr := cs.commandServiceClient.CreateConnection(ctx, request)
response, connectErr := cs.commandServiceClient.CreateConnection(grpcCtx, request)
cs.subscribeClientMutex.Unlock()

validatedError := grpc.ValidateGrpcError(connectErr)
if validatedError != nil {
slog.ErrorContext(ctx, "Failed to create connection", "error", validatedError)
slog.ErrorContext(grpcCtx, "Failed to create connection", "error", validatedError)

return nil, validatedError
}
Expand Down
7 changes: 7 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,12 @@ func registerClientFlags(fs *flag.FlagSet) {
"Max file size in bytes.",
)

fs.Duration(
ClientGRPCResponseTimeoutKey,
DefResponseTimeout,
"Duration to wait for a response before retrying request",
)

fs.Int(
ClientGRPCMaxParallelFileOperationsKey,
DefMaxParallelFileOperations,
Expand Down Expand Up @@ -1111,6 +1117,7 @@ func resolveClient() *Client {
MaxMessageSendSize: viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey),
MaxFileSize: viperInstance.GetUint32(ClientGRPCMaxFileSizeKey),
FileChunkSize: viperInstance.GetUint32(ClientGRPCFileChunkSizeKey),
ResponseTimeout: viperInstance.GetDuration(ClientGRPCResponseTimeoutKey),
MaxParallelFileOperations: viperInstance.GetInt(ClientGRPCMaxParallelFileOperationsKey),
},
Backoff: &BackOff{
Expand Down
1 change: 1 addition & 0 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,7 @@ func createConfig() *Config {
MaxFileSize: 485753,
FileChunkSize: 48575,
MaxParallelFileOperations: 10,
ResponseTimeout: 30 * time.Second,
},
Backoff: &BackOff{
InitialInterval: 200 * time.Millisecond,
Expand Down
1 change: 1 addition & 0 deletions internal/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ const (
DefMaxFileSize uint32 = 1048576 // 1MB
DefFileChunkSize uint32 = 524288 // 0.5MB
DefMaxParallelFileOperations = 5
DefResponseTimeout = 10 * time.Second

// Client HTTP Settings
DefHTTPTimeout = 10 * time.Second
Expand Down
1 change: 1 addition & 0 deletions internal/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
ClientGRPCMaxFileSizeKey = pre(ClientRootKey) + "grpc_max_file_size"
ClientGRPCFileChunkSizeKey = pre(ClientRootKey) + "grpc_file_chunk_size"
ClientGRPCMaxParallelFileOperationsKey = pre(ClientRootKey) + "grpc_max_parallel_file_operations"
ClientGRPCResponseTimeoutKey = pre(ClientRootKey) + "grpc_response_timeout"

ClientBackoffInitialIntervalKey = pre(ClientRootKey) + "backoff_initial_interval"
ClientBackoffMaxIntervalKey = pre(ClientRootKey) + "backoff_max_interval"
Expand Down
1 change: 1 addition & 0 deletions internal/config/testdata/nginx-agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ client:
max_message_receive_size: 1048575
max_message_send_size: 1048575
max_file_size: 485753
response_timeout: 30s
file_chunk_size: 48575
max_parallel_file_operations: 10
backoff:
Expand Down
3 changes: 2 additions & 1 deletion internal/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ type (

//nolint:lll // max line limit exceeded
GRPC struct {
KeepAlive *KeepAlive `yaml:"keepalive" mapstructure:"keepalive"`
KeepAlive *KeepAlive `yaml:"keepalive" mapstructure:"keepalive"`
ResponseTimeout time.Duration `yaml:"response_timeout" mapstructure:"response_timeout"`
// if MaxMessageSize is size set then we use that value,
// otherwise MaxMessageRecieveSize and MaxMessageSendSize for individual settings
MaxMessageSize int `yaml:"max_message_size" mapstructure:"max_message_size"`
Expand Down
7 changes: 5 additions & 2 deletions internal/file/file_service_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,15 @@ func (fso *FileServiceOperator) UpdateOverview(
"request", request, "parent_correlation_id", correlationID,
)

response, updateError := fso.fileServiceClient.UpdateOverview(newCtx, request)
grpcCtx, cancel := context.WithTimeout(ctx, fso.agentConfig.Client.Grpc.ResponseTimeout)
defer cancel()

response, updateError := fso.fileServiceClient.UpdateOverview(grpcCtx, request)

validatedError := internalgrpc.ValidateGrpcError(updateError)

if validatedError != nil {
slog.ErrorContext(newCtx, "Failed to send update overview", "error", validatedError)
slog.ErrorContext(grpcCtx, "Failed to send update overview", "error", validatedError)

return nil, validatedError
}
Expand Down
1 change: 1 addition & 0 deletions nginx-agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ allowed_directories:
- /usr/share/nginx/modules
- /var/run/nginx
- /var/log/nginx

#
# Command server settings to connect to a management plane server
#
Expand Down
5 changes: 4 additions & 1 deletion test/config/agent/nginx-agent-with-auxiliary-command.conf
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ auxiliary_command:
port: 9095
type: grpc


client:
grpc:
response_timeout: 2s

allowed_directories:
- /etc/nginx
- /usr/local/etc/nginx
Expand Down
4 changes: 4 additions & 0 deletions test/config/agent/nginx-config-with-grpc-client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ command:
port: 9092
type: grpc

client:
grpc:
response_timeout: 2s

allowed_directories:
- /etc/nginx
- /usr/local/etc/nginx
Expand Down
3 changes: 2 additions & 1 deletion test/config/agent/nginx-config-with-max-file-size.conf
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ command:


client:
grpc:
grpc:
response_timeout: 2s
max_file_size: 524288
file_chunk_size: 262144

Expand Down
2 changes: 1 addition & 1 deletion test/integration/managementplane/config_upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s *MPITestSuite) TearDownTest() {
func (s *MPITestSuite) SetupSuite() {
slog.Info("starting MPI tests")
s.ctx = context.Background()
s.teardownTest = utils.SetupConnectionTest(s.T(), true, false, false,
s.teardownTest = utils.SetupConnectionTest(s.T(), false, false, false,
"../../config/agent/nginx-config-with-grpc-client.conf")
s.nginxInstanceID = utils.VerifyConnection(s.T(), 2, utils.MockManagementPlaneAPIAddress)
responses := utils.ManagementPlaneResponses(s.T(), 1, utils.MockManagementPlaneAPIAddress)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
// Verify that the agent sends a connection request to Management Plane even when Nginx is not present
func TestNginxLessGrpc_Connection(t *testing.T) {
slog.Info("starting nginxless connection test")
teardownTest := utils.SetupConnectionTest(t, true, true, false,
teardownTest := utils.SetupConnectionTest(t, false, true, false,
"../../config/agent/nginx-config-with-grpc-client.conf")
defer teardownTest(t)

Expand Down
30 changes: 23 additions & 7 deletions test/mock/grpc/mock_management_command_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,21 @@ import (

type CommandService struct {
mpi.UnimplementedCommandServiceServer
instanceFiles map[string][]*mpi.File
firstConnectionCallCh chan struct{}
server *gin.Engine
connectionRequest *mpi.CreateConnectionRequest
requestChan chan *mpi.ManagementPlaneRequest
updateDataPlaneStatusRequest *mpi.UpdateDataPlaneStatusRequest
updateDataPlaneHealthRequest *mpi.UpdateDataPlaneHealthRequest
instanceFiles map[string][]*mpi.File // key is instanceID
configDirectory string
externalFileServer string
configDirectory string
dataPlaneResponses []*mpi.DataPlaneResponse
updateDataPlaneHealthMutex sync.Mutex
connectionMutex sync.Mutex
updateDataPlaneStatusMutex sync.Mutex
dataPlaneResponsesMutex sync.Mutex
updateDataPlaneStatusMutex sync.Mutex
connectionMutex sync.Mutex
updateDataPlaneHealthMutex sync.Mutex
firstConnectionCallFlag bool
}

func init() {
Expand All @@ -66,6 +68,8 @@ func NewCommandService(
configDirectory: configDirectory,
externalFileServer: externalFileServer,
instanceFiles: make(map[string][]*mpi.File),
firstConnectionCallCh: make(chan struct{}),
firstConnectionCallFlag: false,
}

handler := slog.NewTextHandler(
Expand Down Expand Up @@ -109,13 +113,25 @@ func (cs *CommandService) CreateConnection(
) {
slog.DebugContext(ctx, "Create connection request", "request", request)

// This checks if this is the first create connection call, this is done to test the logic in Agent where
// if Agent does not get a response to a request after a certain amount of time it will resend the request
if !cs.firstConnectionCallFlag {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment here explaining why we are doing this?

cs.firstConnectionCallFlag = true
slog.DebugContext(ctx, "First CreateConnection call: blocking until second call")
<-cs.firstConnectionCallCh
} else {
slog.DebugContext(ctx, "Second CreateConnection call: unblocking first call")
close(cs.firstConnectionCallCh)
}

cs.connectionMutex.Lock()
defer cs.connectionMutex.Unlock()

if request == nil {
return nil, errors.New("empty connection request")
}

cs.connectionMutex.Lock()
cs.connectionRequest = request
cs.connectionMutex.Unlock()

return &mpi.CreateConnectionResponse{
Response: &mpi.CommandResponse{
Expand Down
Loading