From 641dd2992240cb76c744e6a1880d80bb020a45c3 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Wed, 22 Oct 2025 15:28:12 -0400 Subject: [PATCH 01/13] initial drop of the activator code Co-authored-by: Braulio Dumba Signed-off-by: Lionel Villard --- Dockerfile.activator | 28 ++ Makefile | 29 +- charts/activator-filter/.helmignore | 23 ++ charts/activator-filter/Chart.yaml | 9 + charts/activator-filter/README.md | 31 ++ charts/activator-filter/templates/NOTES.txt | 1 + charts/activator-filter/templates/istio.yaml | 25 ++ charts/activator-filter/templates/rbac.yaml | 81 +++++ charts/activator-filter/values.yaml | 1 + charts/activator/.helmignore | 23 ++ charts/activator/Chart.yaml | 9 + charts/activator/README.md | 44 +++ charts/activator/templates/NOTES.txt | 1 + charts/activator/templates/activator.yaml | 54 ++++ charts/activator/templates/helpers.tpl | 3 + charts/activator/templates/istio.yaml | 44 +++ charts/activator/values.yaml | 19 ++ cmd/activator/main.go | 35 +++ cmd/activator/runner/health.go | 123 ++++++++ cmd/activator/runner/runner.go | 259 +++++++++++++++ .../controller/inferencepool_reconciler.go | 120 +++++++ pkg/activator/datastore/datastore.go | 114 +++++++ pkg/activator/datastore/datastore_test.go | 83 +++++ pkg/activator/handlers/server.go | 202 ++++++++++++ pkg/activator/requestcontrol/activator.go | 295 ++++++++++++++++++ pkg/activator/requestcontrol/deactivator.go | 108 +++++++ pkg/activator/requestcontrol/gvr.go | 39 +++ pkg/activator/requestcontrol/types.go | 31 ++ pkg/activator/runnable/grpc.go | 67 ++++ pkg/activator/runnable/leader_election.go | 31 ++ pkg/activator/server/controller_manager.go | 119 +++++++ pkg/activator/server/runserver.go | 156 +++++++++ pkg/activator/tls/tls.go | 74 +++++ 33 files changed, 2280 insertions(+), 1 deletion(-) create mode 100644 Dockerfile.activator create mode 100644 charts/activator-filter/.helmignore create mode 100644 charts/activator-filter/Chart.yaml create mode 100644 charts/activator-filter/README.md create mode 100644 charts/activator-filter/templates/NOTES.txt create mode 100644 charts/activator-filter/templates/istio.yaml create mode 100644 charts/activator-filter/templates/rbac.yaml create mode 100644 charts/activator-filter/values.yaml create mode 100644 charts/activator/.helmignore create mode 100644 charts/activator/Chart.yaml create mode 100644 charts/activator/README.md create mode 100644 charts/activator/templates/NOTES.txt create mode 100644 charts/activator/templates/activator.yaml create mode 100644 charts/activator/templates/helpers.tpl create mode 100644 charts/activator/templates/istio.yaml create mode 100644 charts/activator/values.yaml create mode 100644 cmd/activator/main.go create mode 100644 cmd/activator/runner/health.go create mode 100644 cmd/activator/runner/runner.go create mode 100644 pkg/activator/controller/inferencepool_reconciler.go create mode 100644 pkg/activator/datastore/datastore.go create mode 100644 pkg/activator/datastore/datastore_test.go create mode 100644 pkg/activator/handlers/server.go create mode 100644 pkg/activator/requestcontrol/activator.go create mode 100644 pkg/activator/requestcontrol/deactivator.go create mode 100644 pkg/activator/requestcontrol/gvr.go create mode 100644 pkg/activator/requestcontrol/types.go create mode 100644 pkg/activator/runnable/grpc.go create mode 100644 pkg/activator/runnable/leader_election.go create mode 100644 pkg/activator/server/controller_manager.go create mode 100644 pkg/activator/server/runserver.go create mode 100644 
pkg/activator/tls/tls.go diff --git a/Dockerfile.activator b/Dockerfile.activator new file mode 100644 index 00000000..6c0e2ebd --- /dev/null +++ b/Dockerfile.activator @@ -0,0 +1,28 @@ +## Multistage build +FROM quay.io/projectquay/golang:1.25 AS builder +ARG COMMIT_SHA=unknown +ARG BUILD_REF +ARG TARGETOS=linux +ARG TARGETARCH=amd64 +ENV CGO_ENABLED=0 +ENV GOOS=$TARGETOS +ENV GOARCH=$TARGETARCH + +# Dependencies +WORKDIR /src +COPY go.mod go.sum ./ +RUN go mod download + +# Sources +COPY cmd/activator ./cmd/activator +COPY pkg/activator ./pkg/activator +WORKDIR /src/cmd/activator +RUN go build -ldflags="-X sigs.k8s.io/gateway-api-inference-extension/version.CommitSHA=${COMMIT_SHA} -X sigs.k8s.io/gateway-api-inference-extension/version.BuildRef=${BUILD_REF}" -o /activator + +## Multistage deploy +FROM registry.access.redhat.com/ubi9/ubi-minimal:latest + +WORKDIR / +COPY --from=builder /activator /activator + +ENTRYPOINT ["/activator"] diff --git a/Makefile b/Makefile index 2ec6df3c..0eae2396 100644 --- a/Makefile +++ b/Makefile @@ -22,6 +22,9 @@ NAMESPACE ?= hc4ai-operator VLLM_SIMULATOR_TAG ?= v0.5.0 export VLLM_SIMULATOR_TAG +ACTIVATOR_IMAGE_TAG_BASE ?= $(IMAGE_REGISTRY)/$(PROJECT_NAME)-activator +ACTIVATOR_IMG = $(ACTIVATOR_IMAGE_TAG_BASE):$(EPP_TAG) + # Map go arch to typos arch ifeq ($(TARGETARCH),amd64) TYPOS_TARGET_ARCH = x86_64 @@ -142,6 +145,14 @@ build-%: check-go install-dependencies download-tokenizer ## Build the project @printf "\033[33;1m==== Building ====\033[0m\n" go build $($*_LDFLAGS) -o bin/$($*_NAME) cmd/$($*_NAME)/main.go +##@ Build Activator + +.PHONY: activator-build +activator-build: check-go install-dependencies download-tokenizer ## Build the project + @printf "\033[33;1m==== Building ====\033[0m\n" + go build -ldflags="$(LDFLAGS)" -o bin/activator cmd/activator/main.go + + ##@ Container Build/Push .PHONY: image-build @@ -171,6 +182,22 @@ image-pull: check-container-tool ## Pull all related images using $(CONTAINER_RU @printf "\033[33;1m==== Pulling Container images ====\033[0m\n" ./scripts/pull_images.sh +.PHONY: activator-image-build +activator-image-build: ## Build the activator image using Docker Buildx. + $(CONTAINER_TOOL) build \ + --platform linux/$(TARGETARCH) \ + --build-arg TARGETOS=linux \ + --build-arg TARGETARCH=${TARGETARCH} \ + --build-arg COMMIT_SHA=${GIT_COMMIT_SHA} \ + --build-arg BUILD_REF=${BUILD_REF} \ + -t $(ACTIVATOR_IMG) \ + -f Dockerfile.activator . + +.PHONY: activator-image-push +activator-image-push: check-container-tool load-version-json ## Push Activator Docker image $(ACTIVATOR_IMG) to registry + @printf "\033[33;1m==== Pushing Activator Docker image $(ACTIVATOR_IMG) ====\033[0m\n" + $(CONTAINER_TOOL) push $(ACTIVATOR_IMG) + ##@ Install/Uninstall Targets # Default install/uninstall (Docker) @@ -287,7 +314,7 @@ check-typos: $(TYPOS) ## Check for spelling errors using typos (exits with error echo "$$TYPOS_OUTPUT"; \ exit 1; \ fi - + ##@ Tools .PHONY: check-tools diff --git a/charts/activator-filter/.helmignore b/charts/activator-filter/.helmignore new file mode 100644 index 00000000..0e8a0eb3 --- /dev/null +++ b/charts/activator-filter/.helmignore @@ -0,0 +1,23 @@ +# Patterns to ignore when building packages. +# This supports shell glob matching, relative path matching, and +# negation (prefixed with !). Only one pattern per line. 
+.DS_Store +# Common VCS dirs +.git/ +.gitignore +.bzr/ +.bzrignore +.hg/ +.hgignore +.svn/ +# Common backup files +*.swp +*.bak +*.tmp +*.orig +*~ +# Various IDEs +.project +.idea/ +*.tmproj +.vscode/ diff --git a/charts/activator-filter/Chart.yaml b/charts/activator-filter/Chart.yaml new file mode 100644 index 00000000..1e3c0b74 --- /dev/null +++ b/charts/activator-filter/Chart.yaml @@ -0,0 +1,9 @@ +apiVersion: v2 +name: activator +description: A Helm chart for the activator extension + +type: application + +version: 0.1.0 + +appVersion: "0.2.0" diff --git a/charts/activator-filter/README.md b/charts/activator-filter/README.md new file mode 100644 index 00000000..7e57903e --- /dev/null +++ b/charts/activator-filter/README.md @@ -0,0 +1,31 @@ +# Activator Chart + +A chart to deploy the activator HTTP filter for an InferenceGateway and RBAC for all per route activator deployments. + +## Install + +To install an activator-filter named `activator-filter`, you can run the following command: + +```txt +$ helm install activator-filter ./charts/activator-filter +``` + +## Uninstall + +Run the following command to uninstall the chart: + +```txt +$ helm uninstall activator-filter +``` + +## Configuration + +The following table list the configurable parameters of the chart. + +| **Parameter Name** | **Description** | +|---------------------------------------------|----------------------------------------------------------------------------------------------------| +| `name` | Name of the activator RBAC resources. Defaults to `activator`. | + +## Notes + +This chart should only be deployed once. diff --git a/charts/activator-filter/templates/NOTES.txt b/charts/activator-filter/templates/NOTES.txt new file mode 100644 index 00000000..c0d990f6 --- /dev/null +++ b/charts/activator-filter/templates/NOTES.txt @@ -0,0 +1 @@ +Activator HTTP Filter deployed. 
diff --git a/charts/activator-filter/templates/istio.yaml b/charts/activator-filter/templates/istio.yaml new file mode 100644 index 00000000..32d4f075 --- /dev/null +++ b/charts/activator-filter/templates/istio.yaml @@ -0,0 +1,25 @@ +apiVersion: networking.istio.io/v1alpha3 +kind: EnvoyFilter +metadata: + name: {{ .Values.name }}-ext-proc + namespace: {{ .Release.Namespace }} +spec: + configPatches: + - applyTo: HTTP_FILTER + match: + # context omitted so that this applies to both sidecars and gateways + listener: + filterChain: + filter: + name: "envoy.filters.network.http_connection_manager" + patch: + operation: INSERT_FIRST # TODO: insert before EPP + value: + name: envoy.filters.http.activator.ext_proc + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor + failure_mode_allow: true + grpc_service: + envoy_grpc: + cluster_name: no-op + message_timeout: 120s diff --git a/charts/activator-filter/templates/rbac.yaml b/charts/activator-filter/templates/rbac.yaml new file mode 100644 index 00000000..f6afa52f --- /dev/null +++ b/charts/activator-filter/templates/rbac.yaml @@ -0,0 +1,81 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: {{ .Values.name }} + namespace: {{ .Release.Namespace }} +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: {{ .Values.name }} + namespace: {{ .Release.Namespace }} +rules: # TODO: These can probably be trimmed down +- apiGroups: + - "inference.networking.x-k8s.io" + resources: + - "inferencepools" + verbs: + - "get" + - "watch" + - "list" +- apiGroups: + - "" + resources: + - "pods" + verbs: + - "get" + - "watch" + - "list" +- apiGroups: + - "discovery.k8s.io" + resources: + - "endpointslices" + verbs: + - "get" + - "watch" + - "list" +- apiGroups: + - "authentication.k8s.io" + resources: + - "tokenreviews" + verbs: + - "create" +- apiGroups: + - "authorization.k8s.io" + resources: + - "subjectaccessreviews" + verbs: + - "create" +- apiGroups: + - "apps" + resources: + - "deployments" + verbs: + - "create" + - "get" + - "list" + - "watch" + - "update" + - "patch" + - "delete" +- apiGroups: + - apps + resources: + - deployments/scale + verbs: + - get + - update +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: {{ .Values.name }} + namespace: {{ .Release.Namespace }} +subjects: +- kind: ServiceAccount + name: {{ .Values.name }} + namespace: {{ .Release.Namespace }} +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: {{ .Values.name }} diff --git a/charts/activator-filter/values.yaml b/charts/activator-filter/values.yaml new file mode 100644 index 00000000..9781aee0 --- /dev/null +++ b/charts/activator-filter/values.yaml @@ -0,0 +1 @@ +name: activator diff --git a/charts/activator/.helmignore b/charts/activator/.helmignore new file mode 100644 index 00000000..0e8a0eb3 --- /dev/null +++ b/charts/activator/.helmignore @@ -0,0 +1,23 @@ +# Patterns to ignore when building packages. +# This supports shell glob matching, relative path matching, and +# negation (prefixed with !). Only one pattern per line. 
+.DS_Store +# Common VCS dirs +.git/ +.gitignore +.bzr/ +.bzrignore +.hg/ +.hgignore +.svn/ +# Common backup files +*.swp +*.bak +*.tmp +*.orig +*~ +# Various IDEs +.project +.idea/ +*.tmproj +.vscode/ diff --git a/charts/activator/Chart.yaml b/charts/activator/Chart.yaml new file mode 100644 index 00000000..27ac6a2a --- /dev/null +++ b/charts/activator/Chart.yaml @@ -0,0 +1,9 @@ +apiVersion: v2 +name: activator-route +description: A Helm chart for the activator extension + +type: application + +version: 0.1.0 + +appVersion: "0.2.0" diff --git a/charts/activator/README.md b/charts/activator/README.md new file mode 100644 index 00000000..4f244735 --- /dev/null +++ b/charts/activator/README.md @@ -0,0 +1,44 @@ +# Activator Chart + +A chart to deploy the activator deployment and service per HTTPRoute. + +## Install + +To install an activator named `-activator`, you can run the following command: + +```txt +$ helm install activator ./charts/activator \ + --set route.name=http-route-name \ + --set inferencePool.name=inference-pool-name + +``` + +## Uninstall + +Run the following command to uninstall the chart: + +```txt +$ helm uninstall activator +``` + +## Configuration + +The following table list the configurable parameters of the chart. + +| **Parameter Name** | **Description** | +|---------------------------------------------|----------------------------------------------------------------------------------------------------| +| `activator.suffix` | Suffix to append to the name of the activator deployment and service. Defaults to `-activator`. | +| `activator.port` | Port serving ext_proc. Defaults to `9004`. | +| `activator.healthCheckPort` | Port for health checks. Defaults to `9005`. | +| `activator.image.name` | Name of the container image used. | +| `activator.image.registry` | Registry URL and namespace where the image is hosted. | +| `activator.image.tag` | Image tag. | +| `activator.image.pullPolicy` | Image pull policy for the container. Possible values: `Always`, `IfNotPresent`, or `Never`. Defaults to `Always`. | +| `inferenceGateway.port` | The port of the Gateway. Defaults to `80`. | +| `inferencePool.name` | The name of the InferencePool to target. | +| `inferencePool.apiVersion` | The API version of the InferencePool. Defaults to `inference.networking.x-k8s.io`. | +| `route.name` | The name of the HTTPRoute to attach the activator to. | + +## Notes + +This chart should only be deployed once per HTTPRoute. diff --git a/charts/activator/templates/NOTES.txt b/charts/activator/templates/NOTES.txt new file mode 100644 index 00000000..f0fa7a37 --- /dev/null +++ b/charts/activator/templates/NOTES.txt @@ -0,0 +1 @@ +Activator extension deployed for HTTPRoute {{.Values.route.name }} diff --git a/charts/activator/templates/activator.yaml b/charts/activator/templates/activator.yaml new file mode 100644 index 00000000..369a54e9 --- /dev/null +++ b/charts/activator/templates/activator.yaml @@ -0,0 +1,54 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ include "activatorName" . }} + namespace: {{ .Release.Namespace }} +spec: + replicas: 1 + selector: + matchLabels: + app: {{ include "activatorName" . }} + template: + metadata: + labels: + app: {{ include "activatorName" . 
}} + spec: + containers: + - name: activator + image: {{ .Values.activator.image.registry }}/{{ .Values.activator.image.name }}:{{ .Values.activator.image.tag }} + imagePullPolicy: {{ .Values.activator.image.pullPolicy | default "Always" }} + args: + - "--pool-name" + - "{{ .Values.inferencePool.name }}" + - "--pool-namespace" + - "{{ .Release.Namespace }}" + - "--pool-group" + - "{{ .Values.inferencePool.group }}" + - "--grpc-port" + - "{{ .Values.activator.port }}" + - "--grpc-health-port" + - "{{ .Values.activator.healthCheckPort }}" + - "--zap-encoder" + - "json" + - "--v" + - "2" + ports: + - containerPort: {{ .Values.activator.port }} + # health check + - containerPort: {{ .Values.activator.healthCheckPort }} + serviceAccountName: activator +--- +apiVersion: v1 +kind: Service +metadata: + name: {{ include "activatorName" . }} + namespace: {{ .Release.Namespace }} +spec: + selector: + app: {{ include "activatorName" . }} + ports: + - protocol: TCP + port: {{ .Values.activator.port }} + targetPort: {{ .Values.activator.port }} + appProtocol: HTTP2 + type: ClusterIP diff --git a/charts/activator/templates/helpers.tpl b/charts/activator/templates/helpers.tpl new file mode 100644 index 00000000..9ad85bd2 --- /dev/null +++ b/charts/activator/templates/helpers.tpl @@ -0,0 +1,3 @@ +{{- define "activatorName" -}} +{{ .Values.route.name }}{{ .Values.activator.suffix }} +{{- end -}} diff --git a/charts/activator/templates/istio.yaml b/charts/activator/templates/istio.yaml new file mode 100644 index 00000000..4723b259 --- /dev/null +++ b/charts/activator/templates/istio.yaml @@ -0,0 +1,44 @@ +apiVersion: networking.istio.io/v1alpha3 +kind: EnvoyFilter +metadata: + name: {{ include "activatorName" . }} + namespace: {{ .Release.Namespace }} +spec: + configPatches: + - applyTo: HTTP_ROUTE + match: + routeConfiguration: + vhost: + name: "*:{{ .Values.inferenceGateway.port }}" + route: + name: {{ .Release.Namespace }}.{{ .Values.route.name }}.0 # TODO: what .0? + patch: + operation: MERGE + value: + typed_per_filter_config: + envoy.filters.http.activator.ext_proc: + "@type": type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExtProcPerRoute + overrides: + processing_mode: + request_header_mode: "SEND" + response_header_mode: "SKIP" + request_body_mode: "NONE" + response_body_mode: "NONE" + request_trailer_mode: "SKIP" + response_trailer_mode: "SKIP" + grpc_service: + envoy_grpc: + cluster_name: outbound|{{ .Values.activator.port }}||{{ include "activatorName" . }}.{{ .Release.Namespace }}.svc.cluster.local + +--- +apiVersion: networking.istio.io/v1 +kind: DestinationRule +metadata: + name: {{ include "activatorName" . }} + namespace: {{ .Release.Namespace }} +spec: + host: {{ include "activatorName" . 
}}.{{ .Release.Namespace }}.svc.cluster.local + trafficPolicy: + tls: + mode: SIMPLE + insecureSkipVerify: true diff --git a/charts/activator/values.yaml b/charts/activator/values.yaml new file mode 100644 index 00000000..eee5d70d --- /dev/null +++ b/charts/activator/values.yaml @@ -0,0 +1,19 @@ +activator: + suffix: -activator + image: + name: activator + registry: ghcr.io/llm-d/llm-d-activator # hostname and namespace of the image registry + tag: main + pullPolicy: Always + port: 9004 + healthCheckPort: 9005 + +route: + name: http-route + +inferencePool: + name: inference-pool + group: inference.networking.x-k8s.io + +inferenceGateway: + port: 80 diff --git a/cmd/activator/main.go b/cmd/activator/main.go new file mode 100644 index 00000000..3e7e38f0 --- /dev/null +++ b/cmd/activator/main.go @@ -0,0 +1,35 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package main implements the main entry point for the activator component. +package main + +import ( + "os" + + ctrl "sigs.k8s.io/controller-runtime" + + "github.com/llm-d/llm-d-inference-scheduler/cmd/activator/runner" +) + +func main() { + // For adding out-of-tree plugins to the plugins registry, use the following: + // plugins.Register(my-out-of-tree-plugin-name, my-out-of-tree-plugin-factory-function) + + if err := runner.Run(ctrl.SetupSignalHandler()); err != nil { + os.Exit(1) + } +} diff --git a/cmd/activator/runner/health.go b/cmd/activator/runner/health.go new file mode 100644 index 00000000..e5683702 --- /dev/null +++ b/cmd/activator/runner/health.go @@ -0,0 +1,123 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package runner provides the main runner for the activator component. 
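+// The health server defined in this file serves gRPC health checks: when leader election is disabled every check simply reflects whether the InferencePool has synced, and when it is enabled liveness always passes while readiness and the ext_proc service report SERVING only on the leader with a synced datastore.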
+package runner + +import ( + "context" + "fmt" + "sync/atomic" + + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "github.com/go-logr/logr" + "google.golang.org/grpc/codes" + healthPb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" + + "github.com/llm-d/llm-d-inference-scheduler/pkg/activator/datastore" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" +) + +type healthServer struct { + logger logr.Logger + datastore datastore.Datastore + isLeader *atomic.Bool + leaderElectionEnabled bool +} + +const ( + livenessCheckService = "liveness" + readinessCheckService = "readiness" +) + +func (s *healthServer) Check(_ context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) { + isLive := s.datastore.PoolHasSynced() + + // If leader election is disabled, use current logic: all checks are based on whether the pool has synced. + if !s.leaderElectionEnabled { + if !isLive { + s.logger.V(logutil.DEFAULT).Info("gRPC health check not serving (leader election disabled)", "service", in.Service) + return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil + } + s.logger.V(logutil.TRACE).Info("gRPC health check serving (leader election disabled)", "service", in.Service) + return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil + } + + // When leader election is enabled, differentiate between liveness and readiness. + // The service name in the request determines which check to perform. + var checkName string + var isPassing bool + + switch in.Service { + case readinessCheckService: + checkName = "readiness" + isPassing = isLive && s.isLeader.Load() + case "": // Handle overall server health for load balancers that use an empty service name. + checkName = "empty service name (considered as overall health)" + // The overall health for a load balancer should reflect readiness to accept traffic, + // which is true only for the leader pod that has synced its data. + isPassing = isLive && s.isLeader.Load() + case livenessCheckService: + checkName = "liveness" + // Any pod that is running and can respond to this gRPC check is considered "live". + // The datastore sync status should not affect liveness, only readiness. + // This is to prevent the non-leader node from continuous restarts + isPassing = true + case extProcPb.ExternalProcessor_ServiceDesc.ServiceName: + // The main service is considered ready only on the leader. 
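+		// Non-leader replicas therefore report NOT_SERVING for the ext_proc service, so health-aware clients route Envoy traffic only to the elected leader once its datastore has synced.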
+ checkName = "ext_proc" + isPassing = isLive && s.isLeader.Load() + default: + s.logger.V(logutil.DEFAULT).Info("gRPC health check requested unknown service", "available-services", []string{livenessCheckService, readinessCheckService, extProcPb.ExternalProcessor_ServiceDesc.ServiceName}, "requested-service", in.Service) + return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVICE_UNKNOWN}, nil + } + + if !isPassing { + s.logger.V(logutil.DEFAULT).Info(fmt.Sprintf("gRPC %s check not serving", checkName), "service", in.Service, "isLive", isLive, "isLeader", s.isLeader.Load()) + return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil + } + + s.logger.V(logutil.TRACE).Info(fmt.Sprintf("gRPC %s check serving", checkName), "service", in.Service) + return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil +} + +func (s *healthServer) List(ctx context.Context, _ *healthPb.HealthListRequest) (*healthPb.HealthListResponse, error) { + statuses := make(map[string]*healthPb.HealthCheckResponse) + + services := []string{extProcPb.ExternalProcessor_ServiceDesc.ServiceName} + if s.leaderElectionEnabled { + services = append(services, livenessCheckService, readinessCheckService) + } + + for _, service := range services { + resp, err := s.Check(ctx, &healthPb.HealthCheckRequest{Service: service}) + if err != nil { + // Check can return an error for unknown services, but here we are iterating known services. + // If another error occurs, we should probably return it. + return nil, err + } + statuses[service] = resp + } + + return &healthPb.HealthListResponse{ + Statuses: statuses, + }, nil +} + +func (s *healthServer) Watch(_ *healthPb.HealthCheckRequest, _ healthPb.Health_WatchServer) error { + return status.Error(codes.Unimplemented, "Watch is not implemented") +} diff --git a/cmd/activator/runner/runner.go b/cmd/activator/runner/runner.go new file mode 100644 index 00000000..e31e7898 --- /dev/null +++ b/cmd/activator/runner/runner.go @@ -0,0 +1,259 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package runner + +import ( + "context" + "flag" + "fmt" + "os" + "sync/atomic" + + "github.com/go-logr/logr" + uberzap "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "google.golang.org/grpc" + healthPb "google.golang.org/grpc/health/grpc_health_v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/metrics/filters" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" + "sigs.k8s.io/gateway-api-inference-extension/version" + + "github.com/llm-d/llm-d-inference-scheduler/pkg/activator/datastore" + "github.com/llm-d/llm-d-inference-scheduler/pkg/activator/requestcontrol" + "github.com/llm-d/llm-d-inference-scheduler/pkg/activator/runnable" + runserver "github.com/llm-d/llm-d-inference-scheduler/pkg/activator/server" +) + +var ( + grpcPort = flag.Int("grpc-port", runserver.DefaultGrpcPort, "The gRPC port used for communicating with Envoy proxy") + grpcHealthPort = flag.Int("grpc-health-port", runserver.DefaultGrpcHealthPort, "The port used for gRPC liveness and readiness probes") + metricsPort = flag.Int("metrics-port", runserver.DefaultMetricsPort, "The metrics port") + poolName = flag.String("pool-name", runserver.DefaultPoolName, "Name of the InferencePool this Endpoint Picker is associated with.") + poolGroup = flag.String("pool-group", runserver.DefaultPoolGroup, "group of the InferencePool this Endpoint Picker is associated with.") + poolNamespace = flag.String("pool-namespace", "", "Namespace of the InferencePool this Endpoint Picker is associated with.") + logVerbosity = flag.Int("v", logging.DEFAULT, "number for the log level verbosity") + secureServing = flag.Bool("secure-serving", runserver.DefaultSecureServing, "Enables secure serving. Defaults to true.") + healthChecking = flag.Bool("health-checking", runserver.DefaultHealthChecking, "Enables health checking") + certPath = flag.String("cert-path", runserver.DefaultCertPath, "The path to the certificate for secure serving. The certificate and private key files "+ + "are assumed to be named tls.crt and tls.key, respectively. If not set, and secureServing is enabled, "+ + "then a self-signed certificate is used.") + haEnableLeaderElection = flag.Bool("ha-enable-leader-election", false, "Enables leader election for high availability. When enabled, readiness probes will only pass on the leader.") + + setupLog = ctrl.Log.WithName("setup") +) + +// Run starts the activator runner with the provided context, initializes logging, +// controllers, and servers, and blocks until the manager exits or the context is cancelled. 
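+// The startup sequence is: parse and validate flags, build the datastore, Activator and Deactivator, start the idleness monitor, create the controller-runtime manager (optionally with leader election), register the health and ext-proc gRPC servers as runnables, and block in mgr.Start until the context is cancelled.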
+func Run(ctx context.Context) error { + opts := zap.Options{ + Development: true, + } + opts.BindFlags(flag.CommandLine) + flag.Parse() + initLogging(&opts) + + setupLog.Info("GIE build", "commit-sha", version.CommitSHA, "build-ref", version.BuildRef) + + // Validate flags + if err := validateFlags(); err != nil { + setupLog.Error(err, "Failed to validate flags") + return err + } + + // Print all flag values + flags := make(map[string]any) + flag.VisitAll(func(f *flag.Flag) { + flags[f.Name] = f.Value + }) + setupLog.Info("Flags processed", "flags", flags) + + // --- Get Kubernetes Config --- + cfg, err := ctrl.GetConfig() + if err != nil { + setupLog.Error(err, "Failed to get Kubernetes rest config") + return err + } + + // --- Setup Datastore --- + datastore := datastore.NewDatastore(ctx) + + // --- Setup Activator --- + activator, err := requestcontrol.NewActivatorWithConfig(cfg, datastore) + if err != nil { + setupLog.Error(err, "Failed to setup Activator") + return err + } + + // --- Setup Deactivator --- + deactivator, err := requestcontrol.DeactivatorWithConfig(cfg, &datastore) + if err != nil { + setupLog.Error(err, "Failed to setup Deactivator") + return err + } + + // Start Deactivator + go deactivator.MonitorInferencePoolIdleness(ctx) + + // --- Setup Metrics Server --- + metricsServerOptions := metricsserver.Options{ + BindAddress: fmt.Sprintf(":%d", *metricsPort), + FilterProvider: filters.WithAuthenticationAndAuthorization, + } + + // Determine pool namespace: if --pool-namespace is non-empty, use it; else NAMESPACE env var; else default + resolvePoolNamespace := func() string { + if *poolNamespace != "" { + return *poolNamespace + } + if nsEnv := os.Getenv("NAMESPACE"); nsEnv != "" { + return nsEnv + } + return runserver.DefaultPoolNamespace + } + resolvedPoolNamespace := resolvePoolNamespace() + poolNamespacedName := types.NamespacedName{ + Name: *poolName, + Namespace: resolvedPoolNamespace, + } + poolGroupKind := schema.GroupKind{ + Group: *poolGroup, + Kind: "InferencePool", + } + poolGKNN := common.GKNN{ + NamespacedName: poolNamespacedName, + GroupKind: poolGroupKind, + } + + isLeader := &atomic.Bool{} + isLeader.Store(false) + + mgr, err := runserver.NewDefaultManager(poolGKNN, cfg, metricsServerOptions, *haEnableLeaderElection) + if err != nil { + setupLog.Error(err, "Failed to create controller manager") + return err + } + + if *haEnableLeaderElection { + setupLog.Info("Leader election enabled") + go func() { + <-mgr.Elected() + isLeader.Store(true) + setupLog.Info("This instance is now the leader!") + }() + } else { + // If leader election is disabled, all instances are "leaders" for readiness purposes. + isLeader.Store(true) + } + + // --- Setup ExtProc Server Runner --- + serverRunner := &runserver.ExtProcServerRunner{ + GrpcPort: *grpcPort, + PoolNamespacedName: poolNamespacedName, + PoolGKNN: poolGKNN, + Datastore: datastore, + SecureServing: *secureServing, + HealthChecking: *healthChecking, + CertPath: *certPath, + Activator: activator, + } + if err := serverRunner.SetupWithManager(ctx, mgr); err != nil { + setupLog.Error(err, "Failed to setup Activator controllers") + return err + } + + // --- Add Runnables to Manager --- + // Register health server. + if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), datastore, *grpcHealthPort, isLeader, *haEnableLeaderElection); err != nil { + return err + } + + // Register ext-proc server. 
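+	// The ext-proc runner serves Envoy's external-processing stream on --grpc-port and uses the Activator to scale the target workload on demand.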
+ if err := registerExtProcServer(mgr, serverRunner, ctrl.Log.WithName("ext-proc")); err != nil { + return err + } + + // --- Start Manager --- + // This blocks until a signal is received. + setupLog.Info("Controller manager starting") + if err := mgr.Start(ctx); err != nil { + setupLog.Error(err, "Error starting controller manager") + return err + } + setupLog.Info("Controller manager terminated") + return nil +} + +func initLogging(opts *zap.Options) { + // Unless -zap-log-level is explicitly set, use -v + useV := true + flag.Visit(func(f *flag.Flag) { + if f.Name == "zap-log-level" { + useV = false + } + }) + if useV { + // See https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/log/zap#Options.Level + lvl := -1 * (*logVerbosity) + opts.Level = uberzap.NewAtomicLevelAt(zapcore.Level(int8(lvl))) + } + + logger := zap.New(zap.UseFlagOptions(opts), zap.RawZapOpts(uberzap.AddCaller())) + ctrl.SetLogger(logger) +} + +// registerExtProcServer adds the ExtProcServerRunner as a Runnable to the manager. +func registerExtProcServer(mgr manager.Manager, runner *runserver.ExtProcServerRunner, logger logr.Logger) error { + if err := mgr.Add(runner.AsRunnable(logger)); err != nil { + setupLog.Error(err, "Failed to register ext-proc gRPC server runnable") + return err + } + setupLog.Info("ExtProc server runner added to manager.") + return nil +} + +// registerHealthServer adds the Health gRPC server as a Runnable to the given manager. +func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.Datastore, port int, isLeader *atomic.Bool, leaderElectionEnabled bool) error { + srv := grpc.NewServer() + healthPb.RegisterHealthServer(srv, &healthServer{ + logger: logger, + datastore: ds, + isLeader: isLeader, + leaderElectionEnabled: leaderElectionEnabled, + }) + if err := mgr.Add( + runnable.NoLeaderElection(runnable.GRPCServer("health", srv, port))); err != nil { + setupLog.Error(err, "Failed to register health server") + return err + } + return nil +} + +func validateFlags() error { + if *poolName == "" { + return fmt.Errorf("required %q flag not set", "poolName") + } + + return nil +} diff --git a/pkg/activator/controller/inferencepool_reconciler.go b/pkg/activator/controller/inferencepool_reconciler.go new file mode 100644 index 00000000..23751819 --- /dev/null +++ b/pkg/activator/controller/inferencepool_reconciler.go @@ -0,0 +1,120 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package controller contains reconcilers for InferencePool resources used by the activator +// component; it implements controller-runtime Reconciler logic to keep the Datastore synchronized +// with Kubernetes InferencePool objects across supported API groups. 
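+// Reconcile mirrors the referenced InferencePool (v1 or v1alpha2, converted to v1) into the datastore and clears the datastore when the pool is deleted or not found.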
+package controller + +import ( + "context" + "fmt" + + "k8s.io/apimachinery/pkg/api/errors" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/llm-d/llm-d-inference-scheduler/pkg/activator/datastore" + v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" + "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" +) + +// InferencePoolReconciler utilizes the controller runtime to reconcile Instance Gateway resources +// This implementation is just used for reading & maintaining data sync. The Gateway implementation +// will have the proper controller that will create/manage objects on behalf of the server pool. +type InferencePoolReconciler struct { + client.Reader + Datastore datastore.Datastore + PoolGKNN common.GKNN +} + +// Reconcile reconciles an InferencePool and keeps the Datastore synchronized with the +// current state of the corresponding Kubernetes InferencePool object. +func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx).WithValues("group", c.PoolGKNN.Group).V(logutil.DEFAULT) + ctx = ctrl.LoggerInto(ctx, logger) + + logger.Info("Reconciling InferencePool") + + // 1. Initialize a generic client.Object based on the group. + var obj client.Object + switch c.PoolGKNN.Group { + case v1.GroupName: + obj = &v1.InferencePool{} + case v1alpha2.GroupName: + obj = &v1alpha2.InferencePool{} + default: + // Handle unsupported groups gracefully. + return ctrl.Result{}, fmt.Errorf("cannot reconcile InferencePool - unsupported API group: %s", c.PoolGKNN.Group) + } + + // 2. Perform a single, generic fetch for the object. + if err := c.Get(ctx, req.NamespacedName, obj); err != nil { + if errors.IsNotFound(err) { + logger.Info("InferencePool not found. Clearing the datastore") + c.Datastore.Clear() + return ctrl.Result{}, nil + } + return ctrl.Result{}, fmt.Errorf("unable to get InferencePool - %w", err) + } + + // 3. Perform common checks using the client.Object interface. + if !obj.GetDeletionTimestamp().IsZero() { + logger.Info("InferencePool is marked for deletion. Clearing the datastore") + c.Datastore.Clear() + return ctrl.Result{}, nil + } + + // 4. Convert the fetched object to the canonical v1.InferencePool. + v1infPool := &v1.InferencePool{} + + switch pool := obj.(type) { + case *v1.InferencePool: + // If it's already a v1 object, just use it. + v1infPool = pool + case *v1alpha2.InferencePool: + var err error + err = pool.ConvertTo(v1infPool) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to convert XInferencePool to InferencePool - %w", err) + } + default: + return ctrl.Result{}, fmt.Errorf("unsupported API group: %s", c.PoolGKNN.Group) + } + + c.Datastore.PoolSet(v1infPool) + + return ctrl.Result{}, nil +} + +// SetupWithManager sets up the controller with the manager for the configured API group. +func (c *InferencePoolReconciler) SetupWithManager(mgr ctrl.Manager) error { + switch c.PoolGKNN.Group { + case v1alpha2.GroupName: + return ctrl.NewControllerManagedBy(mgr). + For(&v1alpha2.InferencePool{}). + Complete(c) + case v1.GroupName: + return ctrl.NewControllerManagedBy(mgr). + For(&v1.InferencePool{}). 
+ Complete(c) + default: + return fmt.Errorf("unknown group %s", c.PoolGKNN.Group) + } +} diff --git a/pkg/activator/datastore/datastore.go b/pkg/activator/datastore/datastore.go new file mode 100644 index 00000000..d1a8242d --- /dev/null +++ b/pkg/activator/datastore/datastore.go @@ -0,0 +1,114 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package datastore provides a local in-memory cache for the activator component, +// storing the current InferencePool and managing a ticker used by background +// goroutines; it exposes thread-safe operations to get/set the pool and control +// the ticker. +package datastore + +import ( + "context" + "errors" + "sync" + "time" + + v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" +) + +var ( + errPoolNotSynced = errors.New("InferencePool is not initialized in data store") +) + +// Datastore is a local cache of relevant data for the given InferencePool (currently all pulled from k8s-api) +type Datastore interface { + // InferencePool operations + // PoolSet sets the given pool in datastore. + PoolSet(pool *v1.InferencePool) + PoolGet() (*v1.InferencePool, error) + PoolHasSynced() bool + + GetTicker() *time.Ticker + ResetTicker(t time.Duration) + StopTicker() + + // Clears the store state, happens when the pool gets deleted. + Clear() +} + +// NewDatastore creates a new Datastore instance with the provided parent context. +func NewDatastore(parentCtx context.Context) Datastore { + store := &datastore{ + parentCtx: parentCtx, + poolMu: sync.RWMutex{}, + ticker: time.NewTicker(60 * time.Second), + } + return store +} + +type datastore struct { + // parentCtx controls the lifecycle of the background metrics goroutines that spawn up by the datastore. + parentCtx context.Context + // poolMu is used to synchronize access to pool map. 
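+	// poolMu is also taken around the ticker operations below (ResetTicker, GetTicker, StopTicker).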
+ poolMu sync.RWMutex + pool *v1.InferencePool + ticker *time.Ticker +} + +// /// InferencePool APIs /// +func (ds *datastore) PoolSet(pool *v1.InferencePool) { + ds.poolMu.Lock() + defer ds.poolMu.Unlock() + + ds.pool = pool +} + +func (ds *datastore) PoolGet() (*v1.InferencePool, error) { + ds.poolMu.RLock() + defer ds.poolMu.RUnlock() + if !ds.PoolHasSynced() { + return nil, errPoolNotSynced + } + return ds.pool, nil +} + +func (ds *datastore) PoolHasSynced() bool { + ds.poolMu.RLock() + defer ds.poolMu.RUnlock() + return ds.pool != nil +} + +func (ds *datastore) Clear() { + ds.PoolSet(nil) +} + +func (ds *datastore) ResetTicker(t time.Duration) { + ds.poolMu.RLock() + defer ds.poolMu.RUnlock() + ds.ticker.Reset(t) +} + +func (ds *datastore) GetTicker() *time.Ticker { + ds.poolMu.RLock() + defer ds.poolMu.RUnlock() + return ds.ticker +} + +func (ds *datastore) StopTicker() { + ds.poolMu.RLock() + defer ds.poolMu.RUnlock() + ds.ticker.Stop() +} diff --git a/pkg/activator/datastore/datastore_test.go b/pkg/activator/datastore/datastore_test.go new file mode 100644 index 00000000..76538d92 --- /dev/null +++ b/pkg/activator/datastore/datastore_test.go @@ -0,0 +1,83 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package datastore + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "k8s.io/apimachinery/pkg/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + + v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" + testutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing" +) + +func TestPool(t *testing.T) { + pool1Selector := map[string]string{"app": "vllm_v1"} + pool1 := testutil.MakeInferencePool("pool1"). + Namespace("default"). + Selector(pool1Selector).ObjRef() + tests := []struct { + name string + inferencePool *v1.InferencePool + wantSynced bool + wantPool *v1.InferencePool + wantErr error + }{ + { + name: "Ready when InferencePool exists in data store", + inferencePool: pool1, + wantSynced: true, + wantPool: pool1, + }, + { + name: "Labels not matched", + inferencePool: pool1, + wantSynced: true, + wantPool: pool1, + }, + { + name: "Not ready when InferencePool is nil in data store", + wantErr: errPoolNotSynced, + wantSynced: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Set up the scheme. 
+ scheme := runtime.NewScheme() + _ = clientgoscheme.AddToScheme(scheme) + + datastore := NewDatastore(context.Background()) + datastore.PoolSet(tt.inferencePool) + gotPool, gotErr := datastore.PoolGet() + if diff := cmp.Diff(tt.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" { + t.Errorf("Unexpected error diff (+got/-want): %s", diff) + } + if diff := cmp.Diff(tt.wantPool, gotPool); diff != "" { + t.Errorf("Unexpected pool diff (+got/-want): %s", diff) + } + gotSynced := datastore.PoolHasSynced() + if diff := cmp.Diff(tt.wantSynced, gotSynced); diff != "" { + t.Errorf("Unexpected synced diff (+got/-want): %s", diff) + } + }) + } +} diff --git a/pkg/activator/handlers/server.go b/pkg/activator/handlers/server.go new file mode 100644 index 00000000..5673d5ed --- /dev/null +++ b/pkg/activator/handlers/server.go @@ -0,0 +1,202 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package handlers implements the gRPC server for Envoy's external processing API, +package handlers + +import ( + "context" + "io" + + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "sigs.k8s.io/controller-runtime/pkg/log" + + v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" + errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" +) + +var continueHeadersResponse = &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_RequestHeaders{ + RequestHeaders: &extProcPb.HeadersResponse{ + Response: &extProcPb.CommonResponse{ + Status: extProcPb.CommonResponse_CONTINUE, + }, + }, + }, +} + +// NewStreamingServer creates a new StreamingServer with the provided Datastore and Activator. +func NewStreamingServer(datastore Datastore, activator Activator) *StreamingServer { + return &StreamingServer{ + activator: activator, + datastore: datastore, + } +} + +// Activator defines the interface for activating replicas based on incoming requests. +type Activator interface { + MayActivate(ctx context.Context) error +} + +// Datastore defines the interface for accessing InferencePool data. +type Datastore interface { + PoolGet() (*v1.InferencePool, error) +} + +// StreamingServer implements the Envoy external processing server. +// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor.proto +type StreamingServer struct { + datastore Datastore + activator Activator +} + +// Process handles the bidirectional streaming RPC for external processing. 
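+// For each RequestHeaders message it asks the Activator to scale the pool from zero if necessary, then replies CONTINUE; if activation fails it sends an immediate error response and ends the stream. All other message types are unexpected in this processing mode and are only logged.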
+func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { + ctx := srv.Context() + logger := log.FromContext(ctx) + loggerTrace := logger.V(logutil.TRACE) + loggerTrace.Info("Processing") + + var err error + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + req, recvErr := srv.Recv() + if recvErr == io.EOF || status.Code(recvErr) == codes.Canceled { + return nil + } + if recvErr != nil { + return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err) + } + + switch req.Request.(type) { + case *extProcPb.ProcessingRequest_RequestHeaders: + + if err := s.activator.MayActivate(ctx); err != nil { + if logger.V(logutil.DEBUG).Enabled() { + logger.V(logutil.DEBUG).Error(err, "Failed to process request", "request", req) + } else { + logger.V(logutil.DEFAULT).Error(err, "Failed to process request") + } + resp, err := buildErrResponse(err) + if err != nil { + return err + } + if err := srv.Send(resp); err != nil { + logger.V(logutil.DEFAULT).Error(err, "Send failed") + return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err) + } + return nil + } + + loggerTrace.Info("Sending request header response") + if err := srv.Send(continueHeadersResponse); err != nil { + logger.V(logutil.DEFAULT).Error(err, "error sending response") + return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err) + } + + case *extProcPb.ProcessingRequest_RequestBody: + logger.V(logutil.DEBUG).Info("Error: ProcessingRequest_RequestBody received") + case *extProcPb.ProcessingRequest_RequestTrailers: + logger.V(logutil.DEBUG).Info("Error: ProcessingRequest_RequestTrailers received") + case *extProcPb.ProcessingRequest_ResponseHeaders: + logger.V(logutil.DEBUG).Info("Error: ProcessingRequest_ResponseHeaders received") + case *extProcPb.ProcessingRequest_ResponseBody: + logger.V(logutil.DEBUG).Info("Error: ProcessingRequest_ResponseBody received") + case *extProcPb.ProcessingRequest_ResponseTrailers: + logger.V(logutil.DEBUG).Info("Error: ProcessingRequest_ResponseTrailers received") + } + + } +} + +func buildErrResponse(err error) (*extProcPb.ProcessingResponse, error) { + var resp *extProcPb.ProcessingResponse + + switch errutil.CanonicalCode(err) { + // This code can be returned by scheduler when there is no capacity for sheddable + // requests. + case errutil.InferencePoolResourceExhausted: + resp = &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_ImmediateResponse{ + ImmediateResponse: &extProcPb.ImmediateResponse{ + Status: &envoyTypePb.HttpStatus{ + Code: envoyTypePb.StatusCode_TooManyRequests, + }, + }, + }, + } + // This code can be returned by when EPP processes the request and run into server-side errors. + case errutil.Internal: + resp = &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_ImmediateResponse{ + ImmediateResponse: &extProcPb.ImmediateResponse{ + Status: &envoyTypePb.HttpStatus{ + Code: envoyTypePb.StatusCode_InternalServerError, + }, + }, + }, + } + // This code can be returned by the director when there are no candidate pods for the request scheduling. + case errutil.ServiceUnavailable: + resp = &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_ImmediateResponse{ + ImmediateResponse: &extProcPb.ImmediateResponse{ + Status: &envoyTypePb.HttpStatus{ + Code: envoyTypePb.StatusCode_ServiceUnavailable, + }, + }, + }, + } + // This code can be returned when users provide invalid json request. 
+ case errutil.BadRequest: + resp = &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_ImmediateResponse{ + ImmediateResponse: &extProcPb.ImmediateResponse{ + Status: &envoyTypePb.HttpStatus{ + Code: envoyTypePb.StatusCode_BadRequest, + }, + }, + }, + } + case errutil.BadConfiguration: + resp = &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_ImmediateResponse{ + ImmediateResponse: &extProcPb.ImmediateResponse{ + Status: &envoyTypePb.HttpStatus{ + Code: envoyTypePb.StatusCode_NotFound, + }, + }, + }, + } + default: + return nil, status.Errorf(status.Code(err), "failed to handle request: %v", err) + } + + if err.Error() != "" { + resp.Response.(*extProcPb.ProcessingResponse_ImmediateResponse).ImmediateResponse.Body = []byte(err.Error()) + } + + return resp, nil +} diff --git a/pkg/activator/requestcontrol/activator.go b/pkg/activator/requestcontrol/activator.go new file mode 100644 index 00000000..0cb23f4a --- /dev/null +++ b/pkg/activator/requestcontrol/activator.go @@ -0,0 +1,295 @@ +// Package requestcontrol implements the activator logic for controlling request flow +package requestcontrol + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" + "k8s.io/client-go/restmapper" + + "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/llm-d/llm-d-inference-scheduler/pkg/activator/datastore" + v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" + errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" + + autoscaling "k8s.io/api/autoscaling/v1" + "k8s.io/client-go/discovery" + cached "k8s.io/client-go/discovery/cached" + "k8s.io/client-go/scale" +) + +const ( + objectAPIVersionKey = "activator.llm-d.ai/target-apiversion" + objectkindKey = "activator.llm-d.ai/target-kind" + objectNameKey = "activator.llm-d.ai/target-name" + scaleFromZeroGracePeriodKey = "activator.llm-d.ai/scale-from-zero-grace-period" // Optional annotation + + // DefaultScaleFromZeroGracePeriod is the time we will wait for a scale-from-zero decision to complete + DefaultScaleFromZeroGracePeriod = 60 * time.Second + + // DefaultScaleDownDelay is the amount of time that must pass before a scale-down decision is applied + DefaultScaleDownDelay = 120 * time.Second + + // ScaleToZeroRequestRetentionPeriod it is the amount of time we will wait before releasing the request after a scale from zero event + ScaleToZeroRequestRetentionPeriod = 5 * time.Second +) + +type scaledObjectData struct { + name string + scaleGracePeriod time.Duration + numReplicas int32 + scaleObject *autoscaling.Scale +} + +// Activator implements the logic for activating replicas based on incoming requests. +type Activator struct { + DynamicClient *dynamic.DynamicClient + ScaleClient scale.ScalesGetter + Mapper meta.RESTMapper + datastore datastore.Datastore + + scalingUp bool + guard chan struct{} + scalingUpAndGuardMu sync.Mutex +} + +// NewActivatorWithConfig creates a new Activator with the provided REST config and Datastore. 
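+// It derives a scale client and cached REST mapper from the discovery API, plus a dynamic client, all from the same rest.Config.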
+func NewActivatorWithConfig(config *rest.Config, datastore datastore.Datastore) (*Activator, error) { + scaleClient, mapper, err := initScaleClient(config) + if err != nil { + return nil, err + } + + dynamicClient, err := dynamic.NewForConfig(config) + if err != nil { + return nil, err + } + + return &Activator{ + datastore: datastore, + DynamicClient: dynamicClient, + Mapper: mapper, + ScaleClient: scaleClient}, nil +} + +// MayActivate checks if the inferencePool associated with the request is scaled to one or more replicas +func (a *Activator) MayActivate(ctx context.Context) error { + logger := log.FromContext(ctx) + + // Get InferencePool Info + pool, err := a.datastore.PoolGet() + if err != nil { + return err + } + + logger.V(logutil.TRACE).Info("InferencePool found", "name", pool.Name, "namespace", pool.Namespace) + + // First: check if the inferencePool is currently scaling up from zero replicas + if scalingUp, guard := a.isScalingUp(); scalingUp { + logger.V(logutil.DEBUG).Info("InferencePool is currently scaling up. Waiting for it to be done.") + + a.waitOnGuard(guard, DefaultScaleFromZeroGracePeriod) + return nil // After scaling up is done, allow the request to proceed even if scaling failed + } + + // Then: block until the inferencePool has enough replicas and is ready + if ready := a.InferencePoolReady(ctx, pool); !ready { + return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "failed to find active candidate pods in the inferencePool for serving the request"} + } + + // Reset the Deactivator ticker for scale to zero monitoring + scaleDownDelay := DefaultScaleDownDelay + if value, found := getOptionalPoolAnnotation(logger, scaleDownDelayKey, pool); found { + scaleDownDelay, _ = time.ParseDuration(value) + } + + a.datastore.ResetTicker(scaleDownDelay) + return nil +} + +// InferencePoolReady checks if the inferencePool has enough replicas and is ready +func (a *Activator) InferencePoolReady(ctx context.Context, pool *v1.InferencePool) bool { + logger := log.FromContext(ctx) + namespace := pool.Namespace + + // TODO: store annotation values in datastore to avoid reading them every time + // verify required inferencePool annotations + valid := verifyPoolObjectAnnotations(logger, pool) + if !valid { + return false + } + + // extract optional inferencePool annotation if it exists, otherwise use a default value + scaleGracePeriod := DefaultScaleFromZeroGracePeriod + if value, found := getOptionalPoolAnnotation(logger, scaleFromZeroGracePeriodKey, pool); found { + scaleGracePeriod, _ = time.ParseDuration(value) + } + + // Get the scale subresource for the target inferencePool object + gvr, err := GetResourceForKind(a.Mapper, pool.Annotations[objectAPIVersionKey], pool.Annotations[objectkindKey]) + if err != nil { + msg := "Failed to parse Group, Version, Kind, Resource" + logger.Error(err, msg, "apiVersion", pool.Annotations[objectAPIVersionKey], "kind", pool.Annotations[objectkindKey]) + return false + } + + gr := gvr.GroupResource() + scaleObject, err := a.ScaleClient.Scales(namespace).Get(ctx, gr, pool.Annotations[objectNameKey], metav1.GetOptions{}) + if err != nil { + logger.Error(err, "Error getting scale subresource object") + return true + } + + // Common case: enough replicas? 
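+	// If the scale target already requests at least one replica and its pods report ready within the grace period, there is nothing to activate.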
+ if scaleObject.Spec.Replicas > 0 { + if a.inferencePoolPodsReady(logger, namespace, pool.Annotations[objectNameKey], scaleObject.Spec.Replicas, scaleGracePeriod, gvr) { + // Scale object exists and has no zero running replicas then do not scale it + logger.V(logutil.DEBUG).Info(fmt.Sprintf("Scale Object %s have at least one replica ready. Skipping scaling from zero", scaleObject.Name)) + return true + } + } + + // Need to scale inferencePool workload from zero to one replicas + numReplicas := int32(1) + scaleData := scaledObjectData{name: pool.Annotations[objectNameKey], scaleGracePeriod: DefaultScaleFromZeroGracePeriod, numReplicas: numReplicas, scaleObject: scaleObject} + return a.scaleInferencePool(ctx, logger, namespace, scaleData, gr, gvr) +} + +func (a *Activator) inferencePoolPodsReady(logger logr.Logger, namespace, objname string, numReplicas int32, scaleGracePeriod time.Duration, gvr schema.GroupVersionResource) bool { + // Don't inherit the parent context to avoid cancellation + err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, scaleGracePeriod, false, func(ctx context.Context) (done bool, err error) { + + a.datastore.ResetTicker(DefaultScaleDownDelay) // turn off the deactivator during scale from zero events + + unstructuredObj, err := a.DynamicClient.Resource(gvr).Namespace(namespace).Get(ctx, objname, metav1.GetOptions{}) + if err != nil { + logger.Error(err, "Error getting unstructured object") + return false, nil // continue polling + } + + // NOTE: this assumes that the target object has a status.readyReplicas field + if readyReplicas, ok := unstructuredObj.Object["status"].(map[string]any)["readyReplicas"].(int64); ok { + if numReplicas == int32(readyReplicas) { + logger.V(logutil.DEBUG).Info("Candidate pods are READY") + return true, nil + } + logger.V(logutil.DEBUG).Info("Candidate pods are NOT READY") + } else { + logger.V(logutil.DEBUG).Info("Object status.readyReplicas field is not set yet - candidate pods for serving the request are NOT READY ") + return false, nil + } + + return false, nil + }) + + return err == nil +} + +func (a *Activator) scaleInferencePool(ctx context.Context, logger logr.Logger, namespace string, objData scaledObjectData, gr schema.GroupResource, gvr schema.GroupVersionResource) bool { + a.beginScalingUp() + defer a.endScalingUp() + + // Modify the desired replicas + objData.scaleObject.Spec.Replicas = objData.numReplicas + + // Update the Scale object + _, err := a.ScaleClient.Scales(namespace).Update(ctx, gr, objData.scaleObject, metav1.UpdateOptions{}) + if err != nil { + logger.Error(err, "Error increasing Scale Object number of replicas to one") + return false + } + logger.Info(fmt.Sprintf("Scale Object %s in namespace %s scaled up to %d replicas with scale grace period %d \n", objData.name, namespace, objData.numReplicas, int(objData.scaleGracePeriod))) + + // Wait for the pods to be ready + ready := a.inferencePoolPodsReady(logger, namespace, objData.name, objData.numReplicas, objData.scaleGracePeriod, gvr) + if ready { + // Give some time for the Endpoint Picker to pick up the newly created pods + time.Sleep(ScaleToZeroRequestRetentionPeriod) + return true + } + return false +} + +func initScaleClient(config *rest.Config) (scale.ScalesGetter, meta.RESTMapper, error) { + clientset, err := discovery.NewDiscoveryClientForConfig(config) + if err != nil { + return nil, nil, err + } + + cachedDiscoveryClient := cached.NewMemCacheClient(clientset) + restMapper := 
restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscoveryClient) + + return scale.New( + clientset.RESTClient(), restMapper, + dynamic.LegacyAPIPathResolverFunc, + scale.NewDiscoveryScaleKindResolver(clientset), + ), restMapper, nil +} + +func verifyPoolObjectAnnotations(logger logr.Logger, pool *v1.InferencePool) bool { + if _, ok := pool.Annotations[objectAPIVersionKey]; !ok { + logger.Info(fmt.Sprintf("Annotation '%s' not found on pool '%s'", objectAPIVersionKey, pool.Name)) + return false + } + if _, ok := pool.Annotations[objectkindKey]; !ok { + logger.Info(fmt.Sprintf("Annotation '%s' not found on pool '%s'", objectkindKey, pool.Name)) + return false + } + if _, ok := pool.Annotations[objectNameKey]; !ok { + logger.Info(fmt.Sprintf("Annotation '%s' not found on pool '%s'", objectNameKey, pool.Name)) + return false + } + return true +} + +func getOptionalPoolAnnotation(logger logr.Logger, annotationKey string, pool *v1.InferencePool) (string, bool) { + if value, ok := pool.Annotations[annotationKey]; ok { + return value, true + } + logger.V(logutil.DEBUG).Info(fmt.Sprintf("Annotation '%s' not found on pool '%s'", annotationKey, pool.Name)) + return "", false +} + +func (a *Activator) beginScalingUp() { + a.scalingUpAndGuardMu.Lock() + defer a.scalingUpAndGuardMu.Unlock() + + a.scalingUp = true + a.guard = make(chan struct{}) +} + +func (a *Activator) endScalingUp() { + a.scalingUpAndGuardMu.Lock() + defer a.scalingUpAndGuardMu.Unlock() + + a.scalingUp = false + close(a.guard) + a.guard = nil +} + +func (a *Activator) isScalingUp() (bool, chan struct{}) { + a.scalingUpAndGuardMu.Lock() + defer a.scalingUpAndGuardMu.Unlock() + + return a.scalingUp, a.guard +} + +// waitOnGuard blocks until the InferencePool is marked as ready or the timeout is reached. +func (a *Activator) waitOnGuard(guard <-chan struct{}, timeout time.Duration) { + select { + case <-time.After(timeout): + return + case <-guard: + } +} diff --git a/pkg/activator/requestcontrol/deactivator.go b/pkg/activator/requestcontrol/deactivator.go new file mode 100644 index 00000000..db18d2d2 --- /dev/null +++ b/pkg/activator/requestcontrol/deactivator.go @@ -0,0 +1,108 @@ +package requestcontrol + +import ( + "context" + "fmt" + "time" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/llm-d/llm-d-inference-scheduler/pkg/activator/datastore" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" + + "k8s.io/client-go/scale" +) + +const ( + scaleDownDelayKey = "activator.llm-d.ai/scale-down-delay" // Optional annotation +) + +// Deactivator handles scaling down InferencePools to zero replicas after a period of idleness. +type Deactivator struct { + DynamicClient *dynamic.DynamicClient + ScaleClient scale.ScalesGetter + Mapper meta.RESTMapper + datastore *datastore.Datastore +} + +// DeactivatorWithConfig creates a new Deactivator with the provided REST config and Datastore. 
+func DeactivatorWithConfig(config *rest.Config, datastore *datastore.Datastore) (*Deactivator, error) { + scaleClient, mapper, err := initScaleClient(config) + if err != nil { + return nil, err + } + + dynamicClient, err := dynamic.NewForConfig(config) + if err != nil { + return nil, err + } + + return &Deactivator{ + datastore: datastore, + DynamicClient: dynamicClient, + Mapper: mapper, + ScaleClient: scaleClient}, nil +} + +// MonitorInferencePoolIdleness monitors the InferencePool for idleness and scales it down to zero replicas +func (da *Deactivator) MonitorInferencePoolIdleness(ctx context.Context) { + logger := log.FromContext(ctx) + ds := *(da.datastore) + + ds.ResetTicker(DefaultScaleDownDelay) + defer ds.StopTicker() + + ticker := ds.GetTicker() + + for { + select { + case <-ctx.Done(): + logger.Info("Context cancelled, stopping deactivator") + return + case <-ticker.C: + logger.V(logutil.DEBUG).Info("Deactivator Time check for inferencePool idleness: " + time.Now().Format("15:04:05")) + + // Get InferencePool Info + pool, err := ds.PoolGet() + if err != nil { + logger.V(logutil.TRACE).Info("InferencePool found", "name", pool.Name, "namespace", pool.Namespace) + continue + } + + // Verify required inferencePool annotations + valid := verifyPoolObjectAnnotations(logger, pool) + if !valid { + logger.V(logutil.TRACE).Info("InferencePool missing required annotations for pool", "name", pool.Name, "namespace", pool.Namespace) + continue + } + + gvr, err := GetResourceForKind(da.Mapper, pool.Annotations[objectAPIVersionKey], pool.Annotations[objectkindKey]) + if err != nil { + logger.Error(err, "Failed to parse Group, Version, Kind, Resource", "apiVersion", pool.Annotations[objectAPIVersionKey], "kind", pool.Annotations[objectkindKey]) + continue + } + + gr := gvr.GroupResource() + + scaleObject, err := da.ScaleClient.Scales(pool.Namespace).Get(ctx, gr, pool.Annotations[objectNameKey], metav1.GetOptions{}) + if err != nil { + logger.Error(err, "Error getting scale subresource object") + continue + } + + // Scale inferencePool to zero replicas + scaleObject.Spec.Replicas = 0 + _, err = da.ScaleClient.Scales(pool.Namespace).Update(ctx, gr, scaleObject, metav1.UpdateOptions{}) + if err != nil { + logger.Error(err, "InferencePool was not successfully scale down to zero replica") + continue + } + + logger.V(logutil.DEBUG).Info(fmt.Sprintf("InferencePool '%s' was successfully scale down to zero replica", pool.Name)) + } + } +} diff --git a/pkg/activator/requestcontrol/gvr.go b/pkg/activator/requestcontrol/gvr.go new file mode 100644 index 00000000..7f8aa131 --- /dev/null +++ b/pkg/activator/requestcontrol/gvr.go @@ -0,0 +1,39 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package requestcontrol + +import ( + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// GetResourceForKind returns GroupVersionResource for specified apiVersion (groupVersion) and Kind +func GetResourceForKind(mapper meta.RESTMapper, apiVersion string, kind string) (schema.GroupVersionResource, error) { + gv, err := schema.ParseGroupVersion(apiVersion) + if err != nil { + return schema.GroupVersionResource{}, err + } + + // Get the REST mapping for the GroupKind + gk := schema.GroupKind{Group: gv.Group, Kind: kind} + mapping, err := mapper.RESTMapping(gk, gv.Version) + if err != nil { + return schema.GroupVersionResource{}, err + } + + return mapping.Resource, nil +} diff --git a/pkg/activator/requestcontrol/types.go b/pkg/activator/requestcontrol/types.go new file mode 100644 index 00000000..efe48d04 --- /dev/null +++ b/pkg/activator/requestcontrol/types.go @@ -0,0 +1,31 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package requestcontrol + +// Response contains information from the response received to be passed to PostResponse plugins +type Response struct { + // RequestID is the Envoy generated Id for the request being processed + RequestID string + // Headers is a map of the response headers. Nil during body processing + Headers map[string]string + // Body Is the body of the response or nil during header processing + Body string + // IsStreaming indicates whether or not the response is being streamed by the model + IsStreaming bool + // EndOfStream when true indicates that this invocation contains the last chunk of the response + EndOfStream bool +} diff --git a/pkg/activator/runnable/grpc.go b/pkg/activator/runnable/grpc.go new file mode 100644 index 00000000..888c0539 --- /dev/null +++ b/pkg/activator/runnable/grpc.go @@ -0,0 +1,67 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package runnable provides utilities to convert gRPC servers into controller-runtime Runnables. +package runnable + +import ( + "context" + "fmt" + "net" + + "google.golang.org/grpc" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +// GRPCServer converts the given gRPC server into a runnable. +// The server name is just being used for logging. +func GRPCServer(name string, srv *grpc.Server, port int) manager.Runnable { + return manager.RunnableFunc(func(ctx context.Context) error { + // Use "name" key as that is what manager.Server does as well. 
+ log := ctrl.Log.WithValues("name", name) + log.Info("gRPC server starting") + + // Start listening. + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + return fmt.Errorf("gRPC server failed to listen - %w", err) + } + + log.Info("gRPC server listening", "port", port) + + // Shutdown on context closed. + // Terminate the server on context closed. + // Make sure the goroutine does not leak. + doneCh := make(chan struct{}) + defer close(doneCh) + go func() { + select { + case <-ctx.Done(): + log.Info("gRPC server shutting down") + srv.GracefulStop() + case <-doneCh: + } + }() + + // Keep serving until terminated. + if err := srv.Serve(lis); err != nil && err != grpc.ErrServerStopped { + return fmt.Errorf("gRPC server failed - %w", err) + } + log.Info("gRPC server terminated") + return nil + }) +} diff --git a/pkg/activator/runnable/leader_election.go b/pkg/activator/runnable/leader_election.go new file mode 100644 index 00000000..3e2230d9 --- /dev/null +++ b/pkg/activator/runnable/leader_election.go @@ -0,0 +1,31 @@ +package runnable + +import "sigs.k8s.io/controller-runtime/pkg/manager" + +type leaderElection struct { + manager.Runnable + needsLeaderElection bool +} + +// LeaderElection wraps the given runnable to implement manager.LeaderElectionRunnable. +func LeaderElection(runnable manager.Runnable, needsLeaderElection bool) manager.Runnable { + return &leaderElection{ + Runnable: runnable, + needsLeaderElection: needsLeaderElection, + } +} + +// RequireLeaderElection wraps the given runnable, marking it as requiring leader election. +func RequireLeaderElection(runnable manager.Runnable) manager.Runnable { + return LeaderElection(runnable, true) +} + +// NoLeaderElection wraps the given runnable, marking it as not requiring leader election. +func NoLeaderElection(runnable manager.Runnable) manager.Runnable { + return LeaderElection(runnable, false) +} + +// NeedLeaderElection implements manager.NeedLeaderElection interface. +func (r *leaderElection) NeedLeaderElection() bool { + return r.needsLeaderElection +} diff --git a/pkg/activator/server/controller_manager.go b/pkg/activator/server/controller_manager.go new file mode 100644 index 00000000..b0610291 --- /dev/null +++ b/pkg/activator/server/controller_manager.go @@ -0,0 +1,119 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package server provides utilities to create and manage the controller manager for the activator component. 
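+// The manager's cache is restricted to the resources the activator needs, scoped to the
+// namespace of the target InferencePool.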
+package server + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + + v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" + "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" +) + +var scheme = runtime.NewScheme() + +func init() { + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(v1alpha2.Install(scheme)) + utilruntime.Must(v1.Install(scheme)) +} + +// defaultManagerOptions returns the default options used to create the manager. +func defaultManagerOptions(gknn common.GKNN, metricsServerOptions metricsserver.Options) (ctrl.Options, error) { + opt := ctrl.Options{ + Scheme: scheme, + Cache: cache.Options{ + ByObject: map[client.Object]cache.ByObject{ + &corev1.Pod{}: { + Namespaces: map[string]cache.Config{ + gknn.Namespace: {}, + }, + }, + &v1alpha2.InferenceObjective{}: { + Namespaces: map[string]cache.Config{ + gknn.Namespace: {}, + }, + }, + }, + }, + Metrics: metricsServerOptions, + } + switch gknn.Group { + case v1alpha2.GroupName: + opt.Cache.ByObject[&v1alpha2.InferencePool{}] = cache.ByObject{ + Namespaces: map[string]cache.Config{gknn.Namespace: {FieldSelector: fields.SelectorFromSet(fields.Set{ + "metadata.name": gknn.Name, + })}}, + } + case v1.GroupName: + opt.Cache.ByObject[&v1.InferencePool{}] = cache.ByObject{ + Namespaces: map[string]cache.Config{gknn.Namespace: {FieldSelector: fields.SelectorFromSet(fields.Set{ + "metadata.name": gknn.Name, + })}}, + } + default: + return ctrl.Options{}, fmt.Errorf("unknown group: %s", gknn.Group) + } + + return opt, nil +} + +// NewDefaultManager creates a new controller manager with default configuration. +func NewDefaultManager(gknn common.GKNN, restConfig *rest.Config, metricsServerOptions metricsserver.Options, leaderElectionEnabled bool) (ctrl.Manager, error) { + opt, err := defaultManagerOptions(gknn, metricsServerOptions) + if err != nil { + return nil, fmt.Errorf("failed to create controller manager options: %v", err) + } + + if leaderElectionEnabled { + opt.LeaderElection = true + opt.LeaderElectionResourceLock = "leases" + // The lease name needs to be unique per EPP deployment. + opt.LeaderElectionID = fmt.Sprintf("epp-%s-%s.gateway-api-inference-extension.sigs.k8s.io", gknn.Namespace, gknn.Name) + opt.LeaderElectionNamespace = gknn.Namespace + opt.LeaderElectionReleaseOnCancel = true + } + + manager, err := ctrl.NewManager(restConfig, opt) + + if err != nil { + return nil, fmt.Errorf("failed to create controller manager: %v", err) + } + return manager, nil +} + +// NewManagerWithOptions creates a new controller manager with injectable options. 
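+// Unlike NewDefaultManager, no defaulting is applied to the supplied options.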
+func NewManagerWithOptions(restConfig *rest.Config, opts manager.Options) (ctrl.Manager, error) { + manager, err := ctrl.NewManager(restConfig, opts) + if err != nil { + return nil, fmt.Errorf("failed to create controller manager: %v", err) + } + return manager, nil +} diff --git a/pkg/activator/server/runserver.go b/pkg/activator/server/runserver.go new file mode 100644 index 00000000..ba0b9b85 --- /dev/null +++ b/pkg/activator/server/runserver.go @@ -0,0 +1,156 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "context" + "crypto/tls" + "fmt" + "time" + + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "github.com/go-logr/logr" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/health" + healthgrpc "google.golang.org/grpc/health/grpc_health_v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" + + "github.com/llm-d/llm-d-inference-scheduler/pkg/activator/controller" + "github.com/llm-d/llm-d-inference-scheduler/pkg/activator/datastore" + "github.com/llm-d/llm-d-inference-scheduler/pkg/activator/handlers" + "github.com/llm-d/llm-d-inference-scheduler/pkg/activator/requestcontrol" + "github.com/llm-d/llm-d-inference-scheduler/pkg/activator/runnable" + tlsutil "github.com/llm-d/llm-d-inference-scheduler/pkg/activator/tls" +) + +// ExtProcServerRunner provides methods to manage an external process server. +type ExtProcServerRunner struct { + GrpcPort int + PoolNamespacedName types.NamespacedName + PoolGKNN common.GKNN + Datastore datastore.Datastore + SecureServing bool + HealthChecking bool + CertPath string + RefreshPrometheusMetricsInterval time.Duration + MetricsStalenessThreshold time.Duration + Activator *requestcontrol.Activator +} + +// Default values for CLI flags in main +const ( + DefaultGrpcPort = 9002 // default for --grpc-port + DefaultGrpcHealthPort = 9003 // default for --grpc-health-port + DefaultMetricsPort = 9090 // default for --metrics-port + DefaultPoolName = "" // required but no default + DefaultPoolNamespace = "default" // default for --pool-namespace + DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refresh-metrics-interval + DefaultRefreshPrometheusMetricsInterval = 5 * time.Second // default for --refresh-prometheus-metrics-interval + DefaultSecureServing = true // default for --secure-serving + DefaultHealthChecking = false // default for --health-checking + DefaultEnablePprof = true // default for --enable-pprof + DefaultCertPath = "" // default for --cert-path + DefaultPoolGroup = "inference.networking.k8s.io" // default for --pool-group + DefaultMetricsStalenessThreshold = 2 * time.Second +) + +// NewDefaultExtProcServerRunner creates a runner with default values. 
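+// The pool name defaults to the empty string and must be supplied by the caller (for example via the --pool-name flag).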
+// Note: Dependencies like Datastore, Scheduler, SD need to be set separately. +func NewDefaultExtProcServerRunner() *ExtProcServerRunner { + poolGKNN := common.GKNN{ + NamespacedName: types.NamespacedName{Name: DefaultPoolName, Namespace: DefaultPoolNamespace}, + GroupKind: schema.GroupKind{ + Group: DefaultPoolGroup, + Kind: "InferencePool", + }, + } + return &ExtProcServerRunner{ + GrpcPort: DefaultGrpcPort, + PoolNamespacedName: types.NamespacedName{Name: DefaultPoolName, Namespace: DefaultPoolNamespace}, + PoolGKNN: poolGKNN, + SecureServing: DefaultSecureServing, + HealthChecking: DefaultHealthChecking, + RefreshPrometheusMetricsInterval: DefaultRefreshPrometheusMetricsInterval, + MetricsStalenessThreshold: DefaultMetricsStalenessThreshold, + // Dependencies can be assigned later. + } +} + +// SetupWithManager sets up the runner with the given manager. +func (r *ExtProcServerRunner) SetupWithManager(_ context.Context, mgr ctrl.Manager) error { + // Create the controllers and register them with the manager + if err := (&controller.InferencePoolReconciler{ + Datastore: r.Datastore, + Reader: mgr.GetClient(), + PoolGKNN: r.PoolGKNN, + }).SetupWithManager(mgr); err != nil { + return fmt.Errorf("failed setting up InferencePoolReconciler: %w", err) + } + + return nil +} + +// AsRunnable returns a Runnable that can be used to start the ext-proc gRPC server. +// The runnable implements LeaderElectionRunnable with leader election disabled. +func (r *ExtProcServerRunner) AsRunnable(logger logr.Logger) manager.Runnable { + return runnable.NoLeaderElection(manager.RunnableFunc(func(ctx context.Context) error { + + var srv *grpc.Server + if r.SecureServing { + var cert tls.Certificate + var err error + if r.CertPath != "" { + cert, err = tls.LoadX509KeyPair(r.CertPath+"/tls.crt", r.CertPath+"/tls.key") + } else { + // Create tls based credential. + cert, err = tlsutil.CreateSelfSignedTLSCertificate(logger) + } + if err != nil { + return fmt.Errorf("failed to create self signed certificate - %w", err) + } + + creds := credentials.NewTLS(&tls.Config{ + Certificates: []tls.Certificate{cert}, + }) + // Init the server. + srv = grpc.NewServer(grpc.Creds(creds)) + } else { + srv = grpc.NewServer() + } + + extProcServer := handlers.NewStreamingServer(r.Datastore, r.Activator) + extProcPb.RegisterExternalProcessorServer(srv, extProcServer) + + if r.HealthChecking { + healthcheck := health.NewServer() + healthgrpc.RegisterHealthServer(srv, + healthcheck, + ) + svcName := extProcPb.ExternalProcessor_ServiceDesc.ServiceName + logger.Info("Setting ExternalProcessor service status to SERVING", "serviceName", svcName) + healthcheck.SetServingStatus(svcName, healthgrpc.HealthCheckResponse_SERVING) + } + + // Forward to the gRPC runnable. + return runnable.GRPCServer("ext-proc", srv, r.GrpcPort).Start(ctx) + })) +} diff --git a/pkg/activator/tls/tls.go b/pkg/activator/tls/tls.go new file mode 100644 index 00000000..ba9d20b9 --- /dev/null +++ b/pkg/activator/tls/tls.go @@ -0,0 +1,74 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package tls provides utilities for TLS configuration. +package tls + +import ( + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "fmt" + "math/big" + "time" + + "github.com/go-logr/logr" +) + +// CreateSelfSignedTLSCertificate creates a self-signed cert the server can use to serve TLS. +func CreateSelfSignedTLSCertificate(_ logr.Logger) (tls.Certificate, error) { + serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128) + serialNumber, err := rand.Int(rand.Reader, serialNumberLimit) + if err != nil { + return tls.Certificate{}, fmt.Errorf("error creating serial number: %v", err) + } + now := time.Now() + notBefore := now.UTC() + template := x509.Certificate{ + SerialNumber: serialNumber, + Subject: pkix.Name{ + Organization: []string{"Inference Ext"}, + }, + NotBefore: notBefore, + NotAfter: now.Add(time.Hour * 24 * 365 * 10).UTC(), // 10 years + KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + BasicConstraintsValid: true, + } + + priv, err := rsa.GenerateKey(rand.Reader, 4096) + if err != nil { + return tls.Certificate{}, fmt.Errorf("error generating key: %v", err) + } + + derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &priv.PublicKey, priv) + if err != nil { + return tls.Certificate{}, fmt.Errorf("error creating certificate: %v", err) + } + + certBytes := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: derBytes}) + + privBytes, err := x509.MarshalPKCS8PrivateKey(priv) + if err != nil { + return tls.Certificate{}, fmt.Errorf("error marshalling private key: %v", err) + } + keyBytes := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: privBytes}) + + return tls.X509KeyPair(certBytes, keyBytes) +} From 818c096f6d96aa7c1348715fb37d79154ef85366 Mon Sep 17 00:00:00 2001 From: Braulio Dumba Date: Thu, 30 Oct 2025 11:57:18 -0400 Subject: [PATCH 02/13] Adding for InferencePool v1 Signed-off-by: Braulio Dumba --- charts/activator-filter/templates/rbac.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/charts/activator-filter/templates/rbac.yaml b/charts/activator-filter/templates/rbac.yaml index f6afa52f..13c21941 100644 --- a/charts/activator-filter/templates/rbac.yaml +++ b/charts/activator-filter/templates/rbac.yaml @@ -12,6 +12,7 @@ metadata: rules: # TODO: These can probably be trimmed down - apiGroups: - "inference.networking.x-k8s.io" + - "inference.networking.k8s.io" resources: - "inferencepools" verbs: From 25cd97461553152d4bf1b7575cf3ee916bedba30 Mon Sep 17 00:00:00 2001 From: Braulio Dumba Date: Thu, 30 Oct 2025 14:56:39 -0400 Subject: [PATCH 03/13] Adding parameter to enable/disable scale to zero Signed-off-by: Braulio Dumba --- cmd/activator/runner/runner.go | 37 ++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/cmd/activator/runner/runner.go b/cmd/activator/runner/runner.go index e31e7898..3a9a7453 100644 --- a/cmd/activator/runner/runner.go +++ b/cmd/activator/runner/runner.go @@ -47,16 +47,17 @@ import ( ) var ( - grpcPort = flag.Int("grpc-port", runserver.DefaultGrpcPort, "The gRPC port used for communicating with Envoy proxy") - grpcHealthPort = flag.Int("grpc-health-port", runserver.DefaultGrpcHealthPort, "The port used for gRPC liveness and readiness probes") - metricsPort = flag.Int("metrics-port", runserver.DefaultMetricsPort, "The 
metrics port") - poolName = flag.String("pool-name", runserver.DefaultPoolName, "Name of the InferencePool this Endpoint Picker is associated with.") - poolGroup = flag.String("pool-group", runserver.DefaultPoolGroup, "group of the InferencePool this Endpoint Picker is associated with.") - poolNamespace = flag.String("pool-namespace", "", "Namespace of the InferencePool this Endpoint Picker is associated with.") - logVerbosity = flag.Int("v", logging.DEFAULT, "number for the log level verbosity") - secureServing = flag.Bool("secure-serving", runserver.DefaultSecureServing, "Enables secure serving. Defaults to true.") - healthChecking = flag.Bool("health-checking", runserver.DefaultHealthChecking, "Enables health checking") - certPath = flag.String("cert-path", runserver.DefaultCertPath, "The path to the certificate for secure serving. The certificate and private key files "+ + grpcPort = flag.Int("grpc-port", runserver.DefaultGrpcPort, "The gRPC port used for communicating with Envoy proxy") + grpcHealthPort = flag.Int("grpc-health-port", runserver.DefaultGrpcHealthPort, "The port used for gRPC liveness and readiness probes") + metricsPort = flag.Int("metrics-port", runserver.DefaultMetricsPort, "The metrics port") + poolName = flag.String("pool-name", runserver.DefaultPoolName, "Name of the InferencePool this Endpoint Picker is associated with.") + poolGroup = flag.String("pool-group", runserver.DefaultPoolGroup, "group of the InferencePool this Endpoint Picker is associated with.") + poolNamespace = flag.String("pool-namespace", "", "Namespace of the InferencePool this Endpoint Picker is associated with.") + enableScaleToZero = flag.Bool("enable-scale-to-zero", false, "Enable scaling down InferencePool to zero replicas after a period of idleness") + logVerbosity = flag.Int("v", logging.DEFAULT, "number for the log level verbosity") + secureServing = flag.Bool("secure-serving", runserver.DefaultSecureServing, "Enables secure serving. Defaults to true.") + healthChecking = flag.Bool("health-checking", runserver.DefaultHealthChecking, "Enables health checking") + certPath = flag.String("cert-path", runserver.DefaultCertPath, "The path to the certificate for secure serving. The certificate and private key files "+ "are assumed to be named tls.crt and tls.key, respectively. If not set, and secureServing is enabled, "+ "then a self-signed certificate is used.") haEnableLeaderElection = flag.Bool("ha-enable-leader-election", false, "Enables leader election for high availability. 
When enabled, readiness probes will only pass on the leader.") @@ -107,14 +108,16 @@ func Run(ctx context.Context) error { } // --- Setup Deactivator --- - deactivator, err := requestcontrol.DeactivatorWithConfig(cfg, &datastore) - if err != nil { - setupLog.Error(err, "Failed to setup Deactivator") - return err - } + if *enableScaleToZero { + deactivator, err := requestcontrol.DeactivatorWithConfig(cfg, &datastore) + if err != nil { + setupLog.Error(err, "Failed to setup Deactivator") + return err + } - // Start Deactivator - go deactivator.MonitorInferencePoolIdleness(ctx) + // Start Deactivator + go deactivator.MonitorInferencePoolIdleness(ctx) + } // --- Setup Metrics Server --- metricsServerOptions := metricsserver.Options{ From 16c4bddd4766a919474ca45b669584ac0473b98e Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Mon, 3 Nov 2025 10:05:50 -0500 Subject: [PATCH 04/13] Update pkg/activator/server/controller_manager.go Co-authored-by: Pierangelo Di Pilato Signed-off-by: Lionel Villard --- pkg/activator/server/controller_manager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/activator/server/controller_manager.go b/pkg/activator/server/controller_manager.go index b0610291..713ca8f7 100644 --- a/pkg/activator/server/controller_manager.go +++ b/pkg/activator/server/controller_manager.go @@ -95,8 +95,8 @@ func NewDefaultManager(gknn common.GKNN, restConfig *rest.Config, metricsServerO if leaderElectionEnabled { opt.LeaderElection = true opt.LeaderElectionResourceLock = "leases" - // The lease name needs to be unique per EPP deployment. - opt.LeaderElectionID = fmt.Sprintf("epp-%s-%s.gateway-api-inference-extension.sigs.k8s.io", gknn.Namespace, gknn.Name) + // The lease name needs to be unique per activator deployment. + opt.LeaderElectionID = fmt.Sprintf("activator-%s-%s.llm-d.ai", gknn.Namespace, gknn.Name) opt.LeaderElectionNamespace = gknn.Namespace opt.LeaderElectionReleaseOnCancel = true } From 0b278ad969f00effecb5d9a3b22182a286e22ba9 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Mon, 3 Nov 2025 11:21:20 -0500 Subject: [PATCH 05/13] address some review comments Signed-off-by: Lionel Villard --- Makefile | 42 +++++++---------------- charts/activator/README.md | 2 +- charts/activator/values.yaml | 2 +- pkg/activator/requestcontrol/activator.go | 9 +++-- 4 files changed, 20 insertions(+), 35 deletions(-) diff --git a/Makefile b/Makefile index 0eae2396..e7a5f475 100644 --- a/Makefile +++ b/Makefile @@ -22,8 +22,11 @@ NAMESPACE ?= hc4ai-operator VLLM_SIMULATOR_TAG ?= v0.5.0 export VLLM_SIMULATOR_TAG -ACTIVATOR_IMAGE_TAG_BASE ?= $(IMAGE_REGISTRY)/$(PROJECT_NAME)-activator -ACTIVATOR_IMG = $(ACTIVATOR_IMAGE_TAG_BASE):$(EPP_TAG) +ACTIVATOR_IMAGE_NAME ?= llm-d-activator +ACTIVATOR_NAME ?= activator +ACTIVATOR_TAG ?= dev +ACTIVATOR_IMAGE_TAG_BASE ?= $(IMAGE_REGISTRY)/$(ACTIVATOR_IMAGE_NAME) +ACTIVATOR_IMG = $(ACTIVATOR_IMAGE_TAG_BASE):$(ACTIVATOR_TAG) # Map go arch to typos arch ifeq ($(TARGETARCH),amd64) @@ -62,10 +65,13 @@ SRC = $(shell find . -type f -name '*.go') # Internal variables for generic targets epp_IMAGE = $(IMG) sidecar_IMAGE = $(SIDECAR_IMG) +activator_IMAGE = $(ACTIVATOR_IMG) epp_NAME = epp sidecar_NAME = $(SIDECAR_NAME) +activator_NAME = $(ACTIVATOR_NAME) epp_LDFLAGS = -ldflags="$(LDFLAGS)" sidecar_LDFLAGS = +activator_LDFLAGS = -ldflags="$(LDFLAGS)" epp_TEST_FILES = go list ./... | grep -v /test/ | grep -v ./pkg/sidecar/ sidecar_TEST_FILES = go list ./pkg/sidecar/... 
@@ -138,25 +144,17 @@ lint: check-golangci-lint check-typos ## Run lint ##@ Build .PHONY: build -build: build-epp build-sidecar ## Build the project +build: build-epp build-sidecar build-activator ## Build the project .PHONY: build-% build-%: check-go install-dependencies download-tokenizer ## Build the project @printf "\033[33;1m==== Building ====\033[0m\n" go build $($*_LDFLAGS) -o bin/$($*_NAME) cmd/$($*_NAME)/main.go -##@ Build Activator - -.PHONY: activator-build -activator-build: check-go install-dependencies download-tokenizer ## Build the project - @printf "\033[33;1m==== Building ====\033[0m\n" - go build -ldflags="$(LDFLAGS)" -o bin/activator cmd/activator/main.go - - ##@ Container Build/Push .PHONY: image-build -image-build: image-build-epp image-build-sidecar ## Build Docker image +image-build: image-build-epp image-build-sidecar image-build-activator ## Build Docker image .PHONY: image-build-% image-build-%: check-container-tool ## Build Docker image ## Build Docker image using $(CONTAINER_RUNTIME) @@ -170,7 +168,7 @@ image-build-%: check-container-tool ## Build Docker image ## Build Docker image -t $($*_IMAGE) -f Dockerfile.$* . .PHONY: image-push -image-push: image-push-epp image-push-sidecar ## Push container images to registry +image-push: image-push-epp image-push-sidecar image-push-activator ## Push container images to registry .PHONY: image-push-% image-push-%: check-container-tool ## Push container image to registry @@ -182,22 +180,6 @@ image-pull: check-container-tool ## Pull all related images using $(CONTAINER_RU @printf "\033[33;1m==== Pulling Container images ====\033[0m\n" ./scripts/pull_images.sh -.PHONY: activator-image-build -activator-image-build: ## Build the activator image using Docker Buildx. - $(CONTAINER_TOOL) build \ - --platform linux/$(TARGETARCH) \ - --build-arg TARGETOS=linux \ - --build-arg TARGETARCH=${TARGETARCH} \ - --build-arg COMMIT_SHA=${GIT_COMMIT_SHA} \ - --build-arg BUILD_REF=${BUILD_REF} \ - -t $(ACTIVATOR_IMG) \ - -f Dockerfile.activator . - -.PHONY: activator-image-push -activator-image-push: check-container-tool load-version-json ## Push Activator Docker image $(ACTIVATOR_IMG) to registry - @printf "\033[33;1m==== Pushing Activator Docker image $(ACTIVATOR_IMG) ====\033[0m\n" - $(CONTAINER_TOOL) push $(ACTIVATOR_IMG) - ##@ Install/Uninstall Targets # Default install/uninstall (Docker) @@ -363,7 +345,7 @@ check-container-tool: else \ echo "✅ Container tool '$(CONTAINER_RUNTIME)' found."; \ fi - + .PHONY: check-kubectl check-kubectl: diff --git a/charts/activator/README.md b/charts/activator/README.md index 4f244735..cb9febfd 100644 --- a/charts/activator/README.md +++ b/charts/activator/README.md @@ -36,7 +36,7 @@ The following table list the configurable parameters of the chart. | `activator.image.pullPolicy` | Image pull policy for the container. Possible values: `Always`, `IfNotPresent`, or `Never`. Defaults to `Always`. | | `inferenceGateway.port` | The port of the Gateway. Defaults to `80`. | | `inferencePool.name` | The name of the InferencePool to target. | -| `inferencePool.apiVersion` | The API version of the InferencePool. Defaults to `inference.networking.x-k8s.io`. | +| `inferencePool.apiVersion` | The API version of the InferencePool. Defaults to `inference.networking.k8s.io`. | | `route.name` | The name of the HTTPRoute to attach the activator to. 
| ## Notes diff --git a/charts/activator/values.yaml b/charts/activator/values.yaml index eee5d70d..954cbc48 100644 --- a/charts/activator/values.yaml +++ b/charts/activator/values.yaml @@ -13,7 +13,7 @@ route: inferencePool: name: inference-pool - group: inference.networking.x-k8s.io + group: inference.networking.k8s.io inferenceGateway: port: 80 diff --git a/pkg/activator/requestcontrol/activator.go b/pkg/activator/requestcontrol/activator.go index 0cb23f4a..5e21614d 100644 --- a/pkg/activator/requestcontrol/activator.go +++ b/pkg/activator/requestcontrol/activator.go @@ -10,6 +10,7 @@ import ( "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" @@ -179,14 +180,16 @@ func (a *Activator) inferencePoolPodsReady(logger logr.Logger, namespace, objnam } // NOTE: this assumes that the target object has a status.readyReplicas field - if readyReplicas, ok := unstructuredObj.Object["status"].(map[string]any)["readyReplicas"].(int64); ok { - if numReplicas == int32(readyReplicas) { + if readyReplicas, found, err := unstructured.NestedFieldNoCopy(unstructuredObj.Object, "status", "readyReplicas"); found { + if numReplicas == int32(readyReplicas.(int64)) { logger.V(logutil.DEBUG).Info("Candidate pods are READY") return true, nil } logger.V(logutil.DEBUG).Info("Candidate pods are NOT READY") + } else if err != nil { + logger.Error(err, "Error getting readyReplicas - candidate pods for serving the request are NOT READY") } else { - logger.V(logutil.DEBUG).Info("Object status.readyReplicas field is not set yet - candidate pods for serving the request are NOT READY ") + logger.V(logutil.DEBUG).Info("Object status.readyReplicas field is not set yet - candidate pods for serving the request are NOT READY") return false, nil } From 97128653498d04b420ec62847bd6dcb9640fe766 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Tue, 4 Nov 2025 13:58:11 -0500 Subject: [PATCH 06/13] address review comments Signed-off-by: Lionel Villard --- Dockerfile.activator | 52 +++++++++++++-------- Dockerfile.epp | 1 - cmd/activator/runner/runner.go | 4 +- pkg/activator/datastore/datastore.go | 23 ++++----- pkg/activator/requestcontrol/deactivator.go | 14 +++--- 5 files changed, 53 insertions(+), 41 deletions(-) diff --git a/Dockerfile.activator b/Dockerfile.activator index 6c0e2ebd..4bd6496a 100644 --- a/Dockerfile.activator +++ b/Dockerfile.activator @@ -1,28 +1,42 @@ -## Multistage build +# Build Stage: using Go 1.25 image FROM quay.io/projectquay/golang:1.25 AS builder -ARG COMMIT_SHA=unknown -ARG BUILD_REF -ARG TARGETOS=linux -ARG TARGETARCH=amd64 -ENV CGO_ENABLED=0 -ENV GOOS=$TARGETOS -ENV GOARCH=$TARGETARCH +ARG TARGETOS +ARG TARGETARCH -# Dependencies -WORKDIR /src -COPY go.mod go.sum ./ -RUN go mod download +WORKDIR /workspace -# Sources +# Copy the Go Modules manifests +COPY go.mod go.mod +COPY go.sum go.sum + +# Copy the go source COPY cmd/activator ./cmd/activator COPY pkg/activator ./pkg/activator -WORKDIR /src/cmd/activator -RUN go build -ldflags="-X sigs.k8s.io/gateway-api-inference-extension/version.CommitSHA=${COMMIT_SHA} -X sigs.k8s.io/gateway-api-inference-extension/version.BuildRef=${BUILD_REF}" -o /activator -## Multistage deploy -FROM registry.access.redhat.com/ubi9/ubi-minimal:latest +# Build +# the GOARCH has not a default value to allow the binary be built according to the host where the 
command +# was called. For example, if we call make image-build in a local env which has the Apple Silicon M1 SO +# the docker BUILDPLATFORM arg will be linux/arm64 when for Apple x86 it will be linux/amd64. Therefore, +# by leaving it empty we can ensure that the container and binary shipped on it will have the same platform. +ENV CGO_ENABLED=0 +ENV GOOS=${TARGETOS:-linux} +ENV GOARCH=${TARGETARCH} +ARG COMMIT_SHA=unknown +ARG BUILD_REF +RUN go build -a -o bin/activator -ldflags="-X sigs.k8s.io/gateway-api-inference-extension/version.CommitSHA=${COMMIT_SHA} -X sigs.k8s.io/gateway-api-inference-extension/version.BuildRef=${BUILD_REF}" cmd/activator/main.go + +# Use ubi9 as a minimal base image to package the manager binary +# Refer to https://catalog.redhat.com/software/containers/ubi9/ubi-minimal/615bd9b4075b022acc111bf5 for more details +FROM registry.access.redhat.com/ubi9/ubi-minimal:latest WORKDIR / -COPY --from=builder /activator /activator +COPY --from=builder /workspace/bin/activator /app/activator + +# expose gRPC, health and metrics ports +EXPOSE 9002 +EXPOSE 9003 +EXPOSE 9090 + +USER 65532:65532 -ENTRYPOINT ["/activator"] +ENTRYPOINT ["/app/activator"] diff --git a/Dockerfile.epp b/Dockerfile.epp index 38dfeca4..ef40fabf 100644 --- a/Dockerfile.epp +++ b/Dockerfile.epp @@ -105,4 +105,3 @@ EXPOSE 9090 EXPOSE 5557 ENTRYPOINT ["/app/epp"] - diff --git a/cmd/activator/runner/runner.go b/cmd/activator/runner/runner.go index 3a9a7453..3802d95d 100644 --- a/cmd/activator/runner/runner.go +++ b/cmd/activator/runner/runner.go @@ -75,7 +75,7 @@ func Run(ctx context.Context) error { flag.Parse() initLogging(&opts) - setupLog.Info("GIE build", "commit-sha", version.CommitSHA, "build-ref", version.BuildRef) + setupLog.Info("Activator build", "commit-sha", version.CommitSHA, "build-ref", version.BuildRef) // Validate flags if err := validateFlags(); err != nil { @@ -109,7 +109,7 @@ func Run(ctx context.Context) error { // --- Setup Deactivator --- if *enableScaleToZero { - deactivator, err := requestcontrol.DeactivatorWithConfig(cfg, &datastore) + deactivator, err := requestcontrol.DeactivatorWithConfig(cfg, datastore) if err != nil { setupLog.Error(err, "Failed to setup Deactivator") return err diff --git a/pkg/activator/datastore/datastore.go b/pkg/activator/datastore/datastore.go index d1a8242d..cd0344aa 100644 --- a/pkg/activator/datastore/datastore.go +++ b/pkg/activator/datastore/datastore.go @@ -41,7 +41,7 @@ type Datastore interface { PoolGet() (*v1.InferencePool, error) PoolHasSynced() bool - GetTicker() *time.Ticker + GetTickerCh() <-chan time.Time ResetTicker(t time.Duration) StopTicker() @@ -52,16 +52,13 @@ type Datastore interface { // NewDatastore creates a new Datastore instance with the provided parent context. func NewDatastore(parentCtx context.Context) Datastore { store := &datastore{ - parentCtx: parentCtx, - poolMu: sync.RWMutex{}, - ticker: time.NewTicker(60 * time.Second), + poolMu: sync.RWMutex{}, + ticker: time.NewTicker(60 * time.Second), } return store } type datastore struct { - // parentCtx controls the lifecycle of the background metrics goroutines that spawn up by the datastore. - parentCtx context.Context // poolMu is used to synchronize access to pool map. 
poolMu sync.RWMutex pool *v1.InferencePool @@ -79,7 +76,7 @@ func (ds *datastore) PoolSet(pool *v1.InferencePool) { func (ds *datastore) PoolGet() (*v1.InferencePool, error) { ds.poolMu.RLock() defer ds.poolMu.RUnlock() - if !ds.PoolHasSynced() { + if ds.pool == nil { return nil, errPoolNotSynced } return ds.pool, nil @@ -96,19 +93,19 @@ func (ds *datastore) Clear() { } func (ds *datastore) ResetTicker(t time.Duration) { - ds.poolMu.RLock() - defer ds.poolMu.RUnlock() + ds.poolMu.Lock() + defer ds.poolMu.Unlock() ds.ticker.Reset(t) } -func (ds *datastore) GetTicker() *time.Ticker { +func (ds *datastore) GetTickerCh() <-chan time.Time { ds.poolMu.RLock() defer ds.poolMu.RUnlock() - return ds.ticker + return ds.ticker.C } func (ds *datastore) StopTicker() { - ds.poolMu.RLock() - defer ds.poolMu.RUnlock() + ds.poolMu.Lock() + defer ds.poolMu.Unlock() ds.ticker.Stop() } diff --git a/pkg/activator/requestcontrol/deactivator.go b/pkg/activator/requestcontrol/deactivator.go index db18d2d2..f270e0ef 100644 --- a/pkg/activator/requestcontrol/deactivator.go +++ b/pkg/activator/requestcontrol/deactivator.go @@ -26,11 +26,11 @@ type Deactivator struct { DynamicClient *dynamic.DynamicClient ScaleClient scale.ScalesGetter Mapper meta.RESTMapper - datastore *datastore.Datastore + datastore datastore.Datastore } // DeactivatorWithConfig creates a new Deactivator with the provided REST config and Datastore. -func DeactivatorWithConfig(config *rest.Config, datastore *datastore.Datastore) (*Deactivator, error) { +func DeactivatorWithConfig(config *rest.Config, datastore datastore.Datastore) (*Deactivator, error) { scaleClient, mapper, err := initScaleClient(config) if err != nil { return nil, err @@ -51,20 +51,22 @@ func DeactivatorWithConfig(config *rest.Config, datastore *datastore.Datastore) // MonitorInferencePoolIdleness monitors the InferencePool for idleness and scales it down to zero replicas func (da *Deactivator) MonitorInferencePoolIdleness(ctx context.Context) { logger := log.FromContext(ctx) - ds := *(da.datastore) + ds := da.datastore + // The deactivator uses a ticker to periodically try and scale down the InferencePool after idleness. + // Upon each received requests, the ticker is reset to delay the next scale-down check. 
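+	// The initial reset below also guarantees the pool is not scaled down until at least one
+	// full delay has elapsed after the deactivator starts.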
ds.ResetTicker(DefaultScaleDownDelay) defer ds.StopTicker() - ticker := ds.GetTicker() + tickerCh := ds.GetTickerCh() for { select { case <-ctx.Done(): logger.Info("Context cancelled, stopping deactivator") return - case <-ticker.C: - logger.V(logutil.DEBUG).Info("Deactivator Time check for inferencePool idleness: " + time.Now().Format("15:04:05")) + case <-tickerCh: + logger.V(logutil.DEBUG).Info("Deactivator time check for inferencePool idleness: " + time.Now().Format("15:04:05")) // Get InferencePool Info pool, err := ds.PoolGet() From 58735ff25934f9c23c4a1820b5e71ea494212734 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Tue, 4 Nov 2025 14:03:17 -0500 Subject: [PATCH 07/13] fix linter error Signed-off-by: Lionel Villard --- cmd/activator/runner/runner.go | 2 +- pkg/activator/datastore/datastore.go | 3 +-- pkg/activator/datastore/datastore_test.go | 3 +-- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/cmd/activator/runner/runner.go b/cmd/activator/runner/runner.go index 3802d95d..a336830c 100644 --- a/cmd/activator/runner/runner.go +++ b/cmd/activator/runner/runner.go @@ -98,7 +98,7 @@ func Run(ctx context.Context) error { } // --- Setup Datastore --- - datastore := datastore.NewDatastore(ctx) + datastore := datastore.NewDatastore() // --- Setup Activator --- activator, err := requestcontrol.NewActivatorWithConfig(cfg, datastore) diff --git a/pkg/activator/datastore/datastore.go b/pkg/activator/datastore/datastore.go index cd0344aa..4cc4a6ad 100644 --- a/pkg/activator/datastore/datastore.go +++ b/pkg/activator/datastore/datastore.go @@ -21,7 +21,6 @@ limitations under the License. package datastore import ( - "context" "errors" "sync" "time" @@ -50,7 +49,7 @@ type Datastore interface { } // NewDatastore creates a new Datastore instance with the provided parent context. -func NewDatastore(parentCtx context.Context) Datastore { +func NewDatastore() Datastore { store := &datastore{ poolMu: sync.RWMutex{}, ticker: time.NewTicker(60 * time.Second), diff --git a/pkg/activator/datastore/datastore_test.go b/pkg/activator/datastore/datastore_test.go index 76538d92..66e3cc9b 100644 --- a/pkg/activator/datastore/datastore_test.go +++ b/pkg/activator/datastore/datastore_test.go @@ -17,7 +17,6 @@ limitations under the License. 
package datastore import ( - "context" "testing" "github.com/google/go-cmp/cmp" @@ -65,7 +64,7 @@ func TestPool(t *testing.T) { scheme := runtime.NewScheme() _ = clientgoscheme.AddToScheme(scheme) - datastore := NewDatastore(context.Background()) + datastore := NewDatastore() datastore.PoolSet(tt.inferencePool) gotPool, gotErr := datastore.PoolGet() if diff := cmp.Diff(tt.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" { From a03a27253629300c24a790092c3d155b53ea7d63 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Wed, 5 Nov 2025 14:44:42 -0500 Subject: [PATCH 08/13] activator-filter should be applied before BBR Signed-off-by: Lionel Villard --- charts/activator-filter/README.md | 4 +++- charts/activator-filter/templates/istio.yaml | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/charts/activator-filter/README.md b/charts/activator-filter/README.md index 7e57903e..345207d2 100644 --- a/charts/activator-filter/README.md +++ b/charts/activator-filter/README.md @@ -10,6 +10,8 @@ To install an activator-filter named `activator-filter`, you can run the followi $ helm install activator-filter ./charts/activator-filter ``` +> **Note:** This chart should be deployed before the [Body Based Routing](https://github.com/kubernetes-sigs/gateway-api-inference-extension/tree/main/config/charts/body-based-routing) chart for optimal functionality. + ## Uninstall Run the following command to uninstall the chart: @@ -28,4 +30,4 @@ The following table list the configurable parameters of the chart. ## Notes -This chart should only be deployed once. +This chart should only be deployed once diff --git a/charts/activator-filter/templates/istio.yaml b/charts/activator-filter/templates/istio.yaml index 32d4f075..2d9c64a6 100644 --- a/charts/activator-filter/templates/istio.yaml +++ b/charts/activator-filter/templates/istio.yaml @@ -13,7 +13,7 @@ spec: filter: name: "envoy.filters.network.http_connection_manager" patch: - operation: INSERT_FIRST # TODO: insert before EPP + operation: INSERT_FIRST value: name: envoy.filters.http.activator.ext_proc typed_config: From f4b2b62d4aee8cf0cbcf7e2a7865b5f933e1ad57 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Thu, 6 Nov 2025 13:18:49 -0500 Subject: [PATCH 09/13] remove unused files and reorder some imports Signed-off-by: Lionel Villard --- pkg/activator/requestcontrol/activator.go | 12 ++++----- pkg/activator/requestcontrol/types.go | 31 ----------------------- 2 files changed, 6 insertions(+), 37 deletions(-) delete mode 100644 pkg/activator/requestcontrol/types.go diff --git a/pkg/activator/requestcontrol/activator.go b/pkg/activator/requestcontrol/activator.go index 5e21614d..0e75a516 100644 --- a/pkg/activator/requestcontrol/activator.go +++ b/pkg/activator/requestcontrol/activator.go @@ -8,26 +8,26 @@ import ( "time" "github.com/go-logr/logr" + + autoscaling "k8s.io/api/autoscaling/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/discovery" + cached "k8s.io/client-go/discovery/cached" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" "k8s.io/client-go/restmapper" + "k8s.io/client-go/scale" "sigs.k8s.io/controller-runtime/pkg/log" - - "github.com/llm-d/llm-d-inference-scheduler/pkg/activator/datastore" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error" logutil 
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" - autoscaling "k8s.io/api/autoscaling/v1" - "k8s.io/client-go/discovery" - cached "k8s.io/client-go/discovery/cached" - "k8s.io/client-go/scale" + "github.com/llm-d/llm-d-inference-scheduler/pkg/activator/datastore" ) const ( diff --git a/pkg/activator/requestcontrol/types.go b/pkg/activator/requestcontrol/types.go deleted file mode 100644 index efe48d04..00000000 --- a/pkg/activator/requestcontrol/types.go +++ /dev/null @@ -1,31 +0,0 @@ -/* -Copyright 2025 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package requestcontrol - -// Response contains information from the response received to be passed to PostResponse plugins -type Response struct { - // RequestID is the Envoy generated Id for the request being processed - RequestID string - // Headers is a map of the response headers. Nil during body processing - Headers map[string]string - // Body Is the body of the response or nil during header processing - Body string - // IsStreaming indicates whether or not the response is being streamed by the model - IsStreaming bool - // EndOfStream when true indicates that this invocation contains the last chunk of the response - EndOfStream bool -} From 6c5fb88d220e887af16ad40f8240d668b6d23b7a Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Mon, 10 Nov 2025 09:40:39 -0500 Subject: [PATCH 10/13] Apply suggestion from @shmuelk Co-authored-by: Shmuel Kallner Signed-off-by: Lionel Villard --- pkg/activator/handlers/server.go | 62 +++++++++----------------------- 1 file changed, 16 insertions(+), 46 deletions(-) diff --git a/pkg/activator/handlers/server.go b/pkg/activator/handlers/server.go index 5673d5ed..3cc80cd1 100644 --- a/pkg/activator/handlers/server.go +++ b/pkg/activator/handlers/server.go @@ -132,68 +132,38 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) } func buildErrResponse(err error) (*extProcPb.ProcessingResponse, error) { - var resp *extProcPb.ProcessingResponse + var code envoyTypePb.StatusCode switch errutil.CanonicalCode(err) { // This code can be returned by scheduler when there is no capacity for sheddable // requests. case errutil.InferencePoolResourceExhausted: - resp = &extProcPb.ProcessingResponse{ - Response: &extProcPb.ProcessingResponse_ImmediateResponse{ - ImmediateResponse: &extProcPb.ImmediateResponse{ - Status: &envoyTypePb.HttpStatus{ - Code: envoyTypePb.StatusCode_TooManyRequests, - }, - }, - }, - } + code = envoyTypePb.StatusCode_TooManyRequests // This code can be returned by when EPP processes the request and run into server-side errors. 
case errutil.Internal: - resp = &extProcPb.ProcessingResponse{ - Response: &extProcPb.ProcessingResponse_ImmediateResponse{ - ImmediateResponse: &extProcPb.ImmediateResponse{ - Status: &envoyTypePb.HttpStatus{ - Code: envoyTypePb.StatusCode_InternalServerError, - }, - }, - }, - } + code = envoyTypePb.StatusCode_InternalServerError // This code can be returned by the director when there are no candidate pods for the request scheduling. case errutil.ServiceUnavailable: - resp = &extProcPb.ProcessingResponse{ - Response: &extProcPb.ProcessingResponse_ImmediateResponse{ - ImmediateResponse: &extProcPb.ImmediateResponse{ - Status: &envoyTypePb.HttpStatus{ - Code: envoyTypePb.StatusCode_ServiceUnavailable, - }, - }, - }, - } + code = envoyTypePb.StatusCode_ServiceUnavailable // This code can be returned when users provide invalid json request. case errutil.BadRequest: - resp = &extProcPb.ProcessingResponse{ - Response: &extProcPb.ProcessingResponse_ImmediateResponse{ - ImmediateResponse: &extProcPb.ImmediateResponse{ - Status: &envoyTypePb.HttpStatus{ - Code: envoyTypePb.StatusCode_BadRequest, - }, - }, - }, - } + code = envoyTypePb.StatusCode_BadRequest case errutil.BadConfiguration: - resp = &extProcPb.ProcessingResponse{ - Response: &extProcPb.ProcessingResponse_ImmediateResponse{ - ImmediateResponse: &extProcPb.ImmediateResponse{ - Status: &envoyTypePb.HttpStatus{ - Code: envoyTypePb.StatusCode_NotFound, - }, - }, - }, - } + code = envoyTypePb.StatusCode_NotFound default: return nil, status.Errorf(status.Code(err), "failed to handle request: %v", err) } + resp := &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_ImmediateResponse{ + ImmediateResponse: &extProcPb.ImmediateResponse{ + Status: &envoyTypePb.HttpStatus{ + Code: code, + }, + }, + }, + } + if err.Error() != "" { resp.Response.(*extProcPb.ProcessingResponse_ImmediateResponse).ImmediateResponse.Body = []byte(err.Error()) } From 9418a41a41573af4a60ff965b3eedaab792a13d2 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Mon, 10 Nov 2025 09:44:45 -0500 Subject: [PATCH 11/13] remove InferenceObjective from cache Signed-off-by: Lionel Villard --- pkg/activator/server/controller_manager.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pkg/activator/server/controller_manager.go b/pkg/activator/server/controller_manager.go index 713ca8f7..e40eb22d 100644 --- a/pkg/activator/server/controller_manager.go +++ b/pkg/activator/server/controller_manager.go @@ -56,11 +56,6 @@ func defaultManagerOptions(gknn common.GKNN, metricsServerOptions metricsserver.
gknn.Namespace: {}, }, }, - &v1alpha2.InferenceObjective{}: { - Namespaces: map[string]cache.Config{ - gknn.Namespace: {}, - }, - }, }, }, Metrics: metricsServerOptions, From 54143fd4d3b043df34e18ea4f566e82b5715d18b Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Mon, 10 Nov 2025 09:47:34 -0500 Subject: [PATCH 12/13] revert to use go 1.24.1 Signed-off-by: Lionel Villard --- Dockerfile.activator | 4 ++-- Dockerfile.epp | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/Dockerfile.activator b/Dockerfile.activator index 4bd6496a..e7a6cc59 100644 --- a/Dockerfile.activator +++ b/Dockerfile.activator @@ -1,5 +1,5 @@ -# Build Stage: using Go 1.25 image -FROM quay.io/projectquay/golang:1.25 AS builder +# Build Stage: using Go 1.24.1 mage +FROM quay.io/projectquay/golang:1.24.1 AS builder ARG TARGETOS ARG TARGETARCH diff --git a/Dockerfile.epp b/Dockerfile.epp index ef40fabf..42718e28 100644 --- a/Dockerfile.epp +++ b/Dockerfile.epp @@ -1,7 +1,6 @@ ## Minimal runtime Dockerfile (microdnf-only, no torch, wrapper in site-packages) -# Build Stage: using Go 1.25 image -FROM quay.io/projectquay/golang:1.25 AS builder - +# Build Stage: using Go 1.24.1 image +FROM quay.io/projectquay/golang:1.24.1 AS builder ARG TARGETOS ARG TARGETARCH ARG KVCACHE_MANAGER_VERSION=v0.3.2 From 47bbf69b115998959d8adb9e63af959c67a4137f Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Mon, 10 Nov 2025 09:57:13 -0500 Subject: [PATCH 13/13] use golang 1.24 as 1.24.1 does not exist Signed-off-by: Lionel Villard --- Dockerfile.activator | 4 ++-- Dockerfile.epp | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Dockerfile.activator b/Dockerfile.activator index e7a6cc59..85f98de5 100644 --- a/Dockerfile.activator +++ b/Dockerfile.activator @@ -1,5 +1,5 @@ -# Build Stage: using Go 1.24.1 mage -FROM quay.io/projectquay/golang:1.24.1 AS builder +# Build Stage: using Go 1.24 image +FROM quay.io/projectquay/golang:1.24 AS builder ARG TARGETOS ARG TARGETARCH diff --git a/Dockerfile.epp b/Dockerfile.epp index 42718e28..e3ab3252 100644 --- a/Dockerfile.epp +++ b/Dockerfile.epp @@ -1,6 +1,6 @@ ## Minimal runtime Dockerfile (microdnf-only, no torch, wrapper in site-packages) -# Build Stage: using Go 1.24.1 image -FROM quay.io/projectquay/golang:1.24.1 AS builder +# Build Stage: using Go 1.24 image +FROM quay.io/projectquay/golang:1.24 AS builder ARG TARGETOS ARG TARGETARCH ARG KVCACHE_MANAGER_VERSION=v0.3.2
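
The buildErrResponse change in PATCH 10/13 collapses five near-identical ImmediateResponse literals into a single switch that maps the request's canonical error code to an Envoy status code and builds one response. Below is a minimal caller sketch, assuming the surrounding ext_proc Process loop hands errors to a helper of this shape; the helper name, the logger argument, and its exact placement in the loop are illustrative assumptions and not part of the patch.

```go
package handlers

import (
	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
	"github.com/go-logr/logr"
)

// sendImmediateError (hypothetical helper) turns a request-handling error
// into an ext_proc ImmediateResponse via buildErrResponse and writes it to
// the stream. Canonical codes that buildErrResponse does not map are
// propagated to the caller as gRPC status errors.
func sendImmediateError(srv extProcPb.ExternalProcessor_ProcessServer, logger logr.Logger, reqErr error) error {
	resp, err := buildErrResponse(reqErr)
	if err != nil {
		return err
	}
	// e.g. InferencePoolResourceExhausted becomes an immediate 429 reply.
	if sendErr := srv.Send(resp); sendErr != nil {
		logger.Error(sendErr, "failed to send immediate error response")
		return sendErr
	}
	return nil
}
```

Keeping the status-code mapping in one switch means adding a new canonical error code only requires one extra case line rather than another response literal.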
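PATCH 11/13 removes the InferenceObjective entry from the manager cache, so the activator only caches what it actually watches. A minimal sketch of the remaining ByObject configuration, assuming the surviving entry is the v1 InferencePool type scoped to the pool's namespace (the hunk context suggests this but does not show the entry in full):

```go
package server

import (
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/client"
	v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
)

// poolOnlyCacheOptions restricts the manager cache to InferencePool objects
// in the given namespace; all other types fall back to the cache defaults.
func poolOnlyCacheOptions(namespace string) cache.Options {
	return cache.Options{
		ByObject: map[client.Object]cache.ByObject{
			&v1.InferencePool{}: {
				Namespaces: map[string]cache.Config{
					namespace: {},
				},
			},
		},
	}
}
```

Scoping the cache to a single namespace and object type keeps the activator's watch traffic and memory footprint small.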