diff --git a/Dockerfile.activator b/Dockerfile.activator
new file mode 100644
index 00000000..85f98de5
--- /dev/null
+++ b/Dockerfile.activator
@@ -0,0 +1,42 @@
+# Build Stage: using Go 1.24 image
+FROM quay.io/projectquay/golang:1.24 AS builder
+ARG TARGETOS
+ARG TARGETARCH
+
+WORKDIR /workspace
+
+# Copy the Go Modules manifests
+COPY go.mod go.mod
+COPY go.sum go.sum
+
+# Copy the go source
+COPY cmd/activator ./cmd/activator
+COPY pkg/activator ./pkg/activator
+
+# Build
+# GOARCH is left without a default value so that the binary is built for the host
+# platform where the build runs. For example, make image-build on an Apple Silicon (M1)
+# machine sets the docker BUILDPLATFORM arg to linux/arm64, while on Apple x86 it is
+# linux/amd64. Leaving it empty ensures the container and its binary share one platform.
+ENV CGO_ENABLED=0
+ENV GOOS=${TARGETOS:-linux}
+ENV GOARCH=${TARGETARCH}
+ARG COMMIT_SHA=unknown
+ARG BUILD_REF
+RUN go build -a -o bin/activator -ldflags="-X sigs.k8s.io/gateway-api-inference-extension/version.CommitSHA=${COMMIT_SHA} -X sigs.k8s.io/gateway-api-inference-extension/version.BuildRef=${BUILD_REF}" cmd/activator/main.go
+
+
+# Use ubi9 as a minimal base image to package the activator binary
+# Refer to https://catalog.redhat.com/software/containers/ubi9/ubi-minimal/615bd9b4075b022acc111bf5 for more details
+FROM registry.access.redhat.com/ubi9/ubi-minimal:latest
+WORKDIR /
+COPY --from=builder /workspace/bin/activator /app/activator
+
+# expose gRPC, health and metrics ports
+EXPOSE 9002
+EXPOSE 9003
+EXPOSE 9090
+
+USER 65532:65532
+
+ENTRYPOINT ["/app/activator"]
diff --git a/Dockerfile.epp b/Dockerfile.epp
index 38dfeca4..e3ab3252 100644
--- a/Dockerfile.epp
+++ b/Dockerfile.epp
@@ -1,7 +1,6 @@
 ## Minimal runtime Dockerfile (microdnf-only, no torch, wrapper in site-packages)
-# Build Stage: using Go 1.25 image
-FROM quay.io/projectquay/golang:1.25 AS builder
-
+# Build Stage: using Go 1.24 image
+FROM quay.io/projectquay/golang:1.24 AS builder
 ARG TARGETOS
 ARG TARGETARCH
 ARG KVCACHE_MANAGER_VERSION=v0.3.2
@@ -105,4 +104,3 @@ EXPOSE 9090
 EXPOSE 5557
 
 ENTRYPOINT ["/app/epp"]
-
diff --git a/Makefile b/Makefile
index 2ec6df3c..e7a5f475 100644
--- a/Makefile
+++ b/Makefile
@@ -22,6 +22,12 @@ NAMESPACE ?= hc4ai-operator
 VLLM_SIMULATOR_TAG ?= v0.5.0
 export VLLM_SIMULATOR_TAG
 
+ACTIVATOR_IMAGE_NAME ?= llm-d-activator
+ACTIVATOR_NAME ?= activator
+ACTIVATOR_TAG ?= dev
+ACTIVATOR_IMAGE_TAG_BASE ?= $(IMAGE_REGISTRY)/$(ACTIVATOR_IMAGE_NAME)
+ACTIVATOR_IMG = $(ACTIVATOR_IMAGE_TAG_BASE):$(ACTIVATOR_TAG)
+
 # Map go arch to typos arch
 ifeq ($(TARGETARCH),amd64)
 TYPOS_TARGET_ARCH = x86_64
@@ -59,10 +65,13 @@ SRC = $(shell find . -type f -name '*.go')
 # Internal variables for generic targets
 epp_IMAGE = $(IMG)
 sidecar_IMAGE = $(SIDECAR_IMG)
+activator_IMAGE = $(ACTIVATOR_IMG)
 epp_NAME = epp
 sidecar_NAME = $(SIDECAR_NAME)
+activator_NAME = $(ACTIVATOR_NAME)
 epp_LDFLAGS = -ldflags="$(LDFLAGS)"
 sidecar_LDFLAGS =
+activator_LDFLAGS = -ldflags="$(LDFLAGS)"
 epp_TEST_FILES = go list ./... | grep -v /test/ | grep -v ./pkg/sidecar/
 sidecar_TEST_FILES = go list ./pkg/sidecar/...
@@ -135,7 +144,7 @@ lint: check-golangci-lint check-typos ## Run lint ##@ Build .PHONY: build -build: build-epp build-sidecar ## Build the project +build: build-epp build-sidecar build-activator ## Build the project .PHONY: build-% build-%: check-go install-dependencies download-tokenizer ## Build the project @@ -145,7 +154,7 @@ build-%: check-go install-dependencies download-tokenizer ## Build the project ##@ Container Build/Push .PHONY: image-build -image-build: image-build-epp image-build-sidecar ## Build Docker image +image-build: image-build-epp image-build-sidecar image-build-activator ## Build Docker image .PHONY: image-build-% image-build-%: check-container-tool ## Build Docker image ## Build Docker image using $(CONTAINER_RUNTIME) @@ -159,7 +168,7 @@ image-build-%: check-container-tool ## Build Docker image ## Build Docker image -t $($*_IMAGE) -f Dockerfile.$* . .PHONY: image-push -image-push: image-push-epp image-push-sidecar ## Push container images to registry +image-push: image-push-epp image-push-sidecar image-push-activator ## Push container images to registry .PHONY: image-push-% image-push-%: check-container-tool ## Push container image to registry @@ -287,7 +296,7 @@ check-typos: $(TYPOS) ## Check for spelling errors using typos (exits with error echo "$$TYPOS_OUTPUT"; \ exit 1; \ fi - + ##@ Tools .PHONY: check-tools @@ -336,7 +345,7 @@ check-container-tool: else \ echo "✅ Container tool '$(CONTAINER_RUNTIME)' found."; \ fi - + .PHONY: check-kubectl check-kubectl: diff --git a/charts/activator-filter/.helmignore b/charts/activator-filter/.helmignore new file mode 100644 index 00000000..0e8a0eb3 --- /dev/null +++ b/charts/activator-filter/.helmignore @@ -0,0 +1,23 @@ +# Patterns to ignore when building packages. +# This supports shell glob matching, relative path matching, and +# negation (prefixed with !). Only one pattern per line. +.DS_Store +# Common VCS dirs +.git/ +.gitignore +.bzr/ +.bzrignore +.hg/ +.hgignore +.svn/ +# Common backup files +*.swp +*.bak +*.tmp +*.orig +*~ +# Various IDEs +.project +.idea/ +*.tmproj +.vscode/ diff --git a/charts/activator-filter/Chart.yaml b/charts/activator-filter/Chart.yaml new file mode 100644 index 00000000..1e3c0b74 --- /dev/null +++ b/charts/activator-filter/Chart.yaml @@ -0,0 +1,9 @@ +apiVersion: v2 +name: activator +description: A Helm chart for the activator extension + +type: application + +version: 0.1.0 + +appVersion: "0.2.0" diff --git a/charts/activator-filter/README.md b/charts/activator-filter/README.md new file mode 100644 index 00000000..345207d2 --- /dev/null +++ b/charts/activator-filter/README.md @@ -0,0 +1,33 @@ +# Activator Chart + +A chart to deploy the activator HTTP filter for an InferenceGateway and RBAC for all per route activator deployments. + +## Install + +To install an activator-filter named `activator-filter`, you can run the following command: + +```txt +$ helm install activator-filter ./charts/activator-filter +``` + +> **Note:** This chart should be deployed before the [Body Based Routing](https://github.com/kubernetes-sigs/gateway-api-inference-extension/tree/main/config/charts/body-based-routing) chart for optimal functionality. + +## Uninstall + +Run the following command to uninstall the chart: + +```txt +$ helm uninstall activator-filter +``` + +## Configuration + +The following table list the configurable parameters of the chart. 
+ +| **Parameter Name** | **Description** | +|---------------------------------------------|----------------------------------------------------------------------------------------------------| +| `name` | Name of the activator RBAC resources. Defaults to `activator`. | + +## Notes + +This chart should only be deployed once diff --git a/charts/activator-filter/templates/NOTES.txt b/charts/activator-filter/templates/NOTES.txt new file mode 100644 index 00000000..c0d990f6 --- /dev/null +++ b/charts/activator-filter/templates/NOTES.txt @@ -0,0 +1 @@ +Activator HTTP Filter deployed. diff --git a/charts/activator-filter/templates/istio.yaml b/charts/activator-filter/templates/istio.yaml new file mode 100644 index 00000000..2d9c64a6 --- /dev/null +++ b/charts/activator-filter/templates/istio.yaml @@ -0,0 +1,25 @@ +apiVersion: networking.istio.io/v1alpha3 +kind: EnvoyFilter +metadata: + name: {{ .Values.name }}-ext-proc + namespace: {{ .Release.Namespace }} +spec: + configPatches: + - applyTo: HTTP_FILTER + match: + # context omitted so that this applies to both sidecars and gateways + listener: + filterChain: + filter: + name: "envoy.filters.network.http_connection_manager" + patch: + operation: INSERT_FIRST + value: + name: envoy.filters.http.activator.ext_proc + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor + failure_mode_allow: true + grpc_service: + envoy_grpc: + cluster_name: no-op + message_timeout: 120s diff --git a/charts/activator-filter/templates/rbac.yaml b/charts/activator-filter/templates/rbac.yaml new file mode 100644 index 00000000..13c21941 --- /dev/null +++ b/charts/activator-filter/templates/rbac.yaml @@ -0,0 +1,82 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: {{ .Values.name }} + namespace: {{ .Release.Namespace }} +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: {{ .Values.name }} + namespace: {{ .Release.Namespace }} +rules: # TODO: These can probably be trimmed down +- apiGroups: + - "inference.networking.x-k8s.io" + - "inference.networking.k8s.io" + resources: + - "inferencepools" + verbs: + - "get" + - "watch" + - "list" +- apiGroups: + - "" + resources: + - "pods" + verbs: + - "get" + - "watch" + - "list" +- apiGroups: + - "discovery.k8s.io" + resources: + - "endpointslices" + verbs: + - "get" + - "watch" + - "list" +- apiGroups: + - "authentication.k8s.io" + resources: + - "tokenreviews" + verbs: + - "create" +- apiGroups: + - "authorization.k8s.io" + resources: + - "subjectaccessreviews" + verbs: + - "create" +- apiGroups: + - "apps" + resources: + - "deployments" + verbs: + - "create" + - "get" + - "list" + - "watch" + - "update" + - "patch" + - "delete" +- apiGroups: + - apps + resources: + - deployments/scale + verbs: + - get + - update +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: {{ .Values.name }} + namespace: {{ .Release.Namespace }} +subjects: +- kind: ServiceAccount + name: {{ .Values.name }} + namespace: {{ .Release.Namespace }} +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: {{ .Values.name }} diff --git a/charts/activator-filter/values.yaml b/charts/activator-filter/values.yaml new file mode 100644 index 00000000..9781aee0 --- /dev/null +++ b/charts/activator-filter/values.yaml @@ -0,0 +1 @@ +name: activator diff --git a/charts/activator/.helmignore b/charts/activator/.helmignore new file mode 100644 index 00000000..0e8a0eb3 --- /dev/null +++ b/charts/activator/.helmignore @@ -0,0 +1,23 @@ +# 
Patterns to ignore when building packages.
+# This supports shell glob matching, relative path matching, and
+# negation (prefixed with !). Only one pattern per line.
+.DS_Store
+# Common VCS dirs
+.git/
+.gitignore
+.bzr/
+.bzrignore
+.hg/
+.hgignore
+.svn/
+# Common backup files
+*.swp
+*.bak
+*.tmp
+*.orig
+*~
+# Various IDEs
+.project
+.idea/
+*.tmproj
+.vscode/
diff --git a/charts/activator/Chart.yaml b/charts/activator/Chart.yaml
new file mode 100644
index 00000000..27ac6a2a
--- /dev/null
+++ b/charts/activator/Chart.yaml
@@ -0,0 +1,9 @@
+apiVersion: v2
+name: activator-route
+description: A Helm chart for the activator extension
+
+type: application
+
+version: 0.1.0
+
+appVersion: "0.2.0"
diff --git a/charts/activator/README.md b/charts/activator/README.md
new file mode 100644
index 00000000..cb9febfd
--- /dev/null
+++ b/charts/activator/README.md
@@ -0,0 +1,44 @@
+# Activator Chart
+
+A chart to deploy the activator deployment and service per HTTPRoute.
+
+## Install
+
+To install an activator whose deployment and service are named `<route-name>-activator`, you can run the following command:
+
+```txt
+$ helm install activator ./charts/activator \
+    --set route.name=http-route-name \
+    --set inferencePool.name=inference-pool-name
+
+```
+
+## Uninstall
+
+Run the following command to uninstall the chart:
+
+```txt
+$ helm uninstall activator
+```
+
+## Configuration
+
+The following table lists the configurable parameters of the chart.
+
+| **Parameter Name**                          | **Description**                                                                                                    |
+|---------------------------------------------|--------------------------------------------------------------------------------------------------------------------|
+| `activator.suffix`                          | Suffix to append to the name of the activator deployment and service. Defaults to `-activator`.                   |
+| `activator.port`                            | Port serving ext_proc. Defaults to `9004`.                                                                         |
+| `activator.healthCheckPort`                 | Port for health checks. Defaults to `9005`.                                                                        |
+| `activator.image.name`                      | Name of the container image used.                                                                                  |
+| `activator.image.registry`                  | Registry URL and namespace where the image is hosted.                                                              |
+| `activator.image.tag`                       | Image tag.                                                                                                         |
+| `activator.image.pullPolicy`                | Image pull policy for the container. Possible values: `Always`, `IfNotPresent`, or `Never`. Defaults to `Always`.  |
+| `inferenceGateway.port`                     | The port of the Gateway. Defaults to `80`.                                                                         |
+| `inferencePool.name`                        | The name of the InferencePool to target.                                                                           |
+| `inferencePool.group`                       | The API group of the InferencePool. Defaults to `inference.networking.k8s.io`.                                     |
+| `route.name`                                | The name of the HTTPRoute to attach the activator to.                                                              |
+
+## Notes
+
+This chart should only be deployed once per HTTPRoute.
diff --git a/charts/activator/templates/NOTES.txt b/charts/activator/templates/NOTES.txt
new file mode 100644
index 00000000..f0fa7a37
--- /dev/null
+++ b/charts/activator/templates/NOTES.txt
@@ -0,0 +1 @@
+Activator extension deployed for HTTPRoute {{ .Values.route.name }}
diff --git a/charts/activator/templates/activator.yaml b/charts/activator/templates/activator.yaml
new file mode 100644
index 00000000..369a54e9
--- /dev/null
+++ b/charts/activator/templates/activator.yaml
@@ -0,0 +1,54 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: {{ include "activatorName" . }}
+  namespace: {{ .Release.Namespace }}
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: {{ include "activatorName" . }}
+  template:
+    metadata:
+      labels:
+        app: {{ include "activatorName" .
}} + spec: + containers: + - name: activator + image: {{ .Values.activator.image.registry }}/{{ .Values.activator.image.name }}:{{ .Values.activator.image.tag }} + imagePullPolicy: {{ .Values.activator.image.pullPolicy | default "Always" }} + args: + - "--pool-name" + - "{{ .Values.inferencePool.name }}" + - "--pool-namespace" + - "{{ .Release.Namespace }}" + - "--pool-group" + - "{{ .Values.inferencePool.group }}" + - "--grpc-port" + - "{{ .Values.activator.port }}" + - "--grpc-health-port" + - "{{ .Values.activator.healthCheckPort }}" + - "--zap-encoder" + - "json" + - "--v" + - "2" + ports: + - containerPort: {{ .Values.activator.port }} + # health check + - containerPort: {{ .Values.activator.healthCheckPort }} + serviceAccountName: activator +--- +apiVersion: v1 +kind: Service +metadata: + name: {{ include "activatorName" . }} + namespace: {{ .Release.Namespace }} +spec: + selector: + app: {{ include "activatorName" . }} + ports: + - protocol: TCP + port: {{ .Values.activator.port }} + targetPort: {{ .Values.activator.port }} + appProtocol: HTTP2 + type: ClusterIP diff --git a/charts/activator/templates/helpers.tpl b/charts/activator/templates/helpers.tpl new file mode 100644 index 00000000..9ad85bd2 --- /dev/null +++ b/charts/activator/templates/helpers.tpl @@ -0,0 +1,3 @@ +{{- define "activatorName" -}} +{{ .Values.route.name }}{{ .Values.activator.suffix }} +{{- end -}} diff --git a/charts/activator/templates/istio.yaml b/charts/activator/templates/istio.yaml new file mode 100644 index 00000000..4723b259 --- /dev/null +++ b/charts/activator/templates/istio.yaml @@ -0,0 +1,44 @@ +apiVersion: networking.istio.io/v1alpha3 +kind: EnvoyFilter +metadata: + name: {{ include "activatorName" . }} + namespace: {{ .Release.Namespace }} +spec: + configPatches: + - applyTo: HTTP_ROUTE + match: + routeConfiguration: + vhost: + name: "*:{{ .Values.inferenceGateway.port }}" + route: + name: {{ .Release.Namespace }}.{{ .Values.route.name }}.0 # TODO: what .0? + patch: + operation: MERGE + value: + typed_per_filter_config: + envoy.filters.http.activator.ext_proc: + "@type": type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExtProcPerRoute + overrides: + processing_mode: + request_header_mode: "SEND" + response_header_mode: "SKIP" + request_body_mode: "NONE" + response_body_mode: "NONE" + request_trailer_mode: "SKIP" + response_trailer_mode: "SKIP" + grpc_service: + envoy_grpc: + cluster_name: outbound|{{ .Values.activator.port }}||{{ include "activatorName" . }}.{{ .Release.Namespace }}.svc.cluster.local + +--- +apiVersion: networking.istio.io/v1 +kind: DestinationRule +metadata: + name: {{ include "activatorName" . }} + namespace: {{ .Release.Namespace }} +spec: + host: {{ include "activatorName" . 
}}.{{ .Release.Namespace }}.svc.cluster.local + trafficPolicy: + tls: + mode: SIMPLE + insecureSkipVerify: true diff --git a/charts/activator/values.yaml b/charts/activator/values.yaml new file mode 100644 index 00000000..954cbc48 --- /dev/null +++ b/charts/activator/values.yaml @@ -0,0 +1,19 @@ +activator: + suffix: -activator + image: + name: activator + registry: ghcr.io/llm-d/llm-d-activator # hostname and namespace of the image registry + tag: main + pullPolicy: Always + port: 9004 + healthCheckPort: 9005 + +route: + name: http-route + +inferencePool: + name: inference-pool + group: inference.networking.k8s.io + +inferenceGateway: + port: 80 diff --git a/cmd/activator/main.go b/cmd/activator/main.go new file mode 100644 index 00000000..3e7e38f0 --- /dev/null +++ b/cmd/activator/main.go @@ -0,0 +1,35 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package main implements the main entry point for the activator component. +package main + +import ( + "os" + + ctrl "sigs.k8s.io/controller-runtime" + + "github.com/llm-d/llm-d-inference-scheduler/cmd/activator/runner" +) + +func main() { + // For adding out-of-tree plugins to the plugins registry, use the following: + // plugins.Register(my-out-of-tree-plugin-name, my-out-of-tree-plugin-factory-function) + + if err := runner.Run(ctrl.SetupSignalHandler()); err != nil { + os.Exit(1) + } +} diff --git a/cmd/activator/runner/health.go b/cmd/activator/runner/health.go new file mode 100644 index 00000000..e5683702 --- /dev/null +++ b/cmd/activator/runner/health.go @@ -0,0 +1,123 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package runner provides the main runner for the activator component. 
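+// It wires together the controller manager, the Envoy ext-proc gRPC server, and
+// the gRPC health server implemented in this file.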
+package runner + +import ( + "context" + "fmt" + "sync/atomic" + + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "github.com/go-logr/logr" + "google.golang.org/grpc/codes" + healthPb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" + + "github.com/llm-d/llm-d-inference-scheduler/pkg/activator/datastore" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" +) + +type healthServer struct { + logger logr.Logger + datastore datastore.Datastore + isLeader *atomic.Bool + leaderElectionEnabled bool +} + +const ( + livenessCheckService = "liveness" + readinessCheckService = "readiness" +) + +func (s *healthServer) Check(_ context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) { + isLive := s.datastore.PoolHasSynced() + + // If leader election is disabled, use current logic: all checks are based on whether the pool has synced. + if !s.leaderElectionEnabled { + if !isLive { + s.logger.V(logutil.DEFAULT).Info("gRPC health check not serving (leader election disabled)", "service", in.Service) + return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil + } + s.logger.V(logutil.TRACE).Info("gRPC health check serving (leader election disabled)", "service", in.Service) + return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil + } + + // When leader election is enabled, differentiate between liveness and readiness. + // The service name in the request determines which check to perform. + var checkName string + var isPassing bool + + switch in.Service { + case readinessCheckService: + checkName = "readiness" + isPassing = isLive && s.isLeader.Load() + case "": // Handle overall server health for load balancers that use an empty service name. + checkName = "empty service name (considered as overall health)" + // The overall health for a load balancer should reflect readiness to accept traffic, + // which is true only for the leader pod that has synced its data. + isPassing = isLive && s.isLeader.Load() + case livenessCheckService: + checkName = "liveness" + // Any pod that is running and can respond to this gRPC check is considered "live". + // The datastore sync status should not affect liveness, only readiness. + // This is to prevent the non-leader node from continuous restarts + isPassing = true + case extProcPb.ExternalProcessor_ServiceDesc.ServiceName: + // The main service is considered ready only on the leader. 
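+		// Reporting NOT_SERVING on non-leaders lets clients that health-check the
+		// ext_proc service name direct traffic to the leader only.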
+ checkName = "ext_proc" + isPassing = isLive && s.isLeader.Load() + default: + s.logger.V(logutil.DEFAULT).Info("gRPC health check requested unknown service", "available-services", []string{livenessCheckService, readinessCheckService, extProcPb.ExternalProcessor_ServiceDesc.ServiceName}, "requested-service", in.Service) + return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVICE_UNKNOWN}, nil + } + + if !isPassing { + s.logger.V(logutil.DEFAULT).Info(fmt.Sprintf("gRPC %s check not serving", checkName), "service", in.Service, "isLive", isLive, "isLeader", s.isLeader.Load()) + return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil + } + + s.logger.V(logutil.TRACE).Info(fmt.Sprintf("gRPC %s check serving", checkName), "service", in.Service) + return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil +} + +func (s *healthServer) List(ctx context.Context, _ *healthPb.HealthListRequest) (*healthPb.HealthListResponse, error) { + statuses := make(map[string]*healthPb.HealthCheckResponse) + + services := []string{extProcPb.ExternalProcessor_ServiceDesc.ServiceName} + if s.leaderElectionEnabled { + services = append(services, livenessCheckService, readinessCheckService) + } + + for _, service := range services { + resp, err := s.Check(ctx, &healthPb.HealthCheckRequest{Service: service}) + if err != nil { + // Check can return an error for unknown services, but here we are iterating known services. + // If another error occurs, we should probably return it. + return nil, err + } + statuses[service] = resp + } + + return &healthPb.HealthListResponse{ + Statuses: statuses, + }, nil +} + +func (s *healthServer) Watch(_ *healthPb.HealthCheckRequest, _ healthPb.Health_WatchServer) error { + return status.Error(codes.Unimplemented, "Watch is not implemented") +} diff --git a/cmd/activator/runner/runner.go b/cmd/activator/runner/runner.go new file mode 100644 index 00000000..a336830c --- /dev/null +++ b/cmd/activator/runner/runner.go @@ -0,0 +1,262 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package runner
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"os"
+	"sync/atomic"
+
+	"github.com/go-logr/logr"
+	uberzap "go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+	"google.golang.org/grpc"
+	healthPb "google.golang.org/grpc/health/grpc_health_v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/log/zap"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+	"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
+	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
+
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
+	"sigs.k8s.io/gateway-api-inference-extension/version"
+
+	"github.com/llm-d/llm-d-inference-scheduler/pkg/activator/datastore"
+	"github.com/llm-d/llm-d-inference-scheduler/pkg/activator/requestcontrol"
+	"github.com/llm-d/llm-d-inference-scheduler/pkg/activator/runnable"
+	runserver "github.com/llm-d/llm-d-inference-scheduler/pkg/activator/server"
+)
+
+var (
+	grpcPort = flag.Int("grpc-port", runserver.DefaultGrpcPort, "The gRPC port used for communicating with Envoy proxy")
+	grpcHealthPort = flag.Int("grpc-health-port", runserver.DefaultGrpcHealthPort, "The port used for gRPC liveness and readiness probes")
+	metricsPort = flag.Int("metrics-port", runserver.DefaultMetricsPort, "The metrics port")
+	poolName = flag.String("pool-name", runserver.DefaultPoolName, "Name of the InferencePool this activator is associated with.")
+	poolGroup = flag.String("pool-group", runserver.DefaultPoolGroup, "Group of the InferencePool this activator is associated with.")
+	poolNamespace = flag.String("pool-namespace", "", "Namespace of the InferencePool this activator is associated with.")
+	enableScaleToZero = flag.Bool("enable-scale-to-zero", false, "Enable scaling down InferencePool to zero replicas after a period of idleness")
+	logVerbosity = flag.Int("v", logging.DEFAULT, "number for the log level verbosity")
+	secureServing = flag.Bool("secure-serving", runserver.DefaultSecureServing, "Enables secure serving. Defaults to true.")
+	healthChecking = flag.Bool("health-checking", runserver.DefaultHealthChecking, "Enables health checking")
+	certPath = flag.String("cert-path", runserver.DefaultCertPath, "The path to the certificate for secure serving. The certificate and private key files "+
+		"are assumed to be named tls.crt and tls.key, respectively. If not set, and secureServing is enabled, "+
+		"then a self-signed certificate is used.")
+	haEnableLeaderElection = flag.Bool("ha-enable-leader-election", false, "Enables leader election for high availability. When enabled, readiness probes will only pass on the leader.")
+
+	setupLog = ctrl.Log.WithName("setup")
+)
+
+// Run starts the activator runner with the provided context, initializes logging,
+// controllers, and servers, and blocks until the manager exits or the context is cancelled.
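+// Run also parses the command-line flags, so it must be called before any other
+// access to flag values.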
+func Run(ctx context.Context) error { + opts := zap.Options{ + Development: true, + } + opts.BindFlags(flag.CommandLine) + flag.Parse() + initLogging(&opts) + + setupLog.Info("Activator build", "commit-sha", version.CommitSHA, "build-ref", version.BuildRef) + + // Validate flags + if err := validateFlags(); err != nil { + setupLog.Error(err, "Failed to validate flags") + return err + } + + // Print all flag values + flags := make(map[string]any) + flag.VisitAll(func(f *flag.Flag) { + flags[f.Name] = f.Value + }) + setupLog.Info("Flags processed", "flags", flags) + + // --- Get Kubernetes Config --- + cfg, err := ctrl.GetConfig() + if err != nil { + setupLog.Error(err, "Failed to get Kubernetes rest config") + return err + } + + // --- Setup Datastore --- + datastore := datastore.NewDatastore() + + // --- Setup Activator --- + activator, err := requestcontrol.NewActivatorWithConfig(cfg, datastore) + if err != nil { + setupLog.Error(err, "Failed to setup Activator") + return err + } + + // --- Setup Deactivator --- + if *enableScaleToZero { + deactivator, err := requestcontrol.DeactivatorWithConfig(cfg, datastore) + if err != nil { + setupLog.Error(err, "Failed to setup Deactivator") + return err + } + + // Start Deactivator + go deactivator.MonitorInferencePoolIdleness(ctx) + } + + // --- Setup Metrics Server --- + metricsServerOptions := metricsserver.Options{ + BindAddress: fmt.Sprintf(":%d", *metricsPort), + FilterProvider: filters.WithAuthenticationAndAuthorization, + } + + // Determine pool namespace: if --pool-namespace is non-empty, use it; else NAMESPACE env var; else default + resolvePoolNamespace := func() string { + if *poolNamespace != "" { + return *poolNamespace + } + if nsEnv := os.Getenv("NAMESPACE"); nsEnv != "" { + return nsEnv + } + return runserver.DefaultPoolNamespace + } + resolvedPoolNamespace := resolvePoolNamespace() + poolNamespacedName := types.NamespacedName{ + Name: *poolName, + Namespace: resolvedPoolNamespace, + } + poolGroupKind := schema.GroupKind{ + Group: *poolGroup, + Kind: "InferencePool", + } + poolGKNN := common.GKNN{ + NamespacedName: poolNamespacedName, + GroupKind: poolGroupKind, + } + + isLeader := &atomic.Bool{} + isLeader.Store(false) + + mgr, err := runserver.NewDefaultManager(poolGKNN, cfg, metricsServerOptions, *haEnableLeaderElection) + if err != nil { + setupLog.Error(err, "Failed to create controller manager") + return err + } + + if *haEnableLeaderElection { + setupLog.Info("Leader election enabled") + go func() { + <-mgr.Elected() + isLeader.Store(true) + setupLog.Info("This instance is now the leader!") + }() + } else { + // If leader election is disabled, all instances are "leaders" for readiness purposes. + isLeader.Store(true) + } + + // --- Setup ExtProc Server Runner --- + serverRunner := &runserver.ExtProcServerRunner{ + GrpcPort: *grpcPort, + PoolNamespacedName: poolNamespacedName, + PoolGKNN: poolGKNN, + Datastore: datastore, + SecureServing: *secureServing, + HealthChecking: *healthChecking, + CertPath: *certPath, + Activator: activator, + } + if err := serverRunner.SetupWithManager(ctx, mgr); err != nil { + setupLog.Error(err, "Failed to setup Activator controllers") + return err + } + + // --- Add Runnables to Manager --- + // Register health server. + if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), datastore, *grpcHealthPort, isLeader, *haEnableLeaderElection); err != nil { + return err + } + + // Register ext-proc server. 
+	if err := registerExtProcServer(mgr, serverRunner, ctrl.Log.WithName("ext-proc")); err != nil {
+		return err
+	}
+
+	// --- Start Manager ---
+	// This blocks until a signal is received.
+	setupLog.Info("Controller manager starting")
+	if err := mgr.Start(ctx); err != nil {
+		setupLog.Error(err, "Error starting controller manager")
+		return err
+	}
+	setupLog.Info("Controller manager terminated")
+	return nil
+}
+
+func initLogging(opts *zap.Options) {
+	// Unless -zap-log-level is explicitly set, use -v
+	useV := true
+	flag.Visit(func(f *flag.Flag) {
+		if f.Name == "zap-log-level" {
+			useV = false
+		}
+	})
+	if useV {
+		// See https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/log/zap#Options.Level
+		lvl := -1 * (*logVerbosity)
+		opts.Level = uberzap.NewAtomicLevelAt(zapcore.Level(int8(lvl)))
+	}
+
+	logger := zap.New(zap.UseFlagOptions(opts), zap.RawZapOpts(uberzap.AddCaller()))
+	ctrl.SetLogger(logger)
+}
+
+// registerExtProcServer adds the ExtProcServerRunner as a Runnable to the manager.
+func registerExtProcServer(mgr manager.Manager, runner *runserver.ExtProcServerRunner, logger logr.Logger) error {
+	if err := mgr.Add(runner.AsRunnable(logger)); err != nil {
+		setupLog.Error(err, "Failed to register ext-proc gRPC server runnable")
+		return err
+	}
+	setupLog.Info("ExtProc server runner added to manager.")
+	return nil
+}
+
+// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
+func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.Datastore, port int, isLeader *atomic.Bool, leaderElectionEnabled bool) error {
+	srv := grpc.NewServer()
+	healthPb.RegisterHealthServer(srv, &healthServer{
+		logger: logger,
+		datastore: ds,
+		isLeader: isLeader,
+		leaderElectionEnabled: leaderElectionEnabled,
+	})
+	if err := mgr.Add(
+		runnable.NoLeaderElection(runnable.GRPCServer("health", srv, port))); err != nil {
+		setupLog.Error(err, "Failed to register health server")
+		return err
+	}
+	return nil
+}
+
+func validateFlags() error {
+	if *poolName == "" {
+		return fmt.Errorf("required %q flag not set", "pool-name")
+	}
+
+	return nil
+}
diff --git a/pkg/activator/controller/inferencepool_reconciler.go b/pkg/activator/controller/inferencepool_reconciler.go
new file mode 100644
index 00000000..23751819
--- /dev/null
+++ b/pkg/activator/controller/inferencepool_reconciler.go
@@ -0,0 +1,120 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package controller contains reconcilers for InferencePool resources used by the activator
+// component; it implements controller-runtime Reconciler logic to keep the Datastore synchronized
+// with Kubernetes InferencePool objects across supported API groups.
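+// Both the v1 and v1alpha2 InferencePool APIs are supported; v1alpha2 objects
+// are converted to the canonical v1 form before being stored in the Datastore.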
+package controller
+
+import (
+	"context"
+	"fmt"
+
+	"k8s.io/apimachinery/pkg/api/errors"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/log"
+
+	"github.com/llm-d/llm-d-inference-scheduler/pkg/activator/datastore"
+	v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
+	"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
+	logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
+)
+
+// InferencePoolReconciler utilizes the controller runtime to reconcile InferencePool resources.
+// This implementation is just used for reading & maintaining data sync. The Gateway implementation
+// will have the proper controller that will create/manage objects on behalf of the server pool.
+type InferencePoolReconciler struct {
+	client.Reader
+	Datastore datastore.Datastore
+	PoolGKNN common.GKNN
+}
+
+// Reconcile reconciles an InferencePool and keeps the Datastore synchronized with the
+// current state of the corresponding Kubernetes InferencePool object.
+func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+	logger := log.FromContext(ctx).WithValues("group", c.PoolGKNN.Group).V(logutil.DEFAULT)
+	ctx = ctrl.LoggerInto(ctx, logger)
+
+	logger.Info("Reconciling InferencePool")
+
+	// 1. Initialize a generic client.Object based on the group.
+	var obj client.Object
+	switch c.PoolGKNN.Group {
+	case v1.GroupName:
+		obj = &v1.InferencePool{}
+	case v1alpha2.GroupName:
+		obj = &v1alpha2.InferencePool{}
+	default:
+		// Handle unsupported groups gracefully.
+		return ctrl.Result{}, fmt.Errorf("cannot reconcile InferencePool - unsupported API group: %s", c.PoolGKNN.Group)
+	}
+
+	// 2. Perform a single, generic fetch for the object.
+	if err := c.Get(ctx, req.NamespacedName, obj); err != nil {
+		if errors.IsNotFound(err) {
+			logger.Info("InferencePool not found. Clearing the datastore")
+			c.Datastore.Clear()
+			return ctrl.Result{}, nil
+		}
+		return ctrl.Result{}, fmt.Errorf("unable to get InferencePool - %w", err)
+	}
+
+	// 3. Perform common checks using the client.Object interface.
+	if !obj.GetDeletionTimestamp().IsZero() {
+		logger.Info("InferencePool is marked for deletion. Clearing the datastore")
+		c.Datastore.Clear()
+		return ctrl.Result{}, nil
+	}
+
+	// 4. Convert the fetched object to the canonical v1.InferencePool.
+	v1infPool := &v1.InferencePool{}
+
+	switch pool := obj.(type) {
+	case *v1.InferencePool:
+		// If it's already a v1 object, just use it.
+		v1infPool = pool
+	case *v1alpha2.InferencePool:
+		if err := pool.ConvertTo(v1infPool); err != nil {
+			return ctrl.Result{}, fmt.Errorf("failed to convert XInferencePool to InferencePool - %w", err)
+		}
+	default:
+		return ctrl.Result{}, fmt.Errorf("unsupported API group: %s", c.PoolGKNN.Group)
+	}
+
+	c.Datastore.PoolSet(v1infPool)
+
+	return ctrl.Result{}, nil
+}
+
+// SetupWithManager sets up the controller with the manager for the configured API group.
+func (c *InferencePoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	switch c.PoolGKNN.Group {
+	case v1alpha2.GroupName:
+		return ctrl.NewControllerManagedBy(mgr).
+			For(&v1alpha2.InferencePool{}).
+			Complete(c)
+	case v1.GroupName:
+		return ctrl.NewControllerManagedBy(mgr).
+			For(&v1.InferencePool{}).
+			Complete(c)
+	default:
+		return fmt.Errorf("unknown group %s", c.PoolGKNN.Group)
+	}
+}
diff --git a/pkg/activator/datastore/datastore.go b/pkg/activator/datastore/datastore.go
new file mode 100644
index 00000000..4cc4a6ad
--- /dev/null
+++ b/pkg/activator/datastore/datastore.go
@@ -0,0 +1,110 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package datastore provides a local in-memory cache for the activator component,
+// storing the current InferencePool and managing a ticker used by background
+// goroutines; it exposes thread-safe operations to get/set the pool and control
+// the ticker.
+package datastore
+
+import (
+	"errors"
+	"sync"
+	"time"
+
+	v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
+)
+
+var (
+	errPoolNotSynced = errors.New("InferencePool is not initialized in data store")
+)
+
+// Datastore is a local cache of relevant data for the given InferencePool (currently all pulled from k8s-api)
+type Datastore interface {
+	// InferencePool operations
+	// PoolSet sets the given pool in datastore.
+	PoolSet(pool *v1.InferencePool)
+	PoolGet() (*v1.InferencePool, error)
+	PoolHasSynced() bool
+
+	GetTickerCh() <-chan time.Time
+	ResetTicker(t time.Duration)
+	StopTicker()
+
+	// Clear resets the store state; this happens when the pool gets deleted.
+	Clear()
+}
+
+// NewDatastore creates a new Datastore instance.
+func NewDatastore() Datastore {
+	store := &datastore{
+		poolMu: sync.RWMutex{},
+		ticker: time.NewTicker(60 * time.Second),
+	}
+	return store
+}
+
+type datastore struct {
+	// poolMu is used to synchronize access to the pool and the ticker.
+	poolMu sync.RWMutex
+	pool *v1.InferencePool
+	ticker *time.Ticker
+}
+
+// /// InferencePool APIs ///
+func (ds *datastore) PoolSet(pool *v1.InferencePool) {
+	ds.poolMu.Lock()
+	defer ds.poolMu.Unlock()
+
+	ds.pool = pool
+}
+
+func (ds *datastore) PoolGet() (*v1.InferencePool, error) {
+	ds.poolMu.RLock()
+	defer ds.poolMu.RUnlock()
+	if ds.pool == nil {
+		return nil, errPoolNotSynced
+	}
+	return ds.pool, nil
+}
+
+func (ds *datastore) PoolHasSynced() bool {
+	ds.poolMu.RLock()
+	defer ds.poolMu.RUnlock()
+	return ds.pool != nil
+}
+
+func (ds *datastore) Clear() {
+	ds.PoolSet(nil)
+}
+
+func (ds *datastore) ResetTicker(t time.Duration) {
+	ds.poolMu.Lock()
+	defer ds.poolMu.Unlock()
+	ds.ticker.Reset(t)
+}
+
+func (ds *datastore) GetTickerCh() <-chan time.Time {
+	ds.poolMu.RLock()
+	defer ds.poolMu.RUnlock()
+	return ds.ticker.C
+}
+
+func (ds *datastore) StopTicker() {
+	ds.poolMu.Lock()
+	defer ds.poolMu.Unlock()
+	ds.ticker.Stop()
+}
diff --git a/pkg/activator/datastore/datastore_test.go b/pkg/activator/datastore/datastore_test.go
new file mode 100644
index 00000000..66e3cc9b
--- /dev/null
+++ b/pkg/activator/datastore/datastore_test.go
@@ -0,0 +1,82 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package datastore + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "k8s.io/apimachinery/pkg/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + + v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" + testutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing" +) + +func TestPool(t *testing.T) { + pool1Selector := map[string]string{"app": "vllm_v1"} + pool1 := testutil.MakeInferencePool("pool1"). + Namespace("default"). + Selector(pool1Selector).ObjRef() + tests := []struct { + name string + inferencePool *v1.InferencePool + wantSynced bool + wantPool *v1.InferencePool + wantErr error + }{ + { + name: "Ready when InferencePool exists in data store", + inferencePool: pool1, + wantSynced: true, + wantPool: pool1, + }, + { + name: "Labels not matched", + inferencePool: pool1, + wantSynced: true, + wantPool: pool1, + }, + { + name: "Not ready when InferencePool is nil in data store", + wantErr: errPoolNotSynced, + wantSynced: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Set up the scheme. + scheme := runtime.NewScheme() + _ = clientgoscheme.AddToScheme(scheme) + + datastore := NewDatastore() + datastore.PoolSet(tt.inferencePool) + gotPool, gotErr := datastore.PoolGet() + if diff := cmp.Diff(tt.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" { + t.Errorf("Unexpected error diff (+got/-want): %s", diff) + } + if diff := cmp.Diff(tt.wantPool, gotPool); diff != "" { + t.Errorf("Unexpected pool diff (+got/-want): %s", diff) + } + gotSynced := datastore.PoolHasSynced() + if diff := cmp.Diff(tt.wantSynced, gotSynced); diff != "" { + t.Errorf("Unexpected synced diff (+got/-want): %s", diff) + } + }) + } +} diff --git a/pkg/activator/handlers/server.go b/pkg/activator/handlers/server.go new file mode 100644 index 00000000..3cc80cd1 --- /dev/null +++ b/pkg/activator/handlers/server.go @@ -0,0 +1,172 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +// Package handlers implements the gRPC server for Envoy's external processing API, +package handlers + +import ( + "context" + "io" + + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "sigs.k8s.io/controller-runtime/pkg/log" + + v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" + errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" +) + +var continueHeadersResponse = &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_RequestHeaders{ + RequestHeaders: &extProcPb.HeadersResponse{ + Response: &extProcPb.CommonResponse{ + Status: extProcPb.CommonResponse_CONTINUE, + }, + }, + }, +} + +// NewStreamingServer creates a new StreamingServer with the provided Datastore and Activator. +func NewStreamingServer(datastore Datastore, activator Activator) *StreamingServer { + return &StreamingServer{ + activator: activator, + datastore: datastore, + } +} + +// Activator defines the interface for activating replicas based on incoming requests. +type Activator interface { + MayActivate(ctx context.Context) error +} + +// Datastore defines the interface for accessing InferencePool data. +type Datastore interface { + PoolGet() (*v1.InferencePool, error) +} + +// StreamingServer implements the Envoy external processing server. +// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor.proto +type StreamingServer struct { + datastore Datastore + activator Activator +} + +// Process handles the bidirectional streaming RPC for external processing. 
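+// Only RequestHeaders messages are acted upon: the activator ensures the target
+// InferencePool is scaled up before telling Envoy to continue the request. All
+// other message types are unexpected (the per-route ExtProcPerRoute override in
+// the chart sends headers only) and are merely logged.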
+func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
+	ctx := srv.Context()
+	logger := log.FromContext(ctx)
+	loggerTrace := logger.V(logutil.TRACE)
+	loggerTrace.Info("Processing")
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+
+		req, recvErr := srv.Recv()
+		if recvErr == io.EOF || status.Code(recvErr) == codes.Canceled {
+			return nil
+		}
+		if recvErr != nil {
+			return status.Errorf(codes.Unknown, "cannot receive stream request: %v", recvErr)
+		}
+
+		switch req.Request.(type) {
+		case *extProcPb.ProcessingRequest_RequestHeaders:
+
+			if err := s.activator.MayActivate(ctx); err != nil {
+				if logger.V(logutil.DEBUG).Enabled() {
+					logger.V(logutil.DEBUG).Error(err, "Failed to process request", "request", req)
+				} else {
+					logger.V(logutil.DEFAULT).Error(err, "Failed to process request")
+				}
+				resp, err := buildErrResponse(err)
+				if err != nil {
+					return err
+				}
+				if err := srv.Send(resp); err != nil {
+					logger.V(logutil.DEFAULT).Error(err, "Send failed")
+					return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
+				}
+				return nil
+			}
+
+			loggerTrace.Info("Sending request header response")
+			if err := srv.Send(continueHeadersResponse); err != nil {
+				logger.V(logutil.DEFAULT).Error(err, "error sending response")
+				return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
+			}
+
+		case *extProcPb.ProcessingRequest_RequestBody:
+			logger.V(logutil.DEBUG).Info("Error: ProcessingRequest_RequestBody received")
+		case *extProcPb.ProcessingRequest_RequestTrailers:
+			logger.V(logutil.DEBUG).Info("Error: ProcessingRequest_RequestTrailers received")
+		case *extProcPb.ProcessingRequest_ResponseHeaders:
+			logger.V(logutil.DEBUG).Info("Error: ProcessingRequest_ResponseHeaders received")
+		case *extProcPb.ProcessingRequest_ResponseBody:
+			logger.V(logutil.DEBUG).Info("Error: ProcessingRequest_ResponseBody received")
+		case *extProcPb.ProcessingRequest_ResponseTrailers:
+			logger.V(logutil.DEBUG).Info("Error: ProcessingRequest_ResponseTrailers received")
+		}
+	}
+}
+
+func buildErrResponse(err error) (*extProcPb.ProcessingResponse, error) {
+	var code envoyTypePb.StatusCode
+
+	switch errutil.CanonicalCode(err) {
+	// This code can be returned by the scheduler when there is no capacity for sheddable
+	// requests.
+	case errutil.InferencePoolResourceExhausted:
+		code = envoyTypePb.StatusCode_TooManyRequests
+	// This code can be returned when the EPP processes the request and runs into server-side errors.
+	case errutil.Internal:
+		code = envoyTypePb.StatusCode_InternalServerError
+	// This code can be returned by the director when there are no candidate pods for the request scheduling.
+	case errutil.ServiceUnavailable:
+		code = envoyTypePb.StatusCode_ServiceUnavailable
+	// This code can be returned when users provide an invalid JSON request.
+	case errutil.BadRequest:
+		code = envoyTypePb.StatusCode_BadRequest
+	case errutil.BadConfiguration:
+		code = envoyTypePb.StatusCode_NotFound
+	default:
+		return nil, status.Errorf(status.Code(err), "failed to handle request: %v", err)
+	}
+
+	resp := &extProcPb.ProcessingResponse{
+		Response: &extProcPb.ProcessingResponse_ImmediateResponse{
+			ImmediateResponse: &extProcPb.ImmediateResponse{
+				Status: &envoyTypePb.HttpStatus{
+					Code: code,
+				},
+			},
+		},
+	}
+
+	if err.Error() != "" {
+		resp.Response.(*extProcPb.ProcessingResponse_ImmediateResponse).ImmediateResponse.Body = []byte(err.Error())
+	}
+
+	return resp, nil
+}
diff --git a/pkg/activator/requestcontrol/activator.go b/pkg/activator/requestcontrol/activator.go
new file mode 100644
index 00000000..0e75a516
--- /dev/null
+++ b/pkg/activator/requestcontrol/activator.go
@@ -0,0 +1,298 @@
+// Package requestcontrol implements the activator logic for controlling request flow
+package requestcontrol
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/go-logr/logr"
+
+	autoscaling "k8s.io/api/autoscaling/v1"
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/discovery"
+	cached "k8s.io/client-go/discovery/cached"
+	"k8s.io/client-go/dynamic"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/restmapper"
+	"k8s.io/client-go/scale"
+
+	"sigs.k8s.io/controller-runtime/pkg/log"
+	v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
+	errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
+	logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
+
+	"github.com/llm-d/llm-d-inference-scheduler/pkg/activator/datastore"
+)
+
+const (
+	objectAPIVersionKey = "activator.llm-d.ai/target-apiversion"
+	objectkindKey = "activator.llm-d.ai/target-kind"
+	objectNameKey = "activator.llm-d.ai/target-name"
+	scaleFromZeroGracePeriodKey = "activator.llm-d.ai/scale-from-zero-grace-period" // Optional annotation
+
+	// DefaultScaleFromZeroGracePeriod is the time we will wait for a scale-from-zero decision to complete
+	DefaultScaleFromZeroGracePeriod = 60 * time.Second
+
+	// DefaultScaleDownDelay is the amount of time that must pass before a scale-down decision is applied
+	DefaultScaleDownDelay = 120 * time.Second
+
+	// ScaleToZeroRequestRetentionPeriod is the amount of time we will wait before releasing the request after a scale-from-zero event
+	ScaleToZeroRequestRetentionPeriod = 5 * time.Second
+)
+
+type scaledObjectData struct {
+	name string
+	scaleGracePeriod time.Duration
+	numReplicas int32
+	scaleObject *autoscaling.Scale
+}
+
+// Activator implements the logic for activating replicas based on incoming requests.
+type Activator struct {
+	DynamicClient *dynamic.DynamicClient
+	ScaleClient scale.ScalesGetter
+	Mapper meta.RESTMapper
+	datastore datastore.Datastore
+
+	scalingUp bool
+	guard chan struct{}
+	scalingUpAndGuardMu sync.Mutex
+}
+
+// NewActivatorWithConfig creates a new Activator with the provided REST config and Datastore.
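+// The scale client is used to read and update the target object's scale
+// subresource, while the dynamic client is used to poll its status.readyReplicas.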
+func NewActivatorWithConfig(config *rest.Config, datastore datastore.Datastore) (*Activator, error) {
+	scaleClient, mapper, err := initScaleClient(config)
+	if err != nil {
+		return nil, err
+	}
+
+	dynamicClient, err := dynamic.NewForConfig(config)
+	if err != nil {
+		return nil, err
+	}
+
+	return &Activator{
+		datastore: datastore,
+		DynamicClient: dynamicClient,
+		Mapper: mapper,
+		ScaleClient: scaleClient}, nil
+}
+
+// MayActivate checks if the inferencePool associated with the request is scaled to one or more replicas
+func (a *Activator) MayActivate(ctx context.Context) error {
+	logger := log.FromContext(ctx)
+
+	// Get InferencePool Info
+	pool, err := a.datastore.PoolGet()
+	if err != nil {
+		return err
+	}
+
+	logger.V(logutil.TRACE).Info("InferencePool found", "name", pool.Name, "namespace", pool.Namespace)
+
+	// First: check if the inferencePool is currently scaling up from zero replicas
+	if scalingUp, guard := a.isScalingUp(); scalingUp {
+		logger.V(logutil.DEBUG).Info("InferencePool is currently scaling up. Waiting for it to be done.")
+
+		a.waitOnGuard(guard, DefaultScaleFromZeroGracePeriod)
+		return nil // After scaling up is done, allow the request to proceed even if scaling failed
+	}
+
+	// Then: block until the inferencePool has enough replicas and is ready
+	if ready := a.InferencePoolReady(ctx, pool); !ready {
+		return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "failed to find active candidate pods in the inferencePool for serving the request"}
+	}
+
+	// Reset the Deactivator ticker for scale-to-zero monitoring
+	scaleDownDelay := DefaultScaleDownDelay
+	if value, found := getOptionalPoolAnnotation(logger, scaleDownDelayKey, pool); found {
+		// Fall back to the default if the annotation value cannot be parsed
+		if d, err := time.ParseDuration(value); err == nil {
+			scaleDownDelay = d
+		}
+	}
+
+	a.datastore.ResetTicker(scaleDownDelay)
+	return nil
+}
+
+// InferencePoolReady checks if the inferencePool has enough replicas and is ready
+func (a *Activator) InferencePoolReady(ctx context.Context, pool *v1.InferencePool) bool {
+	logger := log.FromContext(ctx)
+	namespace := pool.Namespace
+
+	// TODO: store annotation values in datastore to avoid reading them every time
+	// verify required inferencePool annotations
+	valid := verifyPoolObjectAnnotations(logger, pool)
+	if !valid {
+		return false
+	}
+
+	// extract optional inferencePool annotation if it exists, otherwise use a default value
+	scaleGracePeriod := DefaultScaleFromZeroGracePeriod
+	if value, found := getOptionalPoolAnnotation(logger, scaleFromZeroGracePeriodKey, pool); found {
+		if d, err := time.ParseDuration(value); err == nil {
+			scaleGracePeriod = d
+		}
+	}
+
+	// Get the scale subresource for the target inferencePool object
+	gvr, err := GetResourceForKind(a.Mapper, pool.Annotations[objectAPIVersionKey], pool.Annotations[objectkindKey])
+	if err != nil {
+		msg := "Failed to parse Group, Version, Kind, Resource"
+		logger.Error(err, msg, "apiVersion", pool.Annotations[objectAPIVersionKey], "kind", pool.Annotations[objectkindKey])
+		return false
+	}
+
+	gr := gvr.GroupResource()
+	scaleObject, err := a.ScaleClient.Scales(namespace).Get(ctx, gr, pool.Annotations[objectNameKey], metav1.GetOptions{})
+	if err != nil {
+		// Fail open: if the scale subresource cannot be read, let the request proceed
+		logger.Error(err, "Error getting scale subresource object")
+		return true
+	}
+
+	// Common case: enough replicas?
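+	// If the target already reports a non-zero desired replica count, confirm that
+	// the pods are actually ready before skipping the scale-from-zero path.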
+ if scaleObject.Spec.Replicas > 0 { + if a.inferencePoolPodsReady(logger, namespace, pool.Annotations[objectNameKey], scaleObject.Spec.Replicas, scaleGracePeriod, gvr) { + // Scale object exists and has no zero running replicas then do not scale it + logger.V(logutil.DEBUG).Info(fmt.Sprintf("Scale Object %s have at least one replica ready. Skipping scaling from zero", scaleObject.Name)) + return true + } + } + + // Need to scale inferencePool workload from zero to one replicas + numReplicas := int32(1) + scaleData := scaledObjectData{name: pool.Annotations[objectNameKey], scaleGracePeriod: DefaultScaleFromZeroGracePeriod, numReplicas: numReplicas, scaleObject: scaleObject} + return a.scaleInferencePool(ctx, logger, namespace, scaleData, gr, gvr) +} + +func (a *Activator) inferencePoolPodsReady(logger logr.Logger, namespace, objname string, numReplicas int32, scaleGracePeriod time.Duration, gvr schema.GroupVersionResource) bool { + // Don't inherit the parent context to avoid cancellation + err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, scaleGracePeriod, false, func(ctx context.Context) (done bool, err error) { + + a.datastore.ResetTicker(DefaultScaleDownDelay) // turn off the deactivator during scale from zero events + + unstructuredObj, err := a.DynamicClient.Resource(gvr).Namespace(namespace).Get(ctx, objname, metav1.GetOptions{}) + if err != nil { + logger.Error(err, "Error getting unstructured object") + return false, nil // continue polling + } + + // NOTE: this assumes that the target object has a status.readyReplicas field + if readyReplicas, found, err := unstructured.NestedFieldNoCopy(unstructuredObj.Object, "status", "readyReplicas"); found { + if numReplicas == int32(readyReplicas.(int64)) { + logger.V(logutil.DEBUG).Info("Candidate pods are READY") + return true, nil + } + logger.V(logutil.DEBUG).Info("Candidate pods are NOT READY") + } else if err != nil { + logger.Error(err, "Error getting readyReplicas - candidate pods for serving the request are NOT READY") + } else { + logger.V(logutil.DEBUG).Info("Object status.readyReplicas field is not set yet - candidate pods for serving the request are NOT READY") + return false, nil + } + + return false, nil + }) + + return err == nil +} + +func (a *Activator) scaleInferencePool(ctx context.Context, logger logr.Logger, namespace string, objData scaledObjectData, gr schema.GroupResource, gvr schema.GroupVersionResource) bool { + a.beginScalingUp() + defer a.endScalingUp() + + // Modify the desired replicas + objData.scaleObject.Spec.Replicas = objData.numReplicas + + // Update the Scale object + _, err := a.ScaleClient.Scales(namespace).Update(ctx, gr, objData.scaleObject, metav1.UpdateOptions{}) + if err != nil { + logger.Error(err, "Error increasing Scale Object number of replicas to one") + return false + } + logger.Info(fmt.Sprintf("Scale Object %s in namespace %s scaled up to %d replicas with scale grace period %d \n", objData.name, namespace, objData.numReplicas, int(objData.scaleGracePeriod))) + + // Wait for the pods to be ready + ready := a.inferencePoolPodsReady(logger, namespace, objData.name, objData.numReplicas, objData.scaleGracePeriod, gvr) + if ready { + // Give some time for the Endpoint Picker to pick up the newly created pods + time.Sleep(ScaleToZeroRequestRetentionPeriod) + return true + } + return false +} + +func initScaleClient(config *rest.Config) (scale.ScalesGetter, meta.RESTMapper, error) { + clientset, err := discovery.NewDiscoveryClientForConfig(config) + if err != nil { + return 
nil, nil, err + } + + cachedDiscoveryClient := cached.NewMemCacheClient(clientset) + restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscoveryClient) + + return scale.New( + clientset.RESTClient(), restMapper, + dynamic.LegacyAPIPathResolverFunc, + scale.NewDiscoveryScaleKindResolver(clientset), + ), restMapper, nil +} + +func verifyPoolObjectAnnotations(logger logr.Logger, pool *v1.InferencePool) bool { + if _, ok := pool.Annotations[objectAPIVersionKey]; !ok { + logger.Info(fmt.Sprintf("Annotation '%s' not found on pool '%s'", objectAPIVersionKey, pool.Name)) + return false + } + if _, ok := pool.Annotations[objectkindKey]; !ok { + logger.Info(fmt.Sprintf("Annotation '%s' not found on pool '%s'", objectkindKey, pool.Name)) + return false + } + if _, ok := pool.Annotations[objectNameKey]; !ok { + logger.Info(fmt.Sprintf("Annotation '%s' not found on pool '%s'", objectNameKey, pool.Name)) + return false + } + return true +} + +func getOptionalPoolAnnotation(logger logr.Logger, annotationKey string, pool *v1.InferencePool) (string, bool) { + if value, ok := pool.Annotations[annotationKey]; ok { + return value, true + } + logger.V(logutil.DEBUG).Info(fmt.Sprintf("Annotation '%s' not found on pool '%s'", annotationKey, pool.Name)) + return "", false +} + +func (a *Activator) beginScalingUp() { + a.scalingUpAndGuardMu.Lock() + defer a.scalingUpAndGuardMu.Unlock() + + a.scalingUp = true + a.guard = make(chan struct{}) +} + +func (a *Activator) endScalingUp() { + a.scalingUpAndGuardMu.Lock() + defer a.scalingUpAndGuardMu.Unlock() + + a.scalingUp = false + close(a.guard) + a.guard = nil +} + +func (a *Activator) isScalingUp() (bool, chan struct{}) { + a.scalingUpAndGuardMu.Lock() + defer a.scalingUpAndGuardMu.Unlock() + + return a.scalingUp, a.guard +} + +// waitOnGuard blocks until the InferencePool is marked as ready or the timeout is reached. +func (a *Activator) waitOnGuard(guard <-chan struct{}, timeout time.Duration) { + select { + case <-time.After(timeout): + return + case <-guard: + } +} diff --git a/pkg/activator/requestcontrol/deactivator.go b/pkg/activator/requestcontrol/deactivator.go new file mode 100644 index 00000000..f270e0ef --- /dev/null +++ b/pkg/activator/requestcontrol/deactivator.go @@ -0,0 +1,110 @@ +package requestcontrol + +import ( + "context" + "fmt" + "time" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/llm-d/llm-d-inference-scheduler/pkg/activator/datastore" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" + + "k8s.io/client-go/scale" +) + +const ( + scaleDownDelayKey = "activator.llm-d.ai/scale-down-delay" // Optional annotation +) + +// Deactivator handles scaling down InferencePools to zero replicas after a period of idleness. +type Deactivator struct { + DynamicClient *dynamic.DynamicClient + ScaleClient scale.ScalesGetter + Mapper meta.RESTMapper + datastore datastore.Datastore +} + +// DeactivatorWithConfig creates a new Deactivator with the provided REST config and Datastore. 
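+// +// A minimal usage sketch (illustrative only, not part of the PR's wiring; assumes a valid *rest.Config "cfg" and a datastore.Datastore "ds" constructed elsewhere): +// +//	da, err := requestcontrol.DeactivatorWithConfig(cfg, ds) +//	if err != nil { +//		return err +//	} +//	go da.MonitorInferencePoolIdleness(ctx) // runs until ctx is cancelled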
+func DeactivatorWithConfig(config *rest.Config, datastore datastore.Datastore) (*Deactivator, error) { +	scaleClient, mapper, err := initScaleClient(config) +	if err != nil { +		return nil, err +	} + +	dynamicClient, err := dynamic.NewForConfig(config) +	if err != nil { +		return nil, err +	} + +	return &Deactivator{ +		datastore: datastore, +		DynamicClient: dynamicClient, +		Mapper: mapper, +		ScaleClient: scaleClient}, nil +} + +// MonitorInferencePoolIdleness monitors the InferencePool for idleness and scales it down to zero replicas. +func (da *Deactivator) MonitorInferencePoolIdleness(ctx context.Context) { +	logger := log.FromContext(ctx) +	ds := da.datastore + +	// The deactivator uses a ticker to periodically try to scale down the InferencePool after a period of idleness. +	// Upon each received request, the ticker is reset to delay the next scale-down check. +	ds.ResetTicker(DefaultScaleDownDelay) +	defer ds.StopTicker() + +	tickerCh := ds.GetTickerCh() + +	for { +		select { +		case <-ctx.Done(): +			logger.Info("Context cancelled, stopping deactivator") +			return +		case <-tickerCh: +			logger.V(logutil.DEBUG).Info("Deactivator time check for inferencePool idleness: " + time.Now().Format("15:04:05")) + +			// Get InferencePool Info +			pool, err := ds.PoolGet() +			if err != nil { +				logger.V(logutil.TRACE).Info("InferencePool not found yet; skipping scale-down check") +				continue +			} + +			// Verify required inferencePool annotations +			valid := verifyPoolObjectAnnotations(logger, pool) +			if !valid { +				logger.V(logutil.TRACE).Info("InferencePool is missing required annotations", "name", pool.Name, "namespace", pool.Namespace) +				continue +			} + +			gvr, err := GetResourceForKind(da.Mapper, pool.Annotations[objectAPIVersionKey], pool.Annotations[objectkindKey]) +			if err != nil { +				logger.Error(err, "Failed to parse Group, Version, Kind, Resource", "apiVersion", pool.Annotations[objectAPIVersionKey], "kind", pool.Annotations[objectkindKey]) +				continue +			} + +			gr := gvr.GroupResource() + +			scaleObject, err := da.ScaleClient.Scales(pool.Namespace).Get(ctx, gr, pool.Annotations[objectNameKey], metav1.GetOptions{}) +			if err != nil { +				logger.Error(err, "Error getting scale subresource object") +				continue +			} + +			// Scale inferencePool to zero replicas +			scaleObject.Spec.Replicas = 0 +			_, err = da.ScaleClient.Scales(pool.Namespace).Update(ctx, gr, scaleObject, metav1.UpdateOptions{}) +			if err != nil { +				logger.Error(err, "InferencePool was not successfully scaled down to zero replicas") +				continue +			} + +			logger.V(logutil.DEBUG).Info(fmt.Sprintf("InferencePool '%s' was successfully scaled down to zero replicas", pool.Name)) +		} +	} +} diff --git a/pkg/activator/requestcontrol/gvr.go b/pkg/activator/requestcontrol/gvr.go new file mode 100644 index 00000000..7f8aa131 --- /dev/null +++ b/pkg/activator/requestcontrol/gvr.go @@ -0,0 +1,39 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +    http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package requestcontrol + +import ( +	"k8s.io/apimachinery/pkg/api/meta" +	"k8s.io/apimachinery/pkg/runtime/schema" +) + +// GetResourceForKind returns the GroupVersionResource for the specified apiVersion (groupVersion) and Kind. +func GetResourceForKind(mapper meta.RESTMapper, apiVersion string, kind string) (schema.GroupVersionResource, error) { +	gv, err := schema.ParseGroupVersion(apiVersion) +	if err != nil { +		return schema.GroupVersionResource{}, err +	} + +	// Get the REST mapping for the GroupKind +	gk := schema.GroupKind{Group: gv.Group, Kind: kind} +	mapping, err := mapper.RESTMapping(gk, gv.Version) +	if err != nil { +		return schema.GroupVersionResource{}, err +	} + +	return mapping.Resource, nil +} diff --git a/pkg/activator/runnable/grpc.go b/pkg/activator/runnable/grpc.go new file mode 100644 index 00000000..888c0539 --- /dev/null +++ b/pkg/activator/runnable/grpc.go @@ -0,0 +1,67 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +    http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package runnable provides utilities to convert gRPC servers into controller-runtime Runnables. +package runnable + +import ( +	"context" +	"fmt" +	"net" + +	"google.golang.org/grpc" +	ctrl "sigs.k8s.io/controller-runtime" +	"sigs.k8s.io/controller-runtime/pkg/manager" +) + +// GRPCServer converts the given gRPC server into a runnable. +// The server name is used only for logging. +func GRPCServer(name string, srv *grpc.Server, port int) manager.Runnable { +	return manager.RunnableFunc(func(ctx context.Context) error { +		// Use "name" key as that is what manager.Server does as well. +		log := ctrl.Log.WithValues("name", name) +		log.Info("gRPC server starting") + +		// Start listening. +		lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) +		if err != nil { +			return fmt.Errorf("gRPC server failed to listen - %w", err) +		} + +		log.Info("gRPC server listening", "port", port) + +		// Terminate the server when the context is closed, and make sure the shutdown goroutine does not leak. +		doneCh := make(chan struct{}) +		defer close(doneCh) +		go func() { +			select { +			case <-ctx.Done(): +				log.Info("gRPC server shutting down") +				srv.GracefulStop() +			case <-doneCh: +			} +		}() + +		// Keep serving until terminated. +		if err := srv.Serve(lis); err != nil && err != grpc.ErrServerStopped { +			return fmt.Errorf("gRPC server failed - %w", err) +		} +		log.Info("gRPC server terminated") +		return nil +	}) +} diff --git a/pkg/activator/runnable/leader_election.go b/pkg/activator/runnable/leader_election.go new file mode 100644 index 00000000..3e2230d9 --- /dev/null +++ b/pkg/activator/runnable/leader_election.go @@ -0,0 +1,31 @@ +package runnable + +import "sigs.k8s.io/controller-runtime/pkg/manager" + +type leaderElection struct { +	manager.Runnable +	needsLeaderElection bool +} + +// LeaderElection wraps the given runnable to implement manager.LeaderElectionRunnable. 
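+// +// For example (sketch; "mgr" is an assumed ctrl.Manager, and "grpcSrv" and "loop" are assumed Runnables): +// +//	_ = mgr.Add(NoLeaderElection(grpcSrv))   // runs on every replica +//	_ = mgr.Add(RequireLeaderElection(loop)) // runs only on the elected leader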
+func LeaderElection(runnable manager.Runnable, needsLeaderElection bool) manager.Runnable { +	return &leaderElection{ +		Runnable: runnable, +		needsLeaderElection: needsLeaderElection, +	} +} + +// RequireLeaderElection wraps the given runnable, marking it as requiring leader election. +func RequireLeaderElection(runnable manager.Runnable) manager.Runnable { +	return LeaderElection(runnable, true) +} + +// NoLeaderElection wraps the given runnable, marking it as not requiring leader election. +func NoLeaderElection(runnable manager.Runnable) manager.Runnable { +	return LeaderElection(runnable, false) +} + +// NeedLeaderElection implements the manager.LeaderElectionRunnable interface. +func (r *leaderElection) NeedLeaderElection() bool { +	return r.needsLeaderElection +} diff --git a/pkg/activator/server/controller_manager.go b/pkg/activator/server/controller_manager.go new file mode 100644 index 00000000..e40eb22d --- /dev/null +++ b/pkg/activator/server/controller_manager.go @@ -0,0 +1,114 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +    http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package server provides utilities to create and manage the controller manager for the activator component. +package server + +import ( +	"fmt" + +	corev1 "k8s.io/api/core/v1" +	"k8s.io/apimachinery/pkg/fields" +	"k8s.io/apimachinery/pkg/runtime" +	utilruntime "k8s.io/apimachinery/pkg/util/runtime" +	clientgoscheme "k8s.io/client-go/kubernetes/scheme" +	"k8s.io/client-go/rest" +	ctrl "sigs.k8s.io/controller-runtime" +	"sigs.k8s.io/controller-runtime/pkg/cache" +	"sigs.k8s.io/controller-runtime/pkg/client" +	"sigs.k8s.io/controller-runtime/pkg/manager" +	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + +	v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" +	"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" +	"sigs.k8s.io/gateway-api-inference-extension/pkg/common" +) + +var scheme = runtime.NewScheme() + +func init() { +	utilruntime.Must(clientgoscheme.AddToScheme(scheme)) +	utilruntime.Must(v1alpha2.Install(scheme)) +	utilruntime.Must(v1.Install(scheme)) +} + +// defaultManagerOptions returns the default options used to create the manager. 
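+// +// Illustrative call (a sketch, not the only entry point; "gknn" and "restConfig" are assumed to be populated, and BindAddress "0" disables the metrics endpoint): +// +//	opts, err := defaultManagerOptions(gknn, metricsserver.Options{BindAddress: "0"}) +//	if err != nil { +//		return err +//	} +//	mgr, err := ctrl.NewManager(restConfig, opts)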
+func defaultManagerOptions(gknn common.GKNN, metricsServerOptions metricsserver.Options) (ctrl.Options, error) { +	opt := ctrl.Options{ +		Scheme: scheme, +		Cache: cache.Options{ +			ByObject: map[client.Object]cache.ByObject{ +				&corev1.Pod{}: { +					Namespaces: map[string]cache.Config{ +						gknn.Namespace: {}, +					}, +				}, +			}, +		}, +		Metrics: metricsServerOptions, +	} +	switch gknn.Group { +	case v1alpha2.GroupName: +		opt.Cache.ByObject[&v1alpha2.InferencePool{}] = cache.ByObject{ +			Namespaces: map[string]cache.Config{gknn.Namespace: {FieldSelector: fields.SelectorFromSet(fields.Set{ +				"metadata.name": gknn.Name, +			})}}, +		} +	case v1.GroupName: +		opt.Cache.ByObject[&v1.InferencePool{}] = cache.ByObject{ +			Namespaces: map[string]cache.Config{gknn.Namespace: {FieldSelector: fields.SelectorFromSet(fields.Set{ +				"metadata.name": gknn.Name, +			})}}, +		} +	default: +		return ctrl.Options{}, fmt.Errorf("unknown group: %s", gknn.Group) +	} + +	return opt, nil +} + +// NewDefaultManager creates a new controller manager with default configuration. +func NewDefaultManager(gknn common.GKNN, restConfig *rest.Config, metricsServerOptions metricsserver.Options, leaderElectionEnabled bool) (ctrl.Manager, error) { +	opt, err := defaultManagerOptions(gknn, metricsServerOptions) +	if err != nil { +		return nil, fmt.Errorf("failed to create controller manager options: %w", err) +	} + +	if leaderElectionEnabled { +		opt.LeaderElection = true +		opt.LeaderElectionResourceLock = "leases" +		// The lease name needs to be unique per activator deployment. +		opt.LeaderElectionID = fmt.Sprintf("activator-%s-%s.llm-d.ai", gknn.Namespace, gknn.Name) +		opt.LeaderElectionNamespace = gknn.Namespace +		opt.LeaderElectionReleaseOnCancel = true +	} + +	mgr, err := ctrl.NewManager(restConfig, opt) +	if err != nil { +		return nil, fmt.Errorf("failed to create controller manager: %w", err) +	} +	return mgr, nil +} + +// NewManagerWithOptions creates a new controller manager with injectable options. +func NewManagerWithOptions(restConfig *rest.Config, opts manager.Options) (ctrl.Manager, error) { +	mgr, err := ctrl.NewManager(restConfig, opts) +	if err != nil { +		return nil, fmt.Errorf("failed to create controller manager: %w", err) +	} +	return mgr, nil +} diff --git a/pkg/activator/server/runserver.go b/pkg/activator/server/runserver.go new file mode 100644 index 00000000..ba0b9b85 --- /dev/null +++ b/pkg/activator/server/runserver.go @@ -0,0 +1,156 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +    http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package server + +import ( +	"context" +	"crypto/tls" +	"fmt" +	"time" + +	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" +	"github.com/go-logr/logr" +	"google.golang.org/grpc" +	"google.golang.org/grpc/credentials" +	"google.golang.org/grpc/health" +	healthgrpc "google.golang.org/grpc/health/grpc_health_v1" +	"k8s.io/apimachinery/pkg/runtime/schema" +	"k8s.io/apimachinery/pkg/types" +	ctrl "sigs.k8s.io/controller-runtime" +	"sigs.k8s.io/controller-runtime/pkg/manager" +	"sigs.k8s.io/gateway-api-inference-extension/pkg/common" + +	"github.com/llm-d/llm-d-inference-scheduler/pkg/activator/controller" +	"github.com/llm-d/llm-d-inference-scheduler/pkg/activator/datastore" +	"github.com/llm-d/llm-d-inference-scheduler/pkg/activator/handlers" +	"github.com/llm-d/llm-d-inference-scheduler/pkg/activator/requestcontrol" +	"github.com/llm-d/llm-d-inference-scheduler/pkg/activator/runnable" +	tlsutil "github.com/llm-d/llm-d-inference-scheduler/pkg/activator/tls" +) + +// ExtProcServerRunner provides methods to manage an external process server. +type ExtProcServerRunner struct { +	GrpcPort int +	PoolNamespacedName types.NamespacedName +	PoolGKNN common.GKNN +	Datastore datastore.Datastore +	SecureServing bool +	HealthChecking bool +	CertPath string +	RefreshPrometheusMetricsInterval time.Duration +	MetricsStalenessThreshold time.Duration +	Activator *requestcontrol.Activator +} + +// Default values for CLI flags in main +const ( +	DefaultGrpcPort = 9002 // default for --grpc-port +	DefaultGrpcHealthPort = 9003 // default for --grpc-health-port +	DefaultMetricsPort = 9090 // default for --metrics-port +	DefaultPoolName = "" // required but no default +	DefaultPoolNamespace = "default" // default for --pool-namespace +	DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refresh-metrics-interval +	DefaultRefreshPrometheusMetricsInterval = 5 * time.Second // default for --refresh-prometheus-metrics-interval +	DefaultSecureServing = true // default for --secure-serving +	DefaultHealthChecking = false // default for --health-checking +	DefaultEnablePprof = true // default for --enable-pprof +	DefaultCertPath = "" // default for --cert-path +	DefaultPoolGroup = "inference.networking.k8s.io" // default for --pool-group +	DefaultMetricsStalenessThreshold = 2 * time.Second +) + +// NewDefaultExtProcServerRunner creates a runner with default values. +// Note: Dependencies like the Datastore and Activator need to be set separately. +func NewDefaultExtProcServerRunner() *ExtProcServerRunner { +	poolGKNN := common.GKNN{ +		NamespacedName: types.NamespacedName{Name: DefaultPoolName, Namespace: DefaultPoolNamespace}, +		GroupKind: schema.GroupKind{ +			Group: DefaultPoolGroup, +			Kind: "InferencePool", +		}, +	} +	return &ExtProcServerRunner{ +		GrpcPort: DefaultGrpcPort, +		PoolNamespacedName: types.NamespacedName{Name: DefaultPoolName, Namespace: DefaultPoolNamespace}, +		PoolGKNN: poolGKNN, +		SecureServing: DefaultSecureServing, +		HealthChecking: DefaultHealthChecking, +		RefreshPrometheusMetricsInterval: DefaultRefreshPrometheusMetricsInterval, +		MetricsStalenessThreshold: DefaultMetricsStalenessThreshold, +		// Dependencies can be assigned later. +	} +} + +// SetupWithManager sets up the runner with the given manager. 
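+// +// Typical wiring sketch (error handling elided; "ds", "act", "mgr", and "logger" are assumed to be constructed elsewhere): +// +//	runner := server.NewDefaultExtProcServerRunner() +//	runner.Datastore = ds +//	runner.Activator = act +//	_ = runner.SetupWithManager(ctx, mgr) +//	_ = mgr.Add(runner.AsRunnable(logger))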
+func (r *ExtProcServerRunner) SetupWithManager(_ context.Context, mgr ctrl.Manager) error { +	// Create the controllers and register them with the manager +	if err := (&controller.InferencePoolReconciler{ +		Datastore: r.Datastore, +		Reader: mgr.GetClient(), +		PoolGKNN: r.PoolGKNN, +	}).SetupWithManager(mgr); err != nil { +		return fmt.Errorf("failed setting up InferencePoolReconciler: %w", err) +	} + +	return nil +} + +// AsRunnable returns a Runnable that can be used to start the ext-proc gRPC server. +// The runnable implements LeaderElectionRunnable with leader election disabled. +func (r *ExtProcServerRunner) AsRunnable(logger logr.Logger) manager.Runnable { +	return runnable.NoLeaderElection(manager.RunnableFunc(func(ctx context.Context) error { + +		var srv *grpc.Server +		if r.SecureServing { +			var cert tls.Certificate +			var err error +			if r.CertPath != "" { +				cert, err = tls.LoadX509KeyPair(r.CertPath+"/tls.crt", r.CertPath+"/tls.key") +			} else { +				// Generate a self-signed certificate. +				cert, err = tlsutil.CreateSelfSignedTLSCertificate(logger) +			} +			if err != nil { +				return fmt.Errorf("failed to load or create TLS certificate - %w", err) +			} + +			creds := credentials.NewTLS(&tls.Config{ +				Certificates: []tls.Certificate{cert}, +			}) +			// Init the server. +			srv = grpc.NewServer(grpc.Creds(creds)) +		} else { +			srv = grpc.NewServer() +		} + +		extProcServer := handlers.NewStreamingServer(r.Datastore, r.Activator) +		extProcPb.RegisterExternalProcessorServer(srv, extProcServer) + +		if r.HealthChecking { +			healthcheck := health.NewServer() +			healthgrpc.RegisterHealthServer(srv, +				healthcheck, +			) +			svcName := extProcPb.ExternalProcessor_ServiceDesc.ServiceName +			logger.Info("Setting ExternalProcessor service status to SERVING", "serviceName", svcName) +			healthcheck.SetServingStatus(svcName, healthgrpc.HealthCheckResponse_SERVING) +		} + +		// Forward to the gRPC runnable. +		return runnable.GRPCServer("ext-proc", srv, r.GrpcPort).Start(ctx) +	})) +} diff --git a/pkg/activator/tls/tls.go b/pkg/activator/tls/tls.go new file mode 100644 index 00000000..ba9d20b9 --- /dev/null +++ b/pkg/activator/tls/tls.go @@ -0,0 +1,74 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +    http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package tls provides utilities for TLS configuration. +package tls + +import ( +	"crypto/rand" +	"crypto/rsa" +	"crypto/tls" +	"crypto/x509" +	"crypto/x509/pkix" +	"encoding/pem" +	"fmt" +	"math/big" +	"time" + +	"github.com/go-logr/logr" +) + +// CreateSelfSignedTLSCertificate creates a self-signed cert the server can use to serve TLS. 
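+// +// Sketch of serving gRPC with the generated certificate, mirroring how the runner uses it ("tlsutil" is an assumed import alias for this package, "logger" an assumed logr.Logger): +// +//	cert, err := tlsutil.CreateSelfSignedTLSCertificate(logger) +//	if err != nil { +//		return err +//	} +//	creds := credentials.NewTLS(&tls.Config{Certificates: []tls.Certificate{cert}}) +//	srv := grpc.NewServer(grpc.Creds(creds))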
+func CreateSelfSignedTLSCertificate(_ logr.Logger) (tls.Certificate, error) { + serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128) + serialNumber, err := rand.Int(rand.Reader, serialNumberLimit) + if err != nil { + return tls.Certificate{}, fmt.Errorf("error creating serial number: %v", err) + } + now := time.Now() + notBefore := now.UTC() + template := x509.Certificate{ + SerialNumber: serialNumber, + Subject: pkix.Name{ + Organization: []string{"Inference Ext"}, + }, + NotBefore: notBefore, + NotAfter: now.Add(time.Hour * 24 * 365 * 10).UTC(), // 10 years + KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + BasicConstraintsValid: true, + } + + priv, err := rsa.GenerateKey(rand.Reader, 4096) + if err != nil { + return tls.Certificate{}, fmt.Errorf("error generating key: %v", err) + } + + derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &priv.PublicKey, priv) + if err != nil { + return tls.Certificate{}, fmt.Errorf("error creating certificate: %v", err) + } + + certBytes := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: derBytes}) + + privBytes, err := x509.MarshalPKCS8PrivateKey(priv) + if err != nil { + return tls.Certificate{}, fmt.Errorf("error marshalling private key: %v", err) + } + keyBytes := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: privBytes}) + + return tls.X509KeyPair(certBytes, keyBytes) +}