From 036c134c57eb069c589dd71d419e08bf2905c08f Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Tue, 4 Nov 2025 18:32:23 +0800 Subject: [PATCH 1/5] support function with replace sink and source --- api/compute/v1alpha1/function_types.go | 51 +++++- api/compute/v1alpha1/zz_generated.deepcopy.go | 48 ++++++ ...ompute.functionmesh.io-functionmeshes.yaml | 28 +++- ...ompute.functionmesh.io_functionmeshes.yaml | 30 ++++ .../compute.functionmesh.io_functions.yaml | 30 ++++ controllers/spec/common.go | 154 ++++++++---------- controllers/spec/function.go | 6 + controllers/spec/function_test.go | 84 ++++++++++ controllers/spec/sink.go | 4 +- controllers/spec/source.go | 4 +- controllers/spec/utils.go | 140 +++++++++++++++- controllers/spec/utils_test.go | 45 +++++ manifests/crd.yaml | 56 ++++++- pkg/webhook/function_webhook.go | 5 +- pkg/webhook/sink_webhook.go | 2 +- pkg/webhook/source_webhook.go | 2 +- pkg/webhook/validate.go | 47 +++--- 17 files changed, 616 insertions(+), 120 deletions(-) diff --git a/api/compute/v1alpha1/function_types.go b/api/compute/v1alpha1/function_types.go index a513d6202..192869344 100644 --- a/api/compute/v1alpha1/function_types.go +++ b/api/compute/v1alpha1/function_types.go @@ -62,7 +62,11 @@ type FunctionSpec struct { FilebeatImage string `json:"filebeatImage,omitempty"` // +kubebuilder:validation:Optional // +kubebuilder:pruning:PreserveUnknownFields - FuncConfig *Config `json:"funcConfig,omitempty"` + FuncConfig *Config `json:"funcConfig,omitempty"` + // +kubebuilder:validation:Optional + SourceConfig *SourceConnectorSpec `json:"sourceConfig,omitempty"` + // +kubebuilder:validation:Optional + SinkConfig *SinkConnectorSpec `json:"sinkConfig,omitempty"` Resources corev1.ResourceRequirements `json:"resources,omitempty"` SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"` VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"` @@ -152,6 +156,51 @@ type FunctionList struct { Items []Function `json:"items"` } +// SourceConnectorSpec describes configurable fields when a function overrides its source implementation. +type SourceConnectorSpec struct { + // Archive points to a nar archive containing the connector. It can reference built-in connectors using the + // builtin:// scheme. + // +kubebuilder:validation:Optional + Archive string `json:"archive,omitempty"` + // Builtin holds the resolved name of a built-in source connector. + // +kubebuilder:validation:Optional + Builtin string `json:"builtin,omitempty"` + // ClassName is the fully qualified source implementation class. + // +kubebuilder:validation:Optional + ClassName string `json:"className,omitempty"` + // TypeClassName overrides the expected message type. + // +kubebuilder:validation:Optional + TypeClassName string `json:"typeClassName,omitempty"` + // Configs contains connector specific options. + // +kubebuilder:validation:Optional + // +kubebuilder:pruning:PreserveUnknownFields + Configs *Config `json:"configs,omitempty"` +} + +// SinkConnectorSpec describes configurable fields when a function overrides its sink implementation. +type SinkConnectorSpec struct { + // Archive points to a nar archive containing the connector. It can reference built-in connectors using the + // builtin:// scheme. + // +kubebuilder:validation:Optional + Archive string `json:"archive,omitempty"` + // Builtin holds the resolved name of a built-in sink connector. + // +kubebuilder:validation:Optional + Builtin string `json:"builtin,omitempty"` + // SinkType refers to the built-in sink identifier when using connectors packaged with Pulsar. + // +kubebuilder:validation:Optional + SinkType string `json:"sinkType,omitempty"` + // ClassName is the fully qualified sink implementation class. + // +kubebuilder:validation:Optional + ClassName string `json:"className,omitempty"` + // TypeClassName overrides the message type for the sink. + // +kubebuilder:validation:Optional + TypeClassName string `json:"typeClassName,omitempty"` + // Configs contains connector specific options. + // +kubebuilder:validation:Optional + // +kubebuilder:pruning:PreserveUnknownFields + Configs *Config `json:"configs,omitempty"` +} + func init() { SchemeBuilder.Register(&Function{}, &FunctionList{}) } diff --git a/api/compute/v1alpha1/zz_generated.deepcopy.go b/api/compute/v1alpha1/zz_generated.deepcopy.go index 5695fce22..0d678581e 100644 --- a/api/compute/v1alpha1/zz_generated.deepcopy.go +++ b/api/compute/v1alpha1/zz_generated.deepcopy.go @@ -489,6 +489,16 @@ func (in *FunctionSpec) DeepCopyInto(out *FunctionSpec) { in, out := &in.FuncConfig, &out.FuncConfig *out = (*in).DeepCopy() } + if in.SourceConfig != nil { + in, out := &in.SourceConfig, &out.SourceConfig + *out = new(SourceConnectorSpec) + (*in).DeepCopyInto(*out) + } + if in.SinkConfig != nil { + in, out := &in.SinkConfig, &out.SinkConfig + *out = new(SinkConnectorSpec) + (*in).DeepCopyInto(*out) + } in.Resources.DeepCopyInto(&out.Resources) if in.SecretsMap != nil { in, out := &in.SecretsMap, &out.SecretsMap @@ -1155,6 +1165,25 @@ func (in *Sink) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SinkConnectorSpec) DeepCopyInto(out *SinkConnectorSpec) { + *out = *in + if in.Configs != nil { + in, out := &in.Configs, &out.Configs + *out = (*in).DeepCopy() + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SinkConnectorSpec. +func (in *SinkConnectorSpec) DeepCopy() *SinkConnectorSpec { + if in == nil { + return nil + } + out := new(SinkConnectorSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SinkList) DeepCopyInto(out *SinkList) { *out = *in @@ -1299,6 +1328,25 @@ func (in *Source) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SourceConnectorSpec) DeepCopyInto(out *SourceConnectorSpec) { + *out = *in + if in.Configs != nil { + in, out := &in.Configs, &out.Configs + *out = (*in).DeepCopy() + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SourceConnectorSpec. +func (in *SourceConnectorSpec) DeepCopy() *SourceConnectorSpec { + if in == nil { + return nil + } + out := new(SourceConnectorSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SourceList) DeepCopyInto(out *SourceList) { *out = *in diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml index f2c8eb4b5..995650742 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml @@ -7590,8 +7590,21 @@ spec: showPreciseParallelism: type: boolean sinkConfig: + properties: + archive: + type: string + builtin: + type: string + className: + type: string + configs: + additionalProperties: {} + type: object + sinkType: + type: string + typeClassName: + type: string type: object - x-kubernetes-preserve-unknown-fields: true sinkType: type: string statefulConfig: @@ -11246,8 +11259,19 @@ spec: showPreciseParallelism: type: boolean sourceConfig: + properties: + archive: + type: string + builtin: + type: string + className: + type: string + configs: + additionalProperties: {} + type: object + typeClassName: + type: string type: object - x-kubernetes-preserve-unknown-fields: true sourceType: type: string statefulConfig: diff --git a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml index d09dfed79..c8770700f 100644 --- a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml +++ b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml @@ -3710,8 +3710,38 @@ spec: type: object showPreciseParallelism: type: boolean + sinkConfig: + properties: + archive: + type: string + builtin: + type: string + className: + type: string + configs: + type: object + x-kubernetes-preserve-unknown-fields: true + sinkType: + type: string + typeClassName: + type: string + type: object skipToLatest: type: boolean + sourceConfig: + properties: + archive: + type: string + builtin: + type: string + className: + type: string + configs: + type: object + x-kubernetes-preserve-unknown-fields: true + typeClassName: + type: string + type: object statefulConfig: properties: pulsar: diff --git a/config/crd/bases/compute.functionmesh.io_functions.yaml b/config/crd/bases/compute.functionmesh.io_functions.yaml index 0d8232bf1..c971eb8bd 100644 --- a/config/crd/bases/compute.functionmesh.io_functions.yaml +++ b/config/crd/bases/compute.functionmesh.io_functions.yaml @@ -3707,8 +3707,38 @@ spec: type: object showPreciseParallelism: type: boolean + sinkConfig: + properties: + archive: + type: string + builtin: + type: string + className: + type: string + configs: + type: object + x-kubernetes-preserve-unknown-fields: true + sinkType: + type: string + typeClassName: + type: string + type: object skipToLatest: type: boolean + sourceConfig: + properties: + archive: + type: string + builtin: + type: string + className: + type: string + configs: + type: object + x-kubernetes-preserve-unknown-fields: true + typeClassName: + type: string + type: object statefulConfig: properties: pulsar: diff --git a/controllers/spec/common.go b/controllers/spec/common.go index f2e9144d8..eab364604 100644 --- a/controllers/spec/common.go +++ b/controllers/spec/common.go @@ -69,6 +69,7 @@ const ( DefaultGenericRunnerImage = DefaultRunnerPrefix + "pulsar-functions-generic-base-runner:" + DefaultGenericRunnerTag PulsarAdminExecutableFile = "/pulsar/bin/pulsar-admin" WorkDir = "/pulsar/" + DefaultConnectorsDirectory = WorkDir + "connectors" RunnerImageHasPulsarctl = "pulsar-functions-(pulsarctl|sn|generic)-(java|python|go|nodejs|base)-runner" @@ -169,25 +170,6 @@ type TLSConfig interface { GetMountPath() string } -type RollingCfg struct { - Enabled bool - Type string // "size" or "time" - File string - // size-based - MaxBytes int - Backups int - // time-based - When string // e.g. "D", "W0" - Interval int -} - -type LogCfg struct { - Level string // INFO/DEBUG/... - Format string // "json" or "text" - Handlers string // computed: "stream_handler[,rotating_file_handler|,timed_rotating_file_handler]" - Rolling RollingCfg -} - func IsManaged(object metav1.Object) bool { managed, exists := object.GetAnnotations()[AnnotationManaged] return !exists || managed != "false" @@ -533,14 +515,14 @@ func GenerateAffinity(affinity *corev1.Affinity, labels map[string]string, disab return affinity } -func MakeJavaFunctionCommand(downloadPath, packageFile, name, clusterName, generateLogConfigCommand, logLevel, details, extraDependenciesDir, uid string, +func MakeJavaFunctionCommand(downloadPath, packageFile, name, clusterName, generateLogConfigCommand, logLevel, details, extraDependenciesDir, connectorsDirectory, uid string, memory *resource.Quantity, javaOpts []string, hasPulsarctl, hasWget, authProvided, tlsProvided bool, secretMaps map[string]v1alpha1.SecretRef, state *v1alpha1.Stateful, tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig, maxPendingAsyncRequests *int32, logConfigFileName string) []string { processCommand := setShardIDEnvironmentVariableCommand() + " && " + generateLogConfigCommand + strings.Join(getProcessJavaRuntimeArgs(name, packageFile, clusterName, logLevel, details, - extraDependenciesDir, uid, memory, javaOpts, authProvided, tlsProvided, secretMaps, state, tlsConfig, + extraDependenciesDir, connectorsDirectory, uid, memory, javaOpts, authProvided, tlsProvided, secretMaps, state, tlsConfig, authConfig, maxPendingAsyncRequests, logConfigFileName), " ") if downloadPath != "" && !utils.EnableInitContainers { // prepend download command if the downPath is provided @@ -1123,89 +1105,88 @@ func generatePythonLogConfigCommand(name string, runtime *v1alpha1.PythonRuntime func renderPythonInstanceLoggingINITemplate(name string, runtime *v1alpha1.PythonRuntime, agent v1alpha1.LogTopicAgent) (string, error) { tmpl := template.Must(template.New("spec").Parse(pythonLoggingINITemplate)) - - lc := LogCfg{ - Level: "INFO", - Format: "text", - Rolling: RollingCfg{ - Enabled: false, - Backups: 5, - }, + var tpl bytes.Buffer + type logConfig struct { + RollingEnabled bool + Level string + Policy template.HTML + Handlers string + Format string } - - // level + lc := &logConfig{} + lc.Level = "INFO" + lc.Format = "text" + lc.Handlers = "stream_handler" if runtime.Log != nil && runtime.Log.Level != "" { if level := parsePythonLogLevel(runtime); level != "" { lc.Level = level } } - // format - if runtime.Log != nil && runtime.Log.Format != nil && strings.ToLower(string(*runtime.Log.Format)) == "json" { - lc.Format = "json" + if runtime.Log != nil && runtime.Log.Format != nil { + lc.Format = string(*runtime.Log.Format) } - - // default handler - lc.Handlers = "stream_handler" - - // log file path - logFile := fmt.Sprintf("logs/functions/%s.log", name) - - // rolling policy if runtime.Log != nil && runtime.Log.RotatePolicy != nil { - lc.Rolling.Enabled = true + lc.RollingEnabled = true + logFile := fmt.Sprintf("logs/functions/%s-${%s}.log", name, EnvShardID) switch *runtime.Log.RotatePolicy { - case v1alpha1.SizedPolicyWith10MB: - lc.Rolling.Type = "size" - lc.Rolling.File = logFile - lc.Rolling.MaxBytes = 10 * 1024 * 1024 - lc.Handlers = "stream_handler,rotating_file_handler" - case v1alpha1.SizedPolicyWith50MB: - lc.Rolling.Type = "size" - lc.Rolling.File = logFile - lc.Rolling.MaxBytes = 50 * 1024 * 1024 - lc.Handlers = "stream_handler,rotating_file_handler" - case v1alpha1.SizedPolicyWith100MB: - lc.Rolling.Type = "size" - lc.Rolling.File = logFile - lc.Rolling.MaxBytes = 100 * 1024 * 1024 - lc.Handlers = "stream_handler,rotating_file_handler" case v1alpha1.TimedPolicyWithDaily: - lc.Rolling.Type = "time" - lc.Rolling.File = logFile - lc.Rolling.When = "D" - lc.Rolling.Interval = 1 lc.Handlers = "stream_handler,timed_rotating_file_handler" + lc.Policy = template.HTML(fmt.Sprintf(`[handler_timed_rotating_file_handler] +args=(\"%s\", 'D', 1, 5,) +class=handlers.TimedRotatingFileHandler +level=%s +formatter=formatter`, logFile, lc.Level)) case v1alpha1.TimedPolicyWithWeekly: - lc.Rolling.Type = "time" - lc.Rolling.File = logFile - lc.Rolling.When = "W0" // every monday - lc.Rolling.Interval = 1 lc.Handlers = "stream_handler,timed_rotating_file_handler" + lc.Policy = template.HTML(fmt.Sprintf(`[handler_timed_rotating_file_handler] +args=(\"%s\", 'W0', 1, 5,) +class=handlers.TimedRotatingFileHandler +level=%s +formatter=formatter`, logFile, lc.Level)) case v1alpha1.TimedPolicyWithMonthly: - lc.Rolling.Type = "time" - lc.Rolling.File = logFile - lc.Rolling.When = "D" // day - lc.Rolling.Interval = 30 lc.Handlers = "stream_handler,timed_rotating_file_handler" - } - } else if agent == v1alpha1.SIDECAR { - // sidecar mode enables rolling by default, using size policy with 10MB - lc.Rolling = RollingCfg{ - Enabled: true, - Type: "size", - File: logFile, - MaxBytes: 10 * 1024 * 1024, - Backups: 5, - } + lc.Policy = template.HTML(fmt.Sprintf(`[handler_timed_rotating_file_handler] +args=(\"%s\", 'D', 30, 5,) +class=handlers.TimedRotatingFileHandler +level=%s +formatter=formatter`, logFile, lc.Level)) + case v1alpha1.SizedPolicyWith10MB: + lc.Handlers = "stream_handler,rotating_file_handler" + lc.Policy = template.HTML(fmt.Sprintf(`[handler_rotating_file_handler] +args=(\"%s\", 'a', 10485760, 5,) +class=handlers.RotatingFileHandler +level=%s +formatter=formatter`, logFile, lc.Level)) + case v1alpha1.SizedPolicyWith50MB: + lc.Handlers = "handler_stream_handler,rotating_file_handler" + lc.Policy = template.HTML(fmt.Sprintf(`[handler_rotating_file_handler] +args=(%s, 'a', 52428800, 5,) +class=handlers.RotatingFileHandler +level=%s +formatter=formatter`, logFile, lc.Level)) + case v1alpha1.SizedPolicyWith100MB: + lc.Handlers = "handler_stream_handler,rotating_file_handler" + lc.Policy = template.HTML(fmt.Sprintf(`[handler_rotating_file_handler] +args=(%s, 'a', 104857600, 5,) +class=handlers.RotatingFileHandler +level=%s +formatter=formatter`, logFile, lc.Level)) + } + } else if agent == v1alpha1.SIDECAR { // sidecar mode needs the rotated log file + lc.RollingEnabled = true + logFile := fmt.Sprintf("logs/functions/%s-${%s}.log", name, EnvShardID) lc.Handlers = "stream_handler,rotating_file_handler" + lc.Policy = template.HTML(fmt.Sprintf(`[handler_rotating_file_handler] +args=(\"%s\", 'a', 10485760, 5,) +class=handlers.RotatingFileHandler +level=%s +formatter=formatter`, logFile, lc.Level)) } - - var buf bytes.Buffer - if err := tmpl.Execute(&buf, &lc); err != nil { + if err := tmpl.Execute(&tpl, lc); err != nil { log.Error(err, "failed to render python instance logging template") return "", err } - return buf.String(), nil + return tpl.String(), nil } func parseJavaLogLevel(runtime *v1alpha1.JavaRuntime) string { @@ -1282,7 +1263,7 @@ func setShardIDEnvironmentVariableCommand() string { return fmt.Sprintf("%s=${POD_NAME##*-} && echo shardId=${%s}", EnvShardID, EnvShardID) } -func getProcessJavaRuntimeArgs(name, packageName, clusterName, logLevel, details, extraDependenciesDir, uid string, +func getProcessJavaRuntimeArgs(name, packageName, clusterName, logLevel, details, extraDependenciesDir, connectorsDirectory, uid string, memory *resource.Quantity, javaOpts []string, authProvided, tlsProvided bool, secretMaps map[string]v1alpha1.SecretRef, state *v1alpha1.Stateful, tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig, @@ -1332,6 +1313,9 @@ func getProcessJavaRuntimeArgs(name, packageName, clusterName, logLevel, details } sharedArgs := getSharedArgs(details, clusterName, uid, authProvided, tlsProvided, tlsConfig, authConfig) args = append(args, sharedArgs...) + if connectorsDirectory != "" { + args = append(args, "--connectors_directory", connectorsDirectory) + } if len(secretMaps) > 0 { secretProviderArgs := getJavaSecretProviderArgs(secretMaps) args = append(args, secretProviderArgs...) diff --git a/controllers/spec/function.go b/controllers/spec/function.go index 2b5a321c2..9eca5d0ca 100644 --- a/controllers/spec/function.go +++ b/controllers/spec/function.go @@ -223,6 +223,11 @@ func makeFunctionLabels(function *v1alpha1.Function) map[string]string { func makeFunctionCommand(function *v1alpha1.Function) []string { spec := function.Spec + connectorsDirectory := "" + if spec.SourceConfig != nil || spec.SinkConfig != nil { + connectorsDirectory = DefaultConnectorsDirectory + } + hasPulsarctl := function.Spec.ImageHasPulsarctl hasWget := function.Spec.ImageHasWget if match, _ := regexp.MatchString(RunnerImageHasPulsarctl, function.Spec.Image); match { @@ -238,6 +243,7 @@ func makeFunctionCommand(function *v1alpha1.Function) []string { parseJavaLogLevel(spec.Java), generateFunctionDetailsInJSON(function), spec.Java.ExtraDependenciesDir, + connectorsDirectory, string(function.UID), spec.Resources.Limits.Memory(), spec.Java.JavaOpts, hasPulsarctl, hasWget, diff --git a/controllers/spec/function_test.go b/controllers/spec/function_test.go index 90711ebe8..30229442f 100644 --- a/controllers/spec/function_test.go +++ b/controllers/spec/function_test.go @@ -18,6 +18,8 @@ package spec import ( + "encoding/json" + "regexp" "strings" "testing" @@ -25,6 +27,9 @@ import ( corev1 "k8s.io/api/core/v1" "github.com/streamnative/function-mesh/api/compute/v1alpha1" + "github.com/streamnative/function-mesh/controllers/proto" + + "google.golang.org/protobuf/encoding/protojson" "gotest.tools/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -57,6 +62,7 @@ func makeFunctionSample(functionName string) *v1alpha1.Function { Name: functionName, ClassName: "org.apache.pulsar.functions.api.examples.ExclamationFunction", Tenant: "public", + Namespace: "default", ClusterName: TestClusterName, Input: v1alpha1.InputConf{ Topics: []string{ @@ -147,3 +153,81 @@ func TestInitContainerDownloader(t *testing.T) { assert.Assert(t, strings.Contains(startDownloadCommands, "/pulsar/download/java-function.jar"), "download command should contain /pulsar/download/java-function.jar: %s", startDownloadCommands) assert.Assert(t, strings.Contains(startFunctionCommands, "/pulsar/tmp/java-function.jar"), "function command should contain /pulsar/tmp/java-function.jar: %s", startFunctionCommands) } + +func TestJavaFunctionCommandWithConnectorOverrides(t *testing.T) { + function := makeFunctionSample("connector-test") + function.Spec.Input = v1alpha1.InputConf{} + function.Spec.Output = v1alpha1.OutputConf{} + function.Spec.LogTopic = "" + function.Spec.StateConfig = nil + + sourceConfigs := v1alpha1.NewConfig(map[string]interface{}{ + "awsEndpoint": "http://localstack:4566", + "cloudwatchEndpoint": "http://localstack:4566", + "dynamoEndpoint": "http://localstack:4566", + "awsRegion": "us-east-1", + "awsKinesisStreamName": "demo-stream", + "awsCredentialPluginParam": `{"accessKey":"test","secretKey":"test"}`, + "useEnhancedFanOut": false, + "applicationName": "pulsar-kinesis-demo", + "initialPositionInStream": "TRIM_HORIZON", + }) + function.Spec.SourceConfig = &v1alpha1.SourceConnectorSpec{ + Archive: "builtin://kinesis", + Configs: &sourceConfigs, + TypeClassName: "java.lang.Object", + } + + sinkConfigs := v1alpha1.NewConfig(map[string]interface{}{ + "bootstrapServers": "kafka:9092", + "acks": "all", + "topic": "kinesis-demo", + "producerConfigProperties": map[string]interface{}{ + "enable.idempotence": true, + }, + }) + function.Spec.SinkConfig = &v1alpha1.SinkConnectorSpec{ + SinkType: "kafka", + Configs: &sinkConfigs, + TypeClassName: "java.lang.Object", + } + + commands := makeFunctionCommand(function) + assert.Assert(t, len(commands) == 3, "commands should be 3 but got %d", len(commands)) + + startCommand := commands[2] + assert.Assert(t, strings.Contains(startCommand, "--connectors_directory "+DefaultConnectorsDirectory), + "start command should include connectors directory but got %s", startCommand) + re := regexp.MustCompile(`--function_details '([^']+)'`) + matches := re.FindStringSubmatch(startCommand) + assert.Assert(t, len(matches) == 2, "unable to locate function details in command: %s", startCommand) + + functionDetailsJSON := matches[1] + details := &proto.FunctionDetails{} + err := protojson.Unmarshal([]byte(functionDetailsJSON), details) + assert.NilError(t, err) + + assert.Equal(t, details.GetTenant(), "public") + assert.Equal(t, details.GetNamespace(), "default") + assert.Equal(t, details.GetName(), "connector-test") + + sourceSpec := details.GetSource() + assert.Equal(t, sourceSpec.GetBuiltin(), "kinesis") + assert.Equal(t, sourceSpec.GetTypeClassName(), "java.lang.Object") + sourceConfigsMap := map[string]interface{}{} + err = json.Unmarshal([]byte(sourceSpec.GetConfigs()), &sourceConfigsMap) + assert.NilError(t, err) + assert.Equal(t, sourceConfigsMap["awsEndpoint"], "http://localstack:4566") + assert.Equal(t, sourceConfigsMap["awsKinesisStreamName"], "demo-stream") + assert.Equal(t, sourceConfigsMap["initialPositionInStream"], "TRIM_HORIZON") + + sinkSpec := details.GetSink() + sinkConfigsMap := map[string]interface{}{} + err = json.Unmarshal([]byte(sinkSpec.GetConfigs()), &sinkConfigsMap) + assert.NilError(t, err) + assert.Equal(t, sinkConfigsMap["sinkType"], "kafka") + assert.Equal(t, sinkConfigsMap["bootstrapServers"], "kafka:9092") + assert.Equal(t, sinkConfigsMap["acks"], "all") + producerConfig := sinkConfigsMap["producerConfigProperties"].(map[string]interface{}) + assert.Equal(t, producerConfig["enable.idempotence"], true) +} diff --git a/controllers/spec/sink.go b/controllers/spec/sink.go index d4e0ff089..12200fc5a 100644 --- a/controllers/spec/sink.go +++ b/controllers/spec/sink.go @@ -232,7 +232,9 @@ func MakeSinkCommand(sink *v1alpha1.Sink) []string { GenerateJavaLogConfigCommand(spec.Java, spec.LogTopicAgent), parseJavaLogLevel(spec.Java), generateSinkDetailsInJSON(sink), - spec.Java.ExtraDependenciesDir, string(sink.UID), + spec.Java.ExtraDependenciesDir, + "", + string(sink.UID), spec.Resources.Limits.Memory(), spec.Java.JavaOpts, hasPulsarctl, hasWget, spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", spec.SecretsMap, spec.StateConfig, spec.Pulsar.TLSConfig, spec.Pulsar.AuthConfig, nil, diff --git a/controllers/spec/source.go b/controllers/spec/source.go index c8c942c4d..24d4cb33c 100644 --- a/controllers/spec/source.go +++ b/controllers/spec/source.go @@ -179,7 +179,9 @@ func makeSourceCommand(source *v1alpha1.Source) []string { GenerateJavaLogConfigCommand(spec.Java, spec.LogTopicAgent), parseJavaLogLevel(spec.Java), generateSourceDetailsInJSON(source), - spec.Java.ExtraDependenciesDir, string(source.UID), + spec.Java.ExtraDependenciesDir, + "", + string(source.UID), spec.Resources.Limits.Memory(), spec.Java.JavaOpts, hasPulsarctl, hasWget, spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", spec.SecretsMap, spec.StateConfig, spec.Pulsar.TLSConfig, spec.Pulsar.AuthConfig, nil, diff --git a/controllers/spec/utils.go b/controllers/spec/utils.go index e62171b22..7ada1e0ca 100644 --- a/controllers/spec/utils.go +++ b/controllers/spec/utils.go @@ -98,6 +98,140 @@ func fetchClassName(function *v1alpha1.Function) string { return function.Spec.ClassName } +const builtinURLPrefix = "builtin://" + +type connectorConfigDetails struct { + builtin string + className string + typeClassName string + configs string +} + +func encodeConnectorConfigs(config *v1alpha1.Config) string { + if config == nil { + return "" + } + payload := getUserConfig(config) + if payload == "" || payload == "null" { + return "" + } + return payload +} + +func buildSourceConnectorDetails(cfg *v1alpha1.SourceConnectorSpec) *connectorConfigDetails { + if cfg == nil { + return nil + } + + details := &connectorConfigDetails{ + className: cfg.ClassName, + typeClassName: cfg.TypeClassName, + } + + builtin := cfg.Builtin + if builtin == "" { + if archive := cfg.Archive; strings.HasPrefix(archive, builtinURLPrefix) { + builtin = strings.TrimPrefix(archive, builtinURLPrefix) + } + } + details.builtin = builtin + + if configsJSON := encodeConnectorConfigs(cfg.Configs); configsJSON != "" { + details.configs = configsJSON + } + + if details.builtin == "" && details.className == "" && details.typeClassName == "" && details.configs == "" { + return nil + } + return details +} + +func buildSinkConnectorDetails(cfg *v1alpha1.SinkConnectorSpec) *connectorConfigDetails { + if cfg == nil { + return nil + } + + details := &connectorConfigDetails{ + className: cfg.ClassName, + typeClassName: cfg.TypeClassName, + } + + builtin := cfg.Builtin + if builtin == "" { + if archive := cfg.Archive; strings.HasPrefix(archive, builtinURLPrefix) { + builtin = strings.TrimPrefix(archive, builtinURLPrefix) + } else if sinkType := cfg.SinkType; sinkType != "" { + builtin = sinkType + } + } + details.builtin = builtin + + if cfg.Configs != nil || cfg.SinkType != "" { + merged := map[string]interface{}{} + if cfg.Configs != nil && cfg.Configs.Data != nil { + for k, v := range cfg.Configs.Data { + merged[k] = v + } + } + if cfg.SinkType != "" { + if _, ok := merged["sinkType"]; !ok { + merged["sinkType"] = cfg.SinkType + } + } + if len(merged) > 0 { + tmp := v1alpha1.NewConfig(merged) + if configsJSON := encodeConnectorConfigs(&tmp); configsJSON != "" { + details.configs = configsJSON + } + } + } + + if details.builtin == "" && details.className == "" && details.typeClassName == "" && details.configs == "" { + return nil + } + return details +} + +func applyFunctionConnectorSourceSpec(function *v1alpha1.Function, spec *proto.SourceSpec) { + details := buildSourceConnectorDetails(function.Spec.SourceConfig) + if details == nil { + return + } + + if details.builtin != "" { + spec.Builtin = details.builtin + } + if details.className != "" { + spec.ClassName = details.className + } + if spec.TypeClassName == "" && details.typeClassName != "" { + spec.TypeClassName = details.typeClassName + } + if details.configs != "" { + spec.Configs = details.configs + } +} + +func applyFunctionConnectorSinkSpec(function *v1alpha1.Function, spec *proto.SinkSpec) { + details := buildSinkConnectorDetails(function.Spec.SinkConfig) + if details == nil { + return + } + + if details.builtin != "" { + spec.Builtin = details.builtin + } + if details.className != "" { + spec.ClassName = details.className + } + if spec.TypeClassName == "" && details.typeClassName != "" { + spec.TypeClassName = details.typeClassName + } + if details.configs != "" { + spec.Configs = details.configs + } +} + func convertGoFunctionConfs(function *v1alpha1.Function) *GoFunctionConf { deadLetterTopic := getDeadLetterTopicOrDefault(function.Spec.DeadLetterTopic, function.Spec.SubscriptionName, function.Spec.Tenant, function.Spec.Namespace, function.Spec.Name, function.Spec.MaxMessageRetry) @@ -195,7 +329,7 @@ func generateInputSpec(sourceConf v1alpha1.InputConf) map[string]*proto.Consumer func generateFunctionInputSpec(function *v1alpha1.Function) *proto.SourceSpec { inputSpecs := generateInputSpec(function.Spec.Input) - return &proto.SourceSpec{ + sourceSpec := &proto.SourceSpec{ ClassName: "", Configs: "", TypeClassName: function.Spec.Input.TypeClassName, @@ -210,6 +344,9 @@ func generateFunctionInputSpec(function *v1alpha1.Function) *proto.SourceSpec { NegativeAckRedeliveryDelayMs: uint64(function.Spec.Timeout), SkipToLatest: function.Spec.SkipToLatest, } + + applyFunctionConnectorSourceSpec(function, sourceSpec) + return sourceSpec } func generateFunctionOutputSpec(function *v1alpha1.Function) *proto.SinkSpec { @@ -247,6 +384,7 @@ func generateFunctionOutputSpec(function *v1alpha1.Function) *proto.SinkSpec { sinkSpec.ProducerSpec = producerConfig } + applyFunctionConnectorSinkSpec(function, sinkSpec) return sinkSpec } diff --git a/controllers/spec/utils_test.go b/controllers/spec/utils_test.go index 0665467db..04c6e7794 100644 --- a/controllers/spec/utils_test.go +++ b/controllers/spec/utils_test.go @@ -104,3 +104,48 @@ func TestBatchSource(t *testing.T) { assert.Equal(t, v1alpha1.BatchSourceClass, sourceSpec.ClassName) assert.Equal(t, `{"__BATCHSOURCECLASSNAME__":"org.apache.pulsar.ecosystem.io.bigquery.BigQuerySource","__BATCHSOURCECONFIGS__":"{\"discoveryTriggererClassName\":\"test-trigger-class\",\"discoveryTriggererConfig\":{\"test-key\":\"test-value\"}}","tableName":"test-table"}`, sourceSpec.Configs) } + +func TestGenerateFunctionInputSpecWithConnector(t *testing.T) { + function := makeFunctionSample("connector-source") + function.Spec.Input.Topics = nil + function.Spec.Input.CustomSerdeSources = nil + function.Spec.Input.CustomSchemaSources = nil + function.Spec.Input.SourceSpecs = nil + configs := &v1alpha1.Config{ + Data: map[string]interface{}{ + "bootstrapServers": "kafka:9092", + "groupId": "test-consumer", + }, + } + function.Spec.SourceConfig = &v1alpha1.SourceConnectorSpec{ + Archive: "builtin://kafka", + ClassName: "org.apache.pulsar.io.kafka.KafkaSource", + Configs: configs, + } + + sourceSpec := generateFunctionInputSpec(function) + assert.Equal(t, "kafka", sourceSpec.Builtin) + assert.Equal(t, "org.apache.pulsar.io.kafka.KafkaSource", sourceSpec.ClassName) + assert.Equal(t, `{"bootstrapServers":"kafka:9092","groupId":"test-consumer"}`, sourceSpec.Configs) +} + +func TestGenerateFunctionOutputSpecWithConnector(t *testing.T) { + function := makeFunctionSample("connector-sink") + function.Spec.Output.Topic = "" + configs := &v1alpha1.Config{ + Data: map[string]interface{}{ + "bootstrapServers": "kafka:9092", + "topic": "kafka-output", + }, + } + function.Spec.SinkConfig = &v1alpha1.SinkConnectorSpec{ + SinkType: "kafka", + ClassName: "org.apache.pulsar.io.kafka.KafkaSink", + Configs: configs, + } + + sinkSpec := generateFunctionOutputSpec(function) + assert.Equal(t, "kafka", sinkSpec.Builtin) + assert.Equal(t, "org.apache.pulsar.io.kafka.KafkaSink", sinkSpec.ClassName) + assert.Equal(t, `{"bootstrapServers":"kafka:9092","sinkType":"kafka","topic":"kafka-output"}`, sinkSpec.Configs) +} diff --git a/manifests/crd.yaml b/manifests/crd.yaml index 7112788d8..b71c7c542 100644 --- a/manifests/crd.yaml +++ b/manifests/crd.yaml @@ -10816,8 +10816,21 @@ spec: showPreciseParallelism: type: boolean sinkConfig: + properties: + archive: + type: string + builtin: + type: string + className: + type: string + configs: + additionalProperties: {} + type: object + sinkType: + type: string + typeClassName: + type: string type: object - x-kubernetes-preserve-unknown-fields: true sinkType: type: string statefulConfig: @@ -14470,8 +14483,19 @@ spec: showPreciseParallelism: type: boolean sourceConfig: + properties: + archive: + type: string + builtin: + type: string + className: + type: string + configs: + additionalProperties: {} + type: object + typeClassName: + type: string type: object - x-kubernetes-preserve-unknown-fields: true sourceType: type: string statefulConfig: @@ -22250,8 +22274,21 @@ spec: showPreciseParallelism: type: boolean sinkConfig: + properties: + archive: + type: string + builtin: + type: string + className: + type: string + configs: + additionalProperties: {} + type: object + sinkType: + type: string + typeClassName: + type: string type: object - x-kubernetes-preserve-unknown-fields: true sinkType: type: string statefulConfig: @@ -25983,8 +26020,19 @@ spec: showPreciseParallelism: type: boolean sourceConfig: + properties: + archive: + type: string + builtin: + type: string + className: + type: string + configs: + additionalProperties: {} + type: object + typeClassName: + type: string type: object - x-kubernetes-preserve-unknown-fields: true sourceType: type: string statefulConfig: diff --git a/pkg/webhook/function_webhook.go b/pkg/webhook/function_webhook.go index 3c4ded9d8..2332251e5 100644 --- a/pkg/webhook/function_webhook.go +++ b/pkg/webhook/function_webhook.go @@ -280,7 +280,10 @@ func (webhook *FunctionWebhook) ValidateCreate(ctx context.Context, obj runtime. allErrs = append(allErrs, fieldErr) } - fieldErrs = validateInputOutput(&r.Spec.Input, &r.Spec.Output) + skipInputValidation := r.Spec.SourceConfig != nil + skipOutputValidation := r.Spec.SinkConfig != nil + + fieldErrs = validateInputOutput(&r.Spec.Input, &r.Spec.Output, skipInputValidation, skipOutputValidation) if len(fieldErrs) > 0 { allErrs = append(allErrs, fieldErrs...) } diff --git a/pkg/webhook/sink_webhook.go b/pkg/webhook/sink_webhook.go index 0500e87c1..1ba57f2c1 100644 --- a/pkg/webhook/sink_webhook.go +++ b/pkg/webhook/sink_webhook.go @@ -219,7 +219,7 @@ func (webhook *SinkWebhook) ValidateCreate(ctx context.Context, obj runtime.Obje allErrs = append(allErrs, fieldErr) } - fieldErrs = validateInputOutput(&r.Spec.Input, nil) + fieldErrs = validateInputOutput(&r.Spec.Input, nil, false, true) if len(fieldErrs) > 0 { allErrs = append(allErrs, fieldErrs...) } diff --git a/pkg/webhook/source_webhook.go b/pkg/webhook/source_webhook.go index f9084dda8..bb8a44234 100644 --- a/pkg/webhook/source_webhook.go +++ b/pkg/webhook/source_webhook.go @@ -214,7 +214,7 @@ func (webhook *SourceWebhook) ValidateCreate(ctx context.Context, obj runtime.Ob allErrs = append(allErrs, fieldErr) } - fieldErrs = validateInputOutput(nil, &r.Spec.Output) + fieldErrs = validateInputOutput(nil, &r.Spec.Output, true, false) if len(fieldErrs) > 0 { allErrs = append(allErrs, fieldErrs...) } diff --git a/pkg/webhook/validate.go b/pkg/webhook/validate.go index 73cb82ba4..f5be491d2 100644 --- a/pkg/webhook/validate.go +++ b/pkg/webhook/validate.go @@ -230,42 +230,45 @@ func validateSecretsMap(secrets map[string]v1alpha1.SecretRef) *field.Error { return nil } -func validateInputOutput(input *v1alpha1.InputConf, output *v1alpha1.OutputConf) []*field.Error { +func validateInputOutput(input *v1alpha1.InputConf, output *v1alpha1.OutputConf, + skipInputValidation bool, skipOutputValidation bool) []*field.Error { var allErrs field.ErrorList allInputTopics := []string{} if input != nil { allInputTopics = collectAllInputTopics(*input) - if len(allInputTopics) == 0 { - e := field.Invalid(field.NewPath("spec").Child("input"), *input, - "No input topic(s) specified for the function") - allErrs = append(allErrs, e) - } - - for _, topic := range allInputTopics { - err := isValidTopicName(topic) - if err != nil { + if !skipInputValidation { + if len(allInputTopics) == 0 { e := field.Invalid(field.NewPath("spec").Child("input"), *input, - fmt.Sprintf("Input topic %s is invalid", topic)) + "No input topic(s) specified for the function") allErrs = append(allErrs, e) } - } - for topicName, conf := range input.SourceSpecs { - if conf.ReceiverQueueSize != nil && *conf.ReceiverQueueSize < 0 { - e := field.Invalid(field.NewPath("spec").Child("input", "sourceSpecs"), - input.SourceSpecs, fmt.Sprintf("%s receiver queue size should be >= zero", topicName)) - allErrs = append(allErrs, e) + for _, topic := range allInputTopics { + err := isValidTopicName(topic) + if err != nil { + e := field.Invalid(field.NewPath("spec").Child("input"), *input, + fmt.Sprintf("Input topic %s is invalid", topic)) + allErrs = append(allErrs, e) + } } - if conf.CryptoConfig != nil && conf.CryptoConfig.CryptoKeyReaderClassName == "" { - e := field.Invalid(field.NewPath("spec").Child("input", "sourceSpecs"), - input.SourceSpecs, fmt.Sprintf("%s cryptoKeyReader class name required", topicName)) - allErrs = append(allErrs, e) + for topicName, conf := range input.SourceSpecs { + if conf.ReceiverQueueSize != nil && *conf.ReceiverQueueSize < 0 { + e := field.Invalid(field.NewPath("spec").Child("input", "sourceSpecs"), + input.SourceSpecs, fmt.Sprintf("%s receiver queue size should be >= zero", topicName)) + allErrs = append(allErrs, e) + } + + if conf.CryptoConfig != nil && conf.CryptoConfig.CryptoKeyReaderClassName == "" { + e := field.Invalid(field.NewPath("spec").Child("input", "sourceSpecs"), + input.SourceSpecs, fmt.Sprintf("%s cryptoKeyReader class name required", topicName)) + allErrs = append(allErrs, e) + } } } } - if output != nil { + if output != nil && !skipOutputValidation { if output.Topic != "" { err := isValidTopicName(output.Topic) if err != nil { From f94e94f4abdbf68391be3bf6cf47271666423abf Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Tue, 4 Nov 2025 22:07:44 +0800 Subject: [PATCH 2/5] fix spec --- api/compute/v1alpha1/function_types.go | 11 ++-------- api/compute/v1alpha1/zz_generated.deepcopy.go | 5 +++++ ...ompute.functionmesh.io_functionmeshes.yaml | 20 +++++++++++-------- .../compute.functionmesh.io_functions.yaml | 20 +++++++++++-------- 4 files changed, 31 insertions(+), 25 deletions(-) diff --git a/api/compute/v1alpha1/function_types.go b/api/compute/v1alpha1/function_types.go index 192869344..2ac613977 100644 --- a/api/compute/v1alpha1/function_types.go +++ b/api/compute/v1alpha1/function_types.go @@ -161,10 +161,10 @@ type SourceConnectorSpec struct { // Archive points to a nar archive containing the connector. It can reference built-in connectors using the // builtin:// scheme. // +kubebuilder:validation:Optional - Archive string `json:"archive,omitempty"` + SourceType string `json:"sourceType,omitempty"` // refer to `--source-type` as builtin connector // Builtin holds the resolved name of a built-in source connector. // +kubebuilder:validation:Optional - Builtin string `json:"builtin,omitempty"` + BatchSourceConfig *BatchSourceConfig `json:"batchSourceConfig,omitempty"` // ClassName is the fully qualified source implementation class. // +kubebuilder:validation:Optional ClassName string `json:"className,omitempty"` @@ -179,13 +179,6 @@ type SourceConnectorSpec struct { // SinkConnectorSpec describes configurable fields when a function overrides its sink implementation. type SinkConnectorSpec struct { - // Archive points to a nar archive containing the connector. It can reference built-in connectors using the - // builtin:// scheme. - // +kubebuilder:validation:Optional - Archive string `json:"archive,omitempty"` - // Builtin holds the resolved name of a built-in sink connector. - // +kubebuilder:validation:Optional - Builtin string `json:"builtin,omitempty"` // SinkType refers to the built-in sink identifier when using connectors packaged with Pulsar. // +kubebuilder:validation:Optional SinkType string `json:"sinkType,omitempty"` diff --git a/api/compute/v1alpha1/zz_generated.deepcopy.go b/api/compute/v1alpha1/zz_generated.deepcopy.go index 0d678581e..6263e573b 100644 --- a/api/compute/v1alpha1/zz_generated.deepcopy.go +++ b/api/compute/v1alpha1/zz_generated.deepcopy.go @@ -1331,6 +1331,11 @@ func (in *Source) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SourceConnectorSpec) DeepCopyInto(out *SourceConnectorSpec) { *out = *in + if in.BatchSourceConfig != nil { + in, out := &in.BatchSourceConfig, &out.BatchSourceConfig + *out = new(BatchSourceConfig) + (*in).DeepCopyInto(*out) + } if in.Configs != nil { in, out := &in.Configs, &out.Configs *out = (*in).DeepCopy() diff --git a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml index c8770700f..c715fbd05 100644 --- a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml +++ b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml @@ -3712,10 +3712,6 @@ spec: type: boolean sinkConfig: properties: - archive: - type: string - builtin: - type: string className: type: string configs: @@ -3730,15 +3726,23 @@ spec: type: boolean sourceConfig: properties: - archive: - type: string - builtin: - type: string + batchSourceConfig: + properties: + discoveryTriggererClassName: + type: string + discoveryTriggererConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + required: + - discoveryTriggererClassName + type: object className: type: string configs: type: object x-kubernetes-preserve-unknown-fields: true + sourceType: + type: string typeClassName: type: string type: object diff --git a/config/crd/bases/compute.functionmesh.io_functions.yaml b/config/crd/bases/compute.functionmesh.io_functions.yaml index c971eb8bd..8e9248eca 100644 --- a/config/crd/bases/compute.functionmesh.io_functions.yaml +++ b/config/crd/bases/compute.functionmesh.io_functions.yaml @@ -3709,10 +3709,6 @@ spec: type: boolean sinkConfig: properties: - archive: - type: string - builtin: - type: string className: type: string configs: @@ -3727,15 +3723,23 @@ spec: type: boolean sourceConfig: properties: - archive: - type: string - builtin: - type: string + batchSourceConfig: + properties: + discoveryTriggererClassName: + type: string + discoveryTriggererConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + required: + - discoveryTriggererClassName + type: object className: type: string configs: type: object x-kubernetes-preserve-unknown-fields: true + sourceType: + type: string typeClassName: type: string type: object From 66fae0b8fc2647e01550a7d2114f6899936d234c Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Wed, 5 Nov 2025 00:34:40 +0800 Subject: [PATCH 3/5] address type changes --- controllers/spec/function_test.go | 2 +- controllers/spec/utils.go | 183 ++++++++++++++++++++++++------ controllers/spec/utils_test.go | 47 +++++++- 3 files changed, 193 insertions(+), 39 deletions(-) diff --git a/controllers/spec/function_test.go b/controllers/spec/function_test.go index 30229442f..0c14af74c 100644 --- a/controllers/spec/function_test.go +++ b/controllers/spec/function_test.go @@ -173,7 +173,7 @@ func TestJavaFunctionCommandWithConnectorOverrides(t *testing.T) { "initialPositionInStream": "TRIM_HORIZON", }) function.Spec.SourceConfig = &v1alpha1.SourceConnectorSpec{ - Archive: "builtin://kinesis", + SourceType: "kinesis", Configs: &sourceConfigs, TypeClassName: "java.lang.Object", } diff --git a/controllers/spec/utils.go b/controllers/spec/utils.go index 7ada1e0ca..56558cd08 100644 --- a/controllers/spec/utils.go +++ b/controllers/spec/utils.go @@ -118,25 +118,123 @@ func encodeConnectorConfigs(config *v1alpha1.Config) string { return payload } +func stringFromConfig(config *v1alpha1.Config, key string) string { + if config == nil || config.Data == nil { + return "" + } + if val, ok := config.Data[key]; ok && val != nil { + switch typed := val.(type) { + case string: + return typed + default: + return fmt.Sprint(typed) + } + } + return "" +} + +func addConfigEntries(dst map[string]interface{}, value interface{}) { + switch typed := value.(type) { + case map[string]interface{}: + for k, v := range typed { + if k == "" || v == nil { + continue + } + dst[k] = v + } + case map[interface{}]interface{}: + for k, v := range typed { + if k == nil || v == nil { + continue + } + key := fmt.Sprint(k) + if key == "" { + continue + } + dst[key] = v + } + } +} + +func extractConnectorConfigs(config *v1alpha1.Config, reservedKeys map[string]struct{}) map[string]interface{} { + if config == nil || config.Data == nil { + return nil + } + + result := make(map[string]interface{}) + if nested, ok := config.Data["configs"]; ok { + addConfigEntries(result, nested) + } + for key, value := range config.Data { + if value == nil || key == "configs" { + continue + } + if reservedKeys != nil { + if _, skip := reservedKeys[key]; skip { + continue + } + } + result[key] = value + } + if len(result) == 0 { + return nil + } + return result +} + +func resolveBuiltinFromConfig(config *v1alpha1.Config) string { + if builtin := stringFromConfig(config, "builtin"); builtin != "" { + return builtin + } + if archive := stringFromConfig(config, "archive"); strings.HasPrefix(archive, builtinURLPrefix) { + return strings.TrimPrefix(archive, builtinURLPrefix) + } + return "" +} + func buildSourceConnectorDetails(cfg *v1alpha1.SourceConnectorSpec) *connectorConfigDetails { if cfg == nil { return nil } - details := &connectorConfigDetails{ - className: cfg.ClassName, - typeClassName: cfg.TypeClassName, + details := &connectorConfigDetails{} + + className := cfg.ClassName + if className == "" { + className = stringFromConfig(cfg.Configs, "className") + } + if className != "" { + details.className = className } - builtin := cfg.Builtin - if builtin == "" { - if archive := cfg.Archive; strings.HasPrefix(archive, builtinURLPrefix) { - builtin = strings.TrimPrefix(archive, builtinURLPrefix) - } + typeClassName := cfg.TypeClassName + if typeClassName == "" { + typeClassName = stringFromConfig(cfg.Configs, "typeClassName") + } + if typeClassName != "" { + details.typeClassName = typeClassName } - details.builtin = builtin - if configsJSON := encodeConnectorConfigs(cfg.Configs); configsJSON != "" { + builtin := cfg.SourceType + if builtin == "" { + builtin = resolveBuiltinFromConfig(cfg.Configs) + } + if builtin != "" { + details.builtin = builtin + } + + configMap := extractConnectorConfigs(cfg.Configs, map[string]struct{}{ + "archive": {}, + "builtin": {}, + "className": {}, + "typeClassName": {}, + }) + if len(configMap) > 0 { + tmp := v1alpha1.NewConfig(configMap) + if configsJSON := encodeConnectorConfigs(&tmp); configsJSON != "" { + details.configs = configsJSON + } + } else if configsJSON := encodeConnectorConfigs(cfg.Configs); configsJSON != "" { details.configs = configsJSON } @@ -151,39 +249,54 @@ func buildSinkConnectorDetails(cfg *v1alpha1.SinkConnectorSpec) *connectorConfig return nil } - details := &connectorConfigDetails{ - className: cfg.ClassName, - typeClassName: cfg.TypeClassName, + details := &connectorConfigDetails{} + + className := cfg.ClassName + if className == "" { + className = stringFromConfig(cfg.Configs, "className") + } + if className != "" { + details.className = className + } + + typeClassName := cfg.TypeClassName + if typeClassName == "" { + typeClassName = stringFromConfig(cfg.Configs, "typeClassName") + } + if typeClassName != "" { + details.typeClassName = typeClassName } - builtin := cfg.Builtin + builtin := cfg.SinkType if builtin == "" { - if archive := cfg.Archive; strings.HasPrefix(archive, builtinURLPrefix) { - builtin = strings.TrimPrefix(archive, builtinURLPrefix) - } else if sinkType := cfg.SinkType; sinkType != "" { - builtin = sinkType - } + builtin = resolveBuiltinFromConfig(cfg.Configs) + } + if builtin != "" { + details.builtin = builtin } - details.builtin = builtin - if cfg.Configs != nil || cfg.SinkType != "" { - merged := map[string]interface{}{} - if cfg.Configs != nil && cfg.Configs.Data != nil { - for k, v := range cfg.Configs.Data { - merged[k] = v - } + configMap := extractConnectorConfigs(cfg.Configs, map[string]struct{}{ + "archive": {}, + "builtin": {}, + "className": {}, + "typeClassName": {}, + }) + if builtin != "" { + if configMap == nil { + configMap = map[string]interface{}{} } - if cfg.SinkType != "" { - if _, ok := merged["sinkType"]; !ok { - merged["sinkType"] = cfg.SinkType - } + if _, ok := configMap["sinkType"]; !ok { + configMap["sinkType"] = builtin } - if len(merged) > 0 { - tmp := v1alpha1.NewConfig(merged) - if configsJSON := encodeConnectorConfigs(&tmp); configsJSON != "" { - details.configs = configsJSON - } + } + + if len(configMap) > 0 { + tmp := v1alpha1.NewConfig(configMap) + if configsJSON := encodeConnectorConfigs(&tmp); configsJSON != "" { + details.configs = configsJSON } + } else if configsJSON := encodeConnectorConfigs(cfg.Configs); configsJSON != "" { + details.configs = configsJSON } if details.builtin == "" && details.className == "" && details.typeClassName == "" && details.configs == "" { diff --git a/controllers/spec/utils_test.go b/controllers/spec/utils_test.go index 04c6e7794..087c9a281 100644 --- a/controllers/spec/utils_test.go +++ b/controllers/spec/utils_test.go @@ -118,9 +118,9 @@ func TestGenerateFunctionInputSpecWithConnector(t *testing.T) { }, } function.Spec.SourceConfig = &v1alpha1.SourceConnectorSpec{ - Archive: "builtin://kafka", - ClassName: "org.apache.pulsar.io.kafka.KafkaSource", - Configs: configs, + SourceType: "kafka", + ClassName: "org.apache.pulsar.io.kafka.KafkaSource", + Configs: configs, } sourceSpec := generateFunctionInputSpec(function) @@ -149,3 +149,44 @@ func TestGenerateFunctionOutputSpecWithConnector(t *testing.T) { assert.Equal(t, "org.apache.pulsar.io.kafka.KafkaSink", sinkSpec.ClassName) assert.Equal(t, `{"bootstrapServers":"kafka:9092","sinkType":"kafka","topic":"kafka-output"}`, sinkSpec.Configs) } + +func TestBuildSourceConnectorDetailsFromConfig(t *testing.T) { + connectorConfig := v1alpha1.NewConfig(map[string]interface{}{ + "archive": "builtin://filesystem", + "className": "org.apache.pulsar.io.fs.FileSource", + "typeClassName": "java.lang.String", + "configs": map[string]interface{}{ + "inputDirectory": "/var/data", + "recurse": true, + }, + }) + + details := buildSourceConnectorDetails(&v1alpha1.SourceConnectorSpec{ + Configs: &connectorConfig, + }) + + assert.NotNil(t, details) + assert.Equal(t, "filesystem", details.builtin) + assert.Equal(t, "org.apache.pulsar.io.fs.FileSource", details.className) + assert.Equal(t, "java.lang.String", details.typeClassName) + assert.Equal(t, `{"inputDirectory":"/var/data","recurse":true}`, details.configs) +} + +func TestBuildSinkConnectorDetailsFromConfig(t *testing.T) { + connectorConfig := v1alpha1.NewConfig(map[string]interface{}{ + "archive": "builtin://jms", + "className": "org.apache.pulsar.io.jms.JMSSink", + "configs": map[string]interface{}{ + "queueName": "demo-queue", + }, + }) + + details := buildSinkConnectorDetails(&v1alpha1.SinkConnectorSpec{ + Configs: &connectorConfig, + }) + + assert.NotNil(t, details) + assert.Equal(t, "jms", details.builtin) + assert.Equal(t, "org.apache.pulsar.io.jms.JMSSink", details.className) + assert.Equal(t, `{"queueName":"demo-queue","sinkType":"jms"}`, details.configs) +} From d4d39add69a9a9d657fd96fad1db57bef24afba7 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Wed, 5 Nov 2025 01:25:25 +0800 Subject: [PATCH 4/5] fix test --- controllers/spec/common.go | 144 +++++++++++++++++++++---------------- 1 file changed, 82 insertions(+), 62 deletions(-) diff --git a/controllers/spec/common.go b/controllers/spec/common.go index eab364604..9fa2c01fa 100644 --- a/controllers/spec/common.go +++ b/controllers/spec/common.go @@ -170,6 +170,25 @@ type TLSConfig interface { GetMountPath() string } +type RollingCfg struct { + Enabled bool + Type string // "size" or "time" + File string + // size-based + MaxBytes int + Backups int + // time-based + When string // e.g. "D", "W0" + Interval int +} + +type LogCfg struct { + Level string // INFO/DEBUG/... + Format string // "json" or "text" + Handlers string // computed: "stream_handler[,rotating_file_handler|,timed_rotating_file_handler]" + Rolling RollingCfg +} + func IsManaged(object metav1.Object) bool { managed, exists := object.GetAnnotations()[AnnotationManaged] return !exists || managed != "false" @@ -1105,88 +1124,89 @@ func generatePythonLogConfigCommand(name string, runtime *v1alpha1.PythonRuntime func renderPythonInstanceLoggingINITemplate(name string, runtime *v1alpha1.PythonRuntime, agent v1alpha1.LogTopicAgent) (string, error) { tmpl := template.Must(template.New("spec").Parse(pythonLoggingINITemplate)) - var tpl bytes.Buffer - type logConfig struct { - RollingEnabled bool - Level string - Policy template.HTML - Handlers string - Format string + + lc := LogCfg{ + Level: "INFO", + Format: "text", + Rolling: RollingCfg{ + Enabled: false, + Backups: 5, + }, } - lc := &logConfig{} - lc.Level = "INFO" - lc.Format = "text" - lc.Handlers = "stream_handler" + + // level if runtime.Log != nil && runtime.Log.Level != "" { if level := parsePythonLogLevel(runtime); level != "" { lc.Level = level } } - if runtime.Log != nil && runtime.Log.Format != nil { - lc.Format = string(*runtime.Log.Format) + // format + if runtime.Log != nil && runtime.Log.Format != nil && strings.ToLower(string(*runtime.Log.Format)) == "json" { + lc.Format = "json" } + + // default handler + lc.Handlers = "stream_handler" + + // log file path + logFile := fmt.Sprintf("logs/functions/%s.log", name) + + // rolling policy if runtime.Log != nil && runtime.Log.RotatePolicy != nil { - lc.RollingEnabled = true - logFile := fmt.Sprintf("logs/functions/%s-${%s}.log", name, EnvShardID) + lc.Rolling.Enabled = true switch *runtime.Log.RotatePolicy { + case v1alpha1.SizedPolicyWith10MB: + lc.Rolling.Type = "size" + lc.Rolling.File = logFile + lc.Rolling.MaxBytes = 10 * 1024 * 1024 + lc.Handlers = "stream_handler,rotating_file_handler" + case v1alpha1.SizedPolicyWith50MB: + lc.Rolling.Type = "size" + lc.Rolling.File = logFile + lc.Rolling.MaxBytes = 50 * 1024 * 1024 + lc.Handlers = "stream_handler,rotating_file_handler" + case v1alpha1.SizedPolicyWith100MB: + lc.Rolling.Type = "size" + lc.Rolling.File = logFile + lc.Rolling.MaxBytes = 100 * 1024 * 1024 + lc.Handlers = "stream_handler,rotating_file_handler" case v1alpha1.TimedPolicyWithDaily: + lc.Rolling.Type = "time" + lc.Rolling.File = logFile + lc.Rolling.When = "D" + lc.Rolling.Interval = 1 lc.Handlers = "stream_handler,timed_rotating_file_handler" - lc.Policy = template.HTML(fmt.Sprintf(`[handler_timed_rotating_file_handler] -args=(\"%s\", 'D', 1, 5,) -class=handlers.TimedRotatingFileHandler -level=%s -formatter=formatter`, logFile, lc.Level)) case v1alpha1.TimedPolicyWithWeekly: + lc.Rolling.Type = "time" + lc.Rolling.File = logFile + lc.Rolling.When = "W0" // every monday + lc.Rolling.Interval = 1 lc.Handlers = "stream_handler,timed_rotating_file_handler" - lc.Policy = template.HTML(fmt.Sprintf(`[handler_timed_rotating_file_handler] -args=(\"%s\", 'W0', 1, 5,) -class=handlers.TimedRotatingFileHandler -level=%s -formatter=formatter`, logFile, lc.Level)) case v1alpha1.TimedPolicyWithMonthly: + lc.Rolling.Type = "time" + lc.Rolling.File = logFile + lc.Rolling.When = "D" // day + lc.Rolling.Interval = 30 lc.Handlers = "stream_handler,timed_rotating_file_handler" - lc.Policy = template.HTML(fmt.Sprintf(`[handler_timed_rotating_file_handler] -args=(\"%s\", 'D', 30, 5,) -class=handlers.TimedRotatingFileHandler -level=%s -formatter=formatter`, logFile, lc.Level)) - case v1alpha1.SizedPolicyWith10MB: - lc.Handlers = "stream_handler,rotating_file_handler" - lc.Policy = template.HTML(fmt.Sprintf(`[handler_rotating_file_handler] -args=(\"%s\", 'a', 10485760, 5,) -class=handlers.RotatingFileHandler -level=%s -formatter=formatter`, logFile, lc.Level)) - case v1alpha1.SizedPolicyWith50MB: - lc.Handlers = "handler_stream_handler,rotating_file_handler" - lc.Policy = template.HTML(fmt.Sprintf(`[handler_rotating_file_handler] -args=(%s, 'a', 52428800, 5,) -class=handlers.RotatingFileHandler -level=%s -formatter=formatter`, logFile, lc.Level)) - case v1alpha1.SizedPolicyWith100MB: - lc.Handlers = "handler_stream_handler,rotating_file_handler" - lc.Policy = template.HTML(fmt.Sprintf(`[handler_rotating_file_handler] -args=(%s, 'a', 104857600, 5,) -class=handlers.RotatingFileHandler -level=%s -formatter=formatter`, logFile, lc.Level)) - } - } else if agent == v1alpha1.SIDECAR { // sidecar mode needs the rotated log file - lc.RollingEnabled = true - logFile := fmt.Sprintf("logs/functions/%s-${%s}.log", name, EnvShardID) + } + } else if agent == v1alpha1.SIDECAR { + // sidecar mode enables rolling by default, using size policy with 10MB + lc.Rolling = RollingCfg{ + Enabled: true, + Type: "size", + File: logFile, + MaxBytes: 10 * 1024 * 1024, + Backups: 5, + } lc.Handlers = "stream_handler,rotating_file_handler" - lc.Policy = template.HTML(fmt.Sprintf(`[handler_rotating_file_handler] -args=(\"%s\", 'a', 10485760, 5,) -class=handlers.RotatingFileHandler -level=%s -formatter=formatter`, logFile, lc.Level)) } - if err := tmpl.Execute(&tpl, lc); err != nil { + + var buf bytes.Buffer + if err := tmpl.Execute(&buf, &lc); err != nil { log.Error(err, "failed to render python instance logging template") return "", err } - return tpl.String(), nil + return buf.String(), nil } func parseJavaLogLevel(runtime *v1alpha1.JavaRuntime) string { From 00209f2baee4654ed5593a60421756e3479849cd Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Sat, 8 Nov 2025 00:23:44 +0800 Subject: [PATCH 5/5] add entry class and instance path --- api/compute/v1alpha1/common.go | 2 + api/compute/v1alpha1/zz_generated.deepcopy.go | 10 ++ ...ompute.functionmesh.io_functionmeshes.yaml | 12 ++ .../compute.functionmesh.io_functions.yaml | 4 + .../bases/compute.functionmesh.io_sinks.yaml | 4 + .../compute.functionmesh.io_sources.yaml | 4 + controllers/spec/common.go | 13 +- controllers/spec/function.go | 10 +- controllers/spec/sink.go | 10 +- controllers/spec/source.go | 10 +- manifests/crd.yaml | 138 +++++++++++------- manifests/rbac.yaml | 1 - 12 files changed, 157 insertions(+), 61 deletions(-) diff --git a/api/compute/v1alpha1/common.go b/api/compute/v1alpha1/common.go index 53d205eef..5d022c59b 100644 --- a/api/compute/v1alpha1/common.go +++ b/api/compute/v1alpha1/common.go @@ -247,6 +247,8 @@ type JavaRuntime struct { ExtraDependenciesDir string `json:"extraDependenciesDir,omitempty"` Log *RuntimeLogConfig `json:"log,omitempty"` JavaOpts []string `json:"javaOpts,omitempty"` + InstancePath *string `json:"instancePath,omitempty"` + EntryClass *string `json:"entryClass,omitempty"` } // PythonRuntime contains the python runtime configs diff --git a/api/compute/v1alpha1/zz_generated.deepcopy.go b/api/compute/v1alpha1/zz_generated.deepcopy.go index 6263e573b..31bd91e0e 100644 --- a/api/compute/v1alpha1/zz_generated.deepcopy.go +++ b/api/compute/v1alpha1/zz_generated.deepcopy.go @@ -692,6 +692,16 @@ func (in *JavaRuntime) DeepCopyInto(out *JavaRuntime) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.InstancePath != nil { + in, out := &in.InstancePath, &out.InstancePath + *out = new(string) + **out = **in + } + if in.EntryClass != nil { + in, out := &in.EntryClass, &out.EntryClass + *out = new(string) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JavaRuntime. diff --git a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml index c715fbd05..8c137356a 100644 --- a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml +++ b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml @@ -198,8 +198,12 @@ spec: type: object java: properties: + entryClass: + type: string extraDependenciesDir: type: string + instancePath: + type: string jar: type: string jarLocation: @@ -4185,8 +4189,12 @@ spec: type: object java: properties: + entryClass: + type: string extraDependenciesDir: type: string + instancePath: + type: string jar: type: string jarLocation: @@ -7784,8 +7792,12 @@ spec: type: string java: properties: + entryClass: + type: string extraDependenciesDir: type: string + instancePath: + type: string jar: type: string jarLocation: diff --git a/config/crd/bases/compute.functionmesh.io_functions.yaml b/config/crd/bases/compute.functionmesh.io_functions.yaml index 8e9248eca..beb60ff6c 100644 --- a/config/crd/bases/compute.functionmesh.io_functions.yaml +++ b/config/crd/bases/compute.functionmesh.io_functions.yaml @@ -195,8 +195,12 @@ spec: type: object java: properties: + entryClass: + type: string extraDependenciesDir: type: string + instancePath: + type: string jar: type: string jarLocation: diff --git a/config/crd/bases/compute.functionmesh.io_sinks.yaml b/config/crd/bases/compute.functionmesh.io_sinks.yaml index 7e752064a..3de747270 100644 --- a/config/crd/bases/compute.functionmesh.io_sinks.yaml +++ b/config/crd/bases/compute.functionmesh.io_sinks.yaml @@ -190,8 +190,12 @@ spec: type: object java: properties: + entryClass: + type: string extraDependenciesDir: type: string + instancePath: + type: string jar: type: string jarLocation: diff --git a/config/crd/bases/compute.functionmesh.io_sources.yaml b/config/crd/bases/compute.functionmesh.io_sources.yaml index 04c1ad55e..210d5bca1 100644 --- a/config/crd/bases/compute.functionmesh.io_sources.yaml +++ b/config/crd/bases/compute.functionmesh.io_sources.yaml @@ -124,8 +124,12 @@ spec: type: string java: properties: + entryClass: + type: string extraDependenciesDir: type: string + instancePath: + type: string jar: type: string jarLocation: diff --git a/controllers/spec/common.go b/controllers/spec/common.go index 9fa2c01fa..1c093590b 100644 --- a/controllers/spec/common.go +++ b/controllers/spec/common.go @@ -134,6 +134,9 @@ const ( FunctionContainerName = "pulsar-function" SinkContainerName = "pulsar-sink" SourceContainerName = "pulsar-source" + + DefaultPulsarFunctionsJavaInstancePath = "/pulsar/instances/java-instance.jar" + DefaultPulsarFunctionsJavaInstanceEntryClass = "org.apache.pulsar.functions.instance.JavaInstanceMain" ) //go:embed template/java-runtime-log4j.xml.tmpl @@ -538,11 +541,11 @@ func MakeJavaFunctionCommand(downloadPath, packageFile, name, clusterName, gener memory *resource.Quantity, javaOpts []string, hasPulsarctl, hasWget, authProvided, tlsProvided bool, secretMaps map[string]v1alpha1.SecretRef, state *v1alpha1.Stateful, tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig, - maxPendingAsyncRequests *int32, logConfigFileName string) []string { + maxPendingAsyncRequests *int32, logConfigFileName, instancePath, entryClass string) []string { processCommand := setShardIDEnvironmentVariableCommand() + " && " + generateLogConfigCommand + strings.Join(getProcessJavaRuntimeArgs(name, packageFile, clusterName, logLevel, details, extraDependenciesDir, connectorsDirectory, uid, memory, javaOpts, authProvided, tlsProvided, secretMaps, state, tlsConfig, - authConfig, maxPendingAsyncRequests, logConfigFileName), " ") + authConfig, maxPendingAsyncRequests, logConfigFileName, instancePath, entryClass), " ") if downloadPath != "" && !utils.EnableInitContainers { // prepend download command if the downPath is provided downloadCommand := strings.Join(GetDownloadCommand(downloadPath, packageFile, hasPulsarctl, hasWget, @@ -1287,8 +1290,8 @@ func getProcessJavaRuntimeArgs(name, packageName, clusterName, logLevel, details memory *resource.Quantity, javaOpts []string, authProvided, tlsProvided bool, secretMaps map[string]v1alpha1.SecretRef, state *v1alpha1.Stateful, tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig, - maxPendingAsyncRequests *int32, logConfigFileName string) []string { - classPath := "/pulsar/instances/java-instance.jar:$(echo /pulsar/lib/com.fasterxml.jackson.dataformat-jackson-dataformat-yaml-*.jar):$(echo /pulsar/lib/org.yaml-snakeyaml-*.jar)" + maxPendingAsyncRequests *int32, logConfigFileName, instancePath, entryClass string) []string { + classPath := fmt.Sprintf("%s:$(echo /pulsar/lib/com.fasterxml.jackson.dataformat-jackson-dataformat-yaml-*.jar):$(echo /pulsar/lib/org.yaml-snakeyaml-*.jar)", instancePath) javaLogConfigPath := logConfigFileName if javaLogConfigPath == "" { javaLogConfigPath = DefaultJavaLogConfigPath @@ -1327,7 +1330,7 @@ func getProcessJavaRuntimeArgs(name, packageName, clusterName, logLevel, details "-XX:+ExitOnOutOfMemoryError", "-Xlog:gc*:file=/pulsar/logs/gc.log:time,level,tags:filecount=5,filesize=10M", strings.Join(javaOpts, " "), - "org.apache.pulsar.functions.instance.JavaInstanceMain", + entryClass, "--jar", packageName, } diff --git a/controllers/spec/function.go b/controllers/spec/function.go index 9eca5d0ca..fabee4f6d 100644 --- a/controllers/spec/function.go +++ b/controllers/spec/function.go @@ -237,6 +237,14 @@ func makeFunctionCommand(function *v1alpha1.Function) []string { if spec.Java != nil { if spec.Java.Jar != "" { mountPath := extractMountPath(spec.Java.Jar) + instancePath := DefaultPulsarFunctionsJavaInstancePath + if spec.Java.InstancePath != nil && *spec.Java.InstancePath != "" { + instancePath = *spec.Java.InstancePath + } + entryClass := DefaultPulsarFunctionsJavaInstanceEntryClass + if spec.Java.EntryClass != nil && *spec.Java.EntryClass != "" { + entryClass = *spec.Java.EntryClass + } return MakeJavaFunctionCommand(spec.Java.JarLocation, mountPath, spec.Name, spec.ClusterName, GenerateJavaLogConfigCommand(spec.Java, spec.LogTopicAgent), @@ -250,7 +258,7 @@ func makeFunctionCommand(function *v1alpha1.Function) []string { spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", spec.SecretsMap, spec.StateConfig, spec.Pulsar.TLSConfig, spec.Pulsar.AuthConfig, spec.MaxPendingAsyncRequests, - GenerateJavaLogConfigFileName(function.Spec.Java)) + GenerateJavaLogConfigFileName(function.Spec.Java), instancePath, entryClass) } } else if spec.Python != nil { if spec.Python.Py != "" { diff --git a/controllers/spec/sink.go b/controllers/spec/sink.go index 12200fc5a..9dc736673 100644 --- a/controllers/spec/sink.go +++ b/controllers/spec/sink.go @@ -227,6 +227,14 @@ func MakeSinkCommand(sink *v1alpha1.Sink) []string { hasWget = true } mountPath := extractMountPath(spec.Java.Jar) + instancePath := DefaultPulsarFunctionsJavaInstancePath + if spec.Java.InstancePath != nil && *spec.Java.InstancePath != "" { + instancePath = *spec.Java.InstancePath + } + entryClass := DefaultPulsarFunctionsJavaInstanceEntryClass + if spec.Java.EntryClass != nil && *spec.Java.EntryClass != "" { + entryClass = *spec.Java.EntryClass + } return MakeJavaFunctionCommand(spec.Java.JarLocation, mountPath, spec.Name, spec.ClusterName, GenerateJavaLogConfigCommand(spec.Java, spec.LogTopicAgent), @@ -238,7 +246,7 @@ func MakeSinkCommand(sink *v1alpha1.Sink) []string { spec.Resources.Limits.Memory(), spec.Java.JavaOpts, hasPulsarctl, hasWget, spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", spec.SecretsMap, spec.StateConfig, spec.Pulsar.TLSConfig, spec.Pulsar.AuthConfig, nil, - GenerateJavaLogConfigFileName(spec.Java)) + GenerateJavaLogConfigFileName(spec.Java), instancePath, entryClass) } func generateSinkDetailsInJSON(sink *v1alpha1.Sink) string { diff --git a/controllers/spec/source.go b/controllers/spec/source.go index 24d4cb33c..f756b084f 100644 --- a/controllers/spec/source.go +++ b/controllers/spec/source.go @@ -174,6 +174,14 @@ func makeSourceCommand(source *v1alpha1.Source) []string { } mountPath := extractMountPath(spec.Java.Jar) + instancePath := DefaultPulsarFunctionsJavaInstancePath + if spec.Java.InstancePath != nil && *spec.Java.InstancePath != "" { + instancePath = *spec.Java.InstancePath + } + entryClass := DefaultPulsarFunctionsJavaInstanceEntryClass + if spec.Java.EntryClass != nil && *spec.Java.EntryClass != "" { + entryClass = *spec.Java.EntryClass + } return MakeJavaFunctionCommand(spec.Java.JarLocation, mountPath, spec.Name, spec.ClusterName, GenerateJavaLogConfigCommand(spec.Java, spec.LogTopicAgent), @@ -185,7 +193,7 @@ func makeSourceCommand(source *v1alpha1.Source) []string { spec.Resources.Limits.Memory(), spec.Java.JavaOpts, hasPulsarctl, hasWget, spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", spec.SecretsMap, spec.StateConfig, spec.Pulsar.TLSConfig, spec.Pulsar.AuthConfig, nil, - GenerateJavaLogConfigFileName(spec.Java)) + GenerateJavaLogConfigFileName(spec.Java), instancePath, entryClass) } func generateSourceDetailsInJSON(source *v1alpha1.Source) string { diff --git a/manifests/crd.yaml b/manifests/crd.yaml index b71c7c542..18cc958d0 100644 --- a/manifests/crd.yaml +++ b/manifests/crd.yaml @@ -818,6 +818,8 @@ spec: items: type: string type: array + disableDefaultAffinity: + type: boolean env: items: properties: @@ -4357,6 +4359,8 @@ spec: items: type: string type: array + disableDefaultAffinity: + type: boolean env: items: properties: @@ -6938,8 +6942,42 @@ spec: type: object showPreciseParallelism: type: boolean + sinkConfig: + properties: + className: + type: string + configs: + type: object + x-kubernetes-preserve-unknown-fields: true + sinkType: + type: string + typeClassName: + type: string + type: object skipToLatest: type: boolean + sourceConfig: + properties: + batchSourceConfig: + properties: + discoveryTriggererClassName: + type: string + discoveryTriggererConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + required: + - discoveryTriggererClassName + type: object + className: + type: string + configs: + type: object + x-kubernetes-preserve-unknown-fields: true + sourceType: + type: string + typeClassName: + type: string + type: object statefulConfig: properties: pulsar: @@ -8234,6 +8272,8 @@ spec: items: type: string type: array + disableDefaultAffinity: + type: boolean env: items: properties: @@ -10816,21 +10856,8 @@ spec: showPreciseParallelism: type: boolean sinkConfig: - properties: - archive: - type: string - builtin: - type: string - className: - type: string - configs: - additionalProperties: {} - type: object - sinkType: - type: string - typeClassName: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sinkType: type: string statefulConfig: @@ -11905,6 +11932,8 @@ spec: items: type: string type: array + disableDefaultAffinity: + type: boolean env: items: properties: @@ -14483,19 +14512,8 @@ spec: showPreciseParallelism: type: boolean sourceConfig: - properties: - archive: - type: string - builtin: - type: string - className: - type: string - configs: - additionalProperties: {} - type: object - typeClassName: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sourceType: type: string statefulConfig: @@ -15736,6 +15754,8 @@ spec: items: type: string type: array + disableDefaultAffinity: + type: boolean env: items: properties: @@ -18317,8 +18337,42 @@ spec: type: object showPreciseParallelism: type: boolean + sinkConfig: + properties: + className: + type: string + configs: + type: object + x-kubernetes-preserve-unknown-fields: true + sinkType: + type: string + typeClassName: + type: string + type: object skipToLatest: type: boolean + sourceConfig: + properties: + batchSourceConfig: + properties: + discoveryTriggererClassName: + type: string + discoveryTriggererConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + required: + - discoveryTriggererClassName + type: object + className: + type: string + configs: + type: object + x-kubernetes-preserve-unknown-fields: true + sourceType: + type: string + typeClassName: + type: string + type: object statefulConfig: properties: pulsar: @@ -19692,6 +19746,8 @@ spec: items: type: string type: array + disableDefaultAffinity: + type: boolean env: items: properties: @@ -22274,21 +22330,8 @@ spec: showPreciseParallelism: type: boolean sinkConfig: - properties: - archive: - type: string - builtin: - type: string - className: - type: string - configs: - additionalProperties: {} - type: object - sinkType: - type: string - typeClassName: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sinkType: type: string statefulConfig: @@ -23442,6 +23485,8 @@ spec: items: type: string type: array + disableDefaultAffinity: + type: boolean env: items: properties: @@ -26020,19 +26065,8 @@ spec: showPreciseParallelism: type: boolean sourceConfig: - properties: - archive: - type: string - builtin: - type: string - className: - type: string - configs: - additionalProperties: {} - type: object - typeClassName: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sourceType: type: string statefulConfig: diff --git a/manifests/rbac.yaml b/manifests/rbac.yaml index 0a3848599..87ee01513 100644 --- a/manifests/rbac.yaml +++ b/manifests/rbac.yaml @@ -40,7 +40,6 @@ rules: apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: - creationTimestamp: null name: manager-role rules: - apiGroups: