From 74f53001f98aad476393c4ee5052170d05fb4694 Mon Sep 17 00:00:00 2001 From: matejnedic Date: Thu, 25 Sep 2025 19:09:02 +0200 Subject: [PATCH 01/11] init --- spring-cloud-aws-autoconfigure/pom.xml | 5 +++ .../kinesis/KinesisAutoConfiguration.java | 38 +++++++++++++++++++ .../kinesis/KinesisProperties.java | 19 ++++++++++ .../spring-cloud-aws-starter-kinesis/pom.xml | 29 ++++++++++++++ 4 files changed, 91 insertions(+) create mode 100644 spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java create mode 100644 spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java create mode 100644 spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml diff --git a/spring-cloud-aws-autoconfigure/pom.xml b/spring-cloud-aws-autoconfigure/pom.xml index a0eee3ee5..5c58624cd 100644 --- a/spring-cloud-aws-autoconfigure/pom.xml +++ b/spring-cloud-aws-autoconfigure/pom.xml @@ -181,6 +181,11 @@ kms true + + software.amazon.awssdk + kinesis + true + software.amazon.encryption.s3 amazon-s3-encryption-client-java diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java new file mode 100644 index 000000000..ea5198b7a --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java @@ -0,0 +1,38 @@ +package io.awspring.cloud.autoconfigure.kinesis; + + +import io.awspring.cloud.autoconfigure.AwsSyncClientCustomizer; +import io.awspring.cloud.autoconfigure.core.AwsClientBuilderConfigurer; +import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails; +import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration; +import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration; +import io.awspring.cloud.autoconfigure.sns.SnsClientCustomizer; +import io.awspring.cloud.autoconfigure.sns.SnsProperties; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; + +@AutoConfiguration +@ConditionalOnClass({ KinesisAsyncClient.class}) +@EnableConfigurationProperties({ KinesisProperties.class }) +@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class }) +@ConditionalOnProperty( value= "spring.cloud.aws.kinesis.enabled", havingValue = "true", matchIfMissing = true) +public class KinesisAutoConfiguration { + + @ConditionalOnMissingBean + @Bean + public KinesisAsyncClient kinesisAsyncClient(SnsProperties properties, AwsClientBuilderConfigurer awsClientBuilderConfigurer, + ObjectProvider connectionDetails, + ObjectProvider snsClientCustomizers, + ObjectProvider awsSyncClientCustomizers) { + return awsClientBuilderConfigurer + .configureSyncClient(KinesisAsyncClient.builder(), properties, connectionDetails.getIfAvailable(), + snsClientCustomizers.orderedStream(), awsSyncClientCustomizers.orderedStream()) + .build(); + } +} diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java new file mode 100644 index 000000000..3cd5a2efc --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java @@ -0,0 +1,19 @@ +package io.awspring.cloud.autoconfigure.kinesis; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +import static io.awspring.cloud.autoconfigure.kinesis.KinesisProperties.PREFIX; + +/** + * Properties related to KinesisClient + * + * @author Matej Nedic + * @since 4.0.0 + */ +@ConfigurationProperties(prefix = PREFIX) +public class KinesisProperties { + /** + * The prefix used for AWS Kinesis configuration. + */ + public static final String PREFIX = "spring.cloud.aws.kinesis"; +} diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml new file mode 100644 index 000000000..a9f02f837 --- /dev/null +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml @@ -0,0 +1,29 @@ + + + + + spring-cloud-aws + io.awspring.cloud + 4.0.0-SNAPSHOT + ../../pom.xml + + + 4.0.0 + + spring-cloud-aws-starter-kinesis + Spring Cloud AWS Kinesis Starter + Spring Cloud AWS Kinesis Starter + + + + io.awspring.cloud + spring-cloud-aws-starter + + + software.amazon.awssdk + kinesis + + + From dceac870e9fb05b0904bd0355e16ee4e10ee52c9 Mon Sep 17 00:00:00 2001 From: matejnedic Date: Thu, 25 Sep 2025 19:24:22 +0200 Subject: [PATCH 02/11] amend --- .../kinesis/KinesisAsyncClientCustomizer.java | 28 +++++++++++++++++++ .../kinesis/KinesisAutoConfiguration.java | 17 ++++++----- .../kinesis/KinesisProperties.java | 3 +- ...ot.autoconfigure.AutoConfiguration.imports | 1 + 4 files changed, 39 insertions(+), 10 deletions(-) create mode 100644 spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java new file mode 100644 index 000000000..953f5866f --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java @@ -0,0 +1,28 @@ +/* + * Copyright 2013-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.autoconfigure.kinesis; + +import io.awspring.cloud.autoconfigure.AwsClientCustomizer; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder; +/** + * Callback interface that can be used to customize a {@link KinesisAsyncClientBuilder}. + * + * @author Matej Nedic + * @since 4.0.0 + */ +@FunctionalInterface +public interface KinesisAsyncClientCustomizer extends AwsClientCustomizer { +} diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java index ea5198b7a..e3d33dd84 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java @@ -1,13 +1,11 @@ package io.awspring.cloud.autoconfigure.kinesis; -import io.awspring.cloud.autoconfigure.AwsSyncClientCustomizer; +import io.awspring.cloud.autoconfigure.AwsAsyncClientCustomizer; import io.awspring.cloud.autoconfigure.core.AwsClientBuilderConfigurer; import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails; import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration; import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration; -import io.awspring.cloud.autoconfigure.sns.SnsClientCustomizer; -import io.awspring.cloud.autoconfigure.sns.SnsProperties; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.AutoConfigureAfter; @@ -16,6 +14,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; @AutoConfiguration @ConditionalOnClass({ KinesisAsyncClient.class}) @@ -26,13 +25,13 @@ public class KinesisAutoConfiguration { @ConditionalOnMissingBean @Bean - public KinesisAsyncClient kinesisAsyncClient(SnsProperties properties, AwsClientBuilderConfigurer awsClientBuilderConfigurer, - ObjectProvider connectionDetails, - ObjectProvider snsClientCustomizers, - ObjectProvider awsSyncClientCustomizers) { + public KinesisAsyncClient kinesisAsyncClient(KinesisProperties properties, AwsClientBuilderConfigurer awsClientBuilderConfigurer, + ObjectProvider connectionDetails, + ObjectProvider kinesisAsyncClientCustomizer, + ObjectProvider awsSyncClientCustomizers) { return awsClientBuilderConfigurer - .configureSyncClient(KinesisAsyncClient.builder(), properties, connectionDetails.getIfAvailable(), - snsClientCustomizers.orderedStream(), awsSyncClientCustomizers.orderedStream()) + .configureAsyncClient(KinesisAsyncClient.builder(), properties, connectionDetails.getIfAvailable(), + kinesisAsyncClientCustomizer.orderedStream(), awsSyncClientCustomizers.orderedStream()) .build(); } } diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java index 3cd5a2efc..4f09d97b6 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java @@ -1,5 +1,6 @@ package io.awspring.cloud.autoconfigure.kinesis; +import io.awspring.cloud.autoconfigure.AwsClientProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import static io.awspring.cloud.autoconfigure.kinesis.KinesisProperties.PREFIX; @@ -11,7 +12,7 @@ * @since 4.0.0 */ @ConfigurationProperties(prefix = PREFIX) -public class KinesisProperties { +public class KinesisProperties extends AwsClientProperties { /** * The prefix used for AWS Kinesis configuration. */ diff --git a/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 657cbf245..36bdffad1 100644 --- a/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -15,3 +15,4 @@ io.awspring.cloud.autoconfigure.config.secretsmanager.SecretsManagerAutoConfigur io.awspring.cloud.autoconfigure.config.parameterstore.ParameterStoreReloadAutoConfiguration io.awspring.cloud.autoconfigure.config.parameterstore.ParameterStoreAutoConfiguration io.awspring.cloud.autoconfigure.config.s3.S3ReloadAutoConfiguration +io.awspring.cloud.autoconfigure.kinesis.KinesisAutoConfiguration From 239f64542650a27d82325b524c9a5559656227d6 Mon Sep 17 00:00:00 2001 From: matejnedic Date: Thu, 25 Sep 2025 22:41:07 +0200 Subject: [PATCH 03/11] kinesis client --- .../kinesis/KinesisAutoConfigurationTest.java | 46 +++++++++++++++ .../kinesis/KinesisClientCustomizerTests.java | 58 +++++++++++++++++++ 2 files changed, 104 insertions(+) create mode 100644 spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfigurationTest.java create mode 100644 spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientCustomizerTests.java diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfigurationTest.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfigurationTest.java new file mode 100644 index 000000000..6dafa0dc3 --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfigurationTest.java @@ -0,0 +1,46 @@ +package io.awspring.cloud.autoconfigure.kinesis; + + +import io.awspring.cloud.autoconfigure.ConfiguredAwsClient; +import io.awspring.cloud.autoconfigure.core.AwsAutoConfiguration; +import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration; +import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration; +import org.junit.jupiter.api.Test; +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; + +import java.net.URI; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link KinesisAutoConfiguration}. + * + * @author Matej Nedic + */ +class KinesisAutoConfigurationTest { + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withPropertyValues("spring.cloud.aws.region.static:eu-west-1", + "spring.cloud.aws.credentials.access-key:noop", "spring.cloud.aws.credentials.secret-key:noop") + .withConfiguration(AutoConfigurations.of(AwsAutoConfiguration.class, RegionProviderAutoConfiguration.class, + CredentialsProviderAutoConfiguration.class, KinesisAutoConfiguration.class)); + + + @Test + void disableKinesisIntegration() { + this.contextRunner.withPropertyValues("spring.cloud.aws.kinesis.enabled:false").run(context -> { + assertThat(context).doesNotHaveBean(KinesisAsyncClient.class); + }); + } + + @Test + void withCustomEndpoint() { + this.contextRunner.withPropertyValues("spring.cloud.aws.kinesis.endpoint:http://localhost:8090").run(context -> { + ConfiguredAwsClient client = new ConfiguredAwsClient(context.getBean(KinesisAsyncClient.class)); + assertThat(client.getEndpoint()).isEqualTo(URI.create("http://localhost:8090")); + assertThat(client.isEndpointOverridden()).isTrue(); + }); + } +} diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientCustomizerTests.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientCustomizerTests.java new file mode 100644 index 000000000..371f010c7 --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientCustomizerTests.java @@ -0,0 +1,58 @@ +package io.awspring.cloud.autoconfigure.kinesis; + +import io.awspring.cloud.autoconfigure.ConfiguredAwsClient; +import io.awspring.cloud.autoconfigure.core.AwsAutoConfiguration; +import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration; +import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration; +import org.junit.jupiter.api.Test; +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; + +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; + +public class KinesisClientCustomizerTests { + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withPropertyValues("spring.cloud.aws.region.static:eu-west-1", + "spring.cloud.aws.credentials.access-key:noop", "spring.cloud.aws.credentials.secret-key:noop") + .withConfiguration(AutoConfigurations.of(AwsAutoConfiguration.class, RegionProviderAutoConfiguration.class, + CredentialsProviderAutoConfiguration.class, KinesisAutoConfiguration.class)); + + @Test + void customKinesisClientCustomizer() { + contextRunner.withUserConfiguration(KinesisClientCustomizerTests.CustomizerConfig.class).run(context -> { + ConfiguredAwsClient kinesisAsyncClient = new ConfiguredAwsClient(context.getBean(KinesisAsyncClient.class)); + assertThat(kinesisAsyncClient.getApiCallTimeout()).describedAs("sets property from first customizer") + .isEqualTo(Duration.ofMillis(2001)); + assertThat(kinesisAsyncClient.getApiCallAttemptTimeout()).describedAs("sets property from second customizer") + .isEqualTo(Duration.ofMillis(2002)); + }); + } + + @Configuration(proxyBeanMethods = false) + static class CustomizerConfig { + + @Bean + KinesisAsyncClientCustomizer kinesisClientCustomizer() { + return builder -> { + builder.overrideConfiguration(builder.overrideConfiguration().copy(c -> { + c.apiCallTimeout(Duration.ofMillis(2001)); + })); + }; + } + + @Bean + KinesisAsyncClientCustomizer kinesisClientCustomizer2() { + return builder -> { + builder.overrideConfiguration(builder.overrideConfiguration().copy(c -> { + c.apiCallAttemptTimeout(Duration.ofMillis(2002)); + })); + }; + } + } +} From 93a3d8113fc75c8bb25b2bc2a945ebb5f43a6579 Mon Sep 17 00:00:00 2001 From: matejnedic Date: Thu, 25 Sep 2025 22:42:00 +0200 Subject: [PATCH 04/11] integration --- pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/pom.xml b/pom.xml index 96538ab75..b837ab028 100644 --- a/pom.xml +++ b/pom.xml @@ -56,6 +56,7 @@ spring-cloud-aws-starters/spring-cloud-aws-starter-ses spring-cloud-aws-starters/spring-cloud-aws-starter-sns spring-cloud-aws-starters/spring-cloud-aws-starter-sqs + spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis spring-cloud-aws-samples spring-cloud-aws-test spring-cloud-aws-modulith From 609fa6bab39169851083beb66cb7f9e4218a71d7 Mon Sep 17 00:00:00 2001 From: matejnedic Date: Thu, 2 Oct 2025 08:29:51 +0200 Subject: [PATCH 05/11] Apply Spotless --- spring-cloud-aws-autoconfigure/pom.xml | 10 +++++ .../kinesis/KinesisAsyncClientCustomizer.java | 1 + .../kinesis/KinesisAutoConfiguration.java | 35 +++++++++++----- .../kinesis/KinesisProperties.java | 19 ++++++++- .../kinesis/KinesisAutoConfigurationTest.java | 41 ++++++++++++------- .../kinesis/KinesisClientCustomizerTests.java | 36 +++++++++++----- spring-cloud-aws-dependencies/pom.xml | 16 ++++++++ .../spring-cloud-aws-starter-kinesis/pom.xml | 8 ++++ 8 files changed, 129 insertions(+), 37 deletions(-) diff --git a/spring-cloud-aws-autoconfigure/pom.xml b/spring-cloud-aws-autoconfigure/pom.xml index 5c58624cd..c1213fca2 100644 --- a/spring-cloud-aws-autoconfigure/pom.xml +++ b/spring-cloud-aws-autoconfigure/pom.xml @@ -181,6 +181,16 @@ kms true + + software.amazon.kinesis + amazon-kinesis-client + true + + + com.amazonaws + amazon-kinesis-producer + true + software.amazon.awssdk kinesis diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java index 953f5866f..2c16d1aa0 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java @@ -17,6 +17,7 @@ import io.awspring.cloud.autoconfigure.AwsClientCustomizer; import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder; + /** * Callback interface that can be used to customize a {@link KinesisAsyncClientBuilder}. * diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java index e3d33dd84..2a221a3b3 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java @@ -1,6 +1,20 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.awspring.cloud.autoconfigure.kinesis; - import io.awspring.cloud.autoconfigure.AwsAsyncClientCustomizer; import io.awspring.cloud.autoconfigure.core.AwsClientBuilderConfigurer; import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails; @@ -17,21 +31,22 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; @AutoConfiguration -@ConditionalOnClass({ KinesisAsyncClient.class}) +@ConditionalOnClass({ KinesisAsyncClient.class }) @EnableConfigurationProperties({ KinesisProperties.class }) @AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class }) -@ConditionalOnProperty( value= "spring.cloud.aws.kinesis.enabled", havingValue = "true", matchIfMissing = true) +@ConditionalOnProperty(value = "spring.cloud.aws.kinesis.enabled", havingValue = "true", matchIfMissing = true) public class KinesisAutoConfiguration { @ConditionalOnMissingBean @Bean - public KinesisAsyncClient kinesisAsyncClient(KinesisProperties properties, AwsClientBuilderConfigurer awsClientBuilderConfigurer, - ObjectProvider connectionDetails, - ObjectProvider kinesisAsyncClientCustomizer, - ObjectProvider awsSyncClientCustomizers) { + public KinesisAsyncClient kinesisAsyncClient(KinesisProperties properties, + AwsClientBuilderConfigurer awsClientBuilderConfigurer, + ObjectProvider connectionDetails, + ObjectProvider kinesisAsyncClientCustomizer, + ObjectProvider awsSyncClientCustomizers) { return awsClientBuilderConfigurer - .configureAsyncClient(KinesisAsyncClient.builder(), properties, connectionDetails.getIfAvailable(), - kinesisAsyncClientCustomizer.orderedStream(), awsSyncClientCustomizers.orderedStream()) - .build(); + .configureAsyncClient(KinesisAsyncClient.builder(), properties, connectionDetails.getIfAvailable(), + kinesisAsyncClientCustomizer.orderedStream(), awsSyncClientCustomizers.orderedStream()) + .build(); } } diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java index 4f09d97b6..a809a0e18 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java @@ -1,10 +1,25 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.awspring.cloud.autoconfigure.kinesis; +import static io.awspring.cloud.autoconfigure.kinesis.KinesisProperties.PREFIX; + import io.awspring.cloud.autoconfigure.AwsClientProperties; import org.springframework.boot.context.properties.ConfigurationProperties; -import static io.awspring.cloud.autoconfigure.kinesis.KinesisProperties.PREFIX; - /** * Properties related to KinesisClient * diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfigurationTest.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfigurationTest.java index 6dafa0dc3..03fdbf00e 100644 --- a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfigurationTest.java +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfigurationTest.java @@ -1,19 +1,32 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.awspring.cloud.autoconfigure.kinesis; +import static org.assertj.core.api.Assertions.assertThat; import io.awspring.cloud.autoconfigure.ConfiguredAwsClient; import io.awspring.cloud.autoconfigure.core.AwsAutoConfiguration; import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration; import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration; +import java.net.URI; import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; -import java.net.URI; - -import static org.assertj.core.api.Assertions.assertThat; - /** * Tests for {@link KinesisAutoConfiguration}. * @@ -22,11 +35,10 @@ class KinesisAutoConfigurationTest { private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() - .withPropertyValues("spring.cloud.aws.region.static:eu-west-1", - "spring.cloud.aws.credentials.access-key:noop", "spring.cloud.aws.credentials.secret-key:noop") - .withConfiguration(AutoConfigurations.of(AwsAutoConfiguration.class, RegionProviderAutoConfiguration.class, - CredentialsProviderAutoConfiguration.class, KinesisAutoConfiguration.class)); - + .withPropertyValues("spring.cloud.aws.region.static:eu-west-1", + "spring.cloud.aws.credentials.access-key:noop", "spring.cloud.aws.credentials.secret-key:noop") + .withConfiguration(AutoConfigurations.of(AwsAutoConfiguration.class, RegionProviderAutoConfiguration.class, + CredentialsProviderAutoConfiguration.class, KinesisAutoConfiguration.class)); @Test void disableKinesisIntegration() { @@ -37,10 +49,11 @@ void disableKinesisIntegration() { @Test void withCustomEndpoint() { - this.contextRunner.withPropertyValues("spring.cloud.aws.kinesis.endpoint:http://localhost:8090").run(context -> { - ConfiguredAwsClient client = new ConfiguredAwsClient(context.getBean(KinesisAsyncClient.class)); - assertThat(client.getEndpoint()).isEqualTo(URI.create("http://localhost:8090")); - assertThat(client.isEndpointOverridden()).isTrue(); - }); + this.contextRunner.withPropertyValues("spring.cloud.aws.kinesis.endpoint:http://localhost:8090") + .run(context -> { + ConfiguredAwsClient client = new ConfiguredAwsClient(context.getBean(KinesisAsyncClient.class)); + assertThat(client.getEndpoint()).isEqualTo(URI.create("http://localhost:8090")); + assertThat(client.isEndpointOverridden()).isTrue(); + }); } } diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientCustomizerTests.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientCustomizerTests.java index 371f010c7..a4043f14f 100644 --- a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientCustomizerTests.java +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientCustomizerTests.java @@ -1,9 +1,27 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.awspring.cloud.autoconfigure.kinesis; +import static org.assertj.core.api.Assertions.assertThat; + import io.awspring.cloud.autoconfigure.ConfiguredAwsClient; import io.awspring.cloud.autoconfigure.core.AwsAutoConfiguration; import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration; import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration; +import java.time.Duration; import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.runner.ApplicationContextRunner; @@ -11,26 +29,22 @@ import org.springframework.context.annotation.Configuration; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; -import java.time.Duration; - -import static org.assertj.core.api.Assertions.assertThat; - public class KinesisClientCustomizerTests { private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() - .withPropertyValues("spring.cloud.aws.region.static:eu-west-1", - "spring.cloud.aws.credentials.access-key:noop", "spring.cloud.aws.credentials.secret-key:noop") - .withConfiguration(AutoConfigurations.of(AwsAutoConfiguration.class, RegionProviderAutoConfiguration.class, - CredentialsProviderAutoConfiguration.class, KinesisAutoConfiguration.class)); + .withPropertyValues("spring.cloud.aws.region.static:eu-west-1", + "spring.cloud.aws.credentials.access-key:noop", "spring.cloud.aws.credentials.secret-key:noop") + .withConfiguration(AutoConfigurations.of(AwsAutoConfiguration.class, RegionProviderAutoConfiguration.class, + CredentialsProviderAutoConfiguration.class, KinesisAutoConfiguration.class)); @Test void customKinesisClientCustomizer() { contextRunner.withUserConfiguration(KinesisClientCustomizerTests.CustomizerConfig.class).run(context -> { ConfiguredAwsClient kinesisAsyncClient = new ConfiguredAwsClient(context.getBean(KinesisAsyncClient.class)); assertThat(kinesisAsyncClient.getApiCallTimeout()).describedAs("sets property from first customizer") - .isEqualTo(Duration.ofMillis(2001)); - assertThat(kinesisAsyncClient.getApiCallAttemptTimeout()).describedAs("sets property from second customizer") - .isEqualTo(Duration.ofMillis(2002)); + .isEqualTo(Duration.ofMillis(2001)); + assertThat(kinesisAsyncClient.getApiCallAttemptTimeout()) + .describedAs("sets property from second customizer").isEqualTo(Duration.ofMillis(2002)); }); } diff --git a/spring-cloud-aws-dependencies/pom.xml b/spring-cloud-aws-dependencies/pom.xml index df111f992..1d5b4ca6d 100644 --- a/spring-cloud-aws-dependencies/pom.xml +++ b/spring-cloud-aws-dependencies/pom.xml @@ -33,6 +33,8 @@ 2.0.3 2.0.0-M2 2.3.0 + 2.7.0 + 0.15.12 @@ -68,6 +70,13 @@ true + + software.amazon.kinesis + amazon-kinesis-client + ${amazon.kinesis.client} + true + + software.amazon.encryption.s3 amazon-s3-encryption-client-java @@ -75,6 +84,13 @@ true + + com.amazonaws + amazon-kinesis-producer + ${amazon.kinesis.producer} + true + + software.amazon.s3.accessgrants aws-s3-accessgrants-java-plugin diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml index a9f02f837..ac9bd3a8e 100644 --- a/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml @@ -25,5 +25,13 @@ software.amazon.awssdk kinesis + + software.amazon.kinesis + amazon-kinesis-client + + + com.amazonaws + amazon-kinesis-producer + From ad39096cceb6c468ba257e6f8e584886582d68c0 Mon Sep 17 00:00:00 2001 From: matejnedic Date: Sun, 19 Oct 2025 14:34:11 +0200 Subject: [PATCH 06/11] Add KinesisProducer --- .../kinesis/KinesisAutoConfiguration.java | 70 +++ .../kinesis/KinesisProducerProperties.java | 462 ++++++++++++++++++ .../kinesis/KinesisProperties.java | 12 + 3 files changed, 544 insertions(+) create mode 100644 spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java index 2a221a3b3..2ab8613c1 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java @@ -15,6 +15,8 @@ */ package io.awspring.cloud.autoconfigure.kinesis; +import com.amazonaws.services.kinesis.producer.KinesisProducer; +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; import io.awspring.cloud.autoconfigure.AwsAsyncClientCustomizer; import io.awspring.cloud.autoconfigure.core.AwsClientBuilderConfigurer; import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails; @@ -27,7 +29,10 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import software.amazon.awssdk.regions.providers.AwsRegionProvider; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; @AutoConfiguration @@ -49,4 +54,69 @@ public KinesisAsyncClient kinesisAsyncClient(KinesisProperties properties, kinesisAsyncClientCustomizer.orderedStream(), awsSyncClientCustomizers.orderedStream()) .build(); } + + @ConditionalOnClass(name = { "com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration", + "com.amazonaws.services.kinesis.producer.KinesisProducer" }) + @Configuration + public static class KinesisKplAndKclConfiguration { + + @ConditionalOnMissingBean + @Bean + public KinesisProducerConfiguration kinesisProducerConfiguration(KinesisProperties kinesisProperties, + AwsRegionProvider awsRegionProvider, ObjectProvider connectionDetails) { + PropertyMapper propertyMapper = PropertyMapper.get(); + KinesisProducerConfiguration config = new KinesisProducerConfiguration(); + KinesisProducerProperties prop = kinesisProperties.getProducer(); + propertyMapper.from(prop::getAggregationEnabled).whenNonNull().to(config::setAggregationEnabled); + propertyMapper.from(prop::getAggregationMaxCount).whenNonNull().to(config::setAggregationMaxCount); + propertyMapper.from(prop::getAggregationMaxSize).whenNonNull().to(config::setAggregationMaxSize); + propertyMapper.from(prop::getCloudwatchEndpoint).whenHasText().to(config::setCloudwatchEndpoint); + propertyMapper.from(prop::getCloudwatchPort).whenNonNull().to(config::setCloudwatchPort); + propertyMapper.from(prop::getCollectionMaxCount).whenNonNull().to(config::setCollectionMaxCount); + propertyMapper.from(prop::getCollectionMaxSize).whenNonNull().to(config::setCollectionMaxSize); + propertyMapper.from(prop::getConnectTimeout).whenNonNull().to(config::setConnectTimeout); + propertyMapper.from(prop::getCredentialsRefreshDelay).whenNonNull().to(config::setCredentialsRefreshDelay); + propertyMapper.from(prop::getEnableCoreDumps).whenNonNull().to(config::setEnableCoreDumps); + propertyMapper.from(prop::getFailIfThrottled).whenNonNull().to(config::setFailIfThrottled); + propertyMapper.from(prop::getLogLevel).whenHasText().to(config::setLogLevel); + propertyMapper.from(prop::getMaxConnections).whenNonNull().to(config::setMaxConnections); + propertyMapper.from(prop::getMetricsGranularity).whenHasText().to(config::setMetricsGranularity); + propertyMapper.from(prop::getMetricsLevel).whenHasText().to(config::setMetricsLevel); + propertyMapper.from(prop::getMetricsNamespace).whenHasText().to(config::setMetricsNamespace); + propertyMapper.from(prop::getMetricsUploadDelay).whenNonNull().to(config::setMetricsUploadDelay); + propertyMapper.from(prop::getMinConnections).whenNonNull().to(config::setMinConnections); + propertyMapper.from(prop::getNativeExecutable).whenNonNull().to(config::setNativeExecutable); + propertyMapper.from(prop::getRateLimit).whenNonNull().to(config::setRateLimit); + propertyMapper.from(prop::getRecordMaxBufferedTime).whenNonNull().to(config::setRecordMaxBufferedTime); + propertyMapper.from(prop::getRecordTtl).whenNonNull().to(config::setRecordTtl); + propertyMapper.from(prop::getRequestTimeout).whenNonNull().to(config::setRequestTimeout); + propertyMapper.from(prop::getTempDirectory).whenNonNull().to(config::setTempDirectory); + propertyMapper.from(prop::getVerifyCertificate).whenNonNull().to(config::setVerifyCertificate); + propertyMapper.from(prop.getProxyHost()).whenNonNull().to(config::setProxyHost); + propertyMapper.from(prop.getProxyPort()).whenNonNull().to(config::setProxyPort); + propertyMapper.from(prop.getProxyUserName()).whenHasText().to(config::setProxyUserName); + propertyMapper.from(prop.getProxyPassword()).whenHasText().to(config::setProxyPassword); + propertyMapper.from(prop.getStsEndpoint()).whenHasText().to(config::setStsEndpoint); + propertyMapper.from(prop.getStsPort()).whenNonNull().to(config::setStsPort); + propertyMapper.from(prop.getThreadingModel()).whenNonNull().to(config::setThreadingModel); + propertyMapper.from(prop.getThreadPoolSize()).whenNonNull().to(config::setThreadPoolSize); + propertyMapper.from(prop.getUserRecordTimeoutInMillis()).whenNonNull() + .to(config::setUserRecordTimeoutInMillis); + + config.setRegion(AwsClientBuilderConfigurer + .resolveRegion(kinesisProperties, connectionDetails.getIfAvailable(), awsRegionProvider) + .toString()); + connectionDetails.ifAvailable(cd -> { + config.setKinesisPort(cd.getEndpoint().getPort()); + config.setKinesisEndpoint(cd.getEndpoint().getHost()); + }); + return config; + } + + @ConditionalOnMissingBean + @Bean + public KinesisProducer kinesisProducer(KinesisProducerConfiguration kinesisProducerConfiguration) { + return new KinesisProducer(kinesisProducerConfiguration); + } + } } diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java new file mode 100644 index 000000000..646760854 --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java @@ -0,0 +1,462 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.autoconfigure.kinesis; + +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; + +public class KinesisProducerProperties { + /** + * Whether aggregation of user records is enabled. + */ + private Boolean aggregationEnabled; + + /** + * Maximum number of user records to aggregate. Must be between 1 and Long.MAX_VALUE. + */ + private Long aggregationMaxCount; + + /** + * Maximum size in bytes of aggregated records. Must be between 64 and 1048576. + */ + private Long aggregationMaxSize; + + /** + * CloudWatch endpoint. Must match the regex: ^([A-Za-z0-9-\\.]+)?$ + */ + private String cloudwatchEndpoint; + + /** + * CloudWatch port. Must be between 1 and 65535. + */ + private Long cloudwatchPort; + + /** + * Maximum number of records to collect before sending a batch. Must be between 1 and 500. + */ + private Long collectionMaxCount; + + /** + * Maximum size in bytes of collected records before sending. Must be between 52224 and Long.MAX_VALUE. + */ + private Long collectionMaxSize; + + /** + * Connection timeout in milliseconds. Must be between 100 and 300000. + */ + private Long connectTimeout; + + /** + * Delay in milliseconds for credentials refresh. Must be between 1 and 300000. + */ + private Long credentialsRefreshDelay; + + /** + * Whether core dumps are enabled. + */ + private Boolean enableCoreDumps; + + /** + * Whether to fail if throttled by Kinesis. + */ + private Boolean failIfThrottled; + + /** + * Log level for the producer. Allowed values: trace, debug, info, warning, error. + */ + private String logLevel; + + /** + * Maximum number of connections. Must be between 1 and 256. + */ + private Long maxConnections; + + /** + * Metrics granularity. Allowed values: global, stream, shard. + */ + private String metricsGranularity; + + /** + * Metrics level. Allowed values: none, summary, detailed. + */ + private String metricsLevel; + + /** + * Metrics namespace. Must match the regex: (?!AWS/).{1,255} + */ + private String metricsNamespace; + + /** + * Delay in milliseconds for uploading metrics. Must be between 1 and 60000. + */ + private Long metricsUploadDelay; + + /** + * Minimum number of connections. Must be between 1 and 16. + */ + private Long minConnections; + + /** + * Native executable path. + */ + private String nativeExecutable; + + /** + * Rate limit for records per second. Must be between 1 and Long.MAX_VALUE. + */ + private Long rateLimit; + + /** + * Maximum buffered time for records in milliseconds. Must be between 0 and Long.MAX_VALUE. + */ + private Long recordMaxBufferedTime; + + /** + * Time to live for records in milliseconds. Must be between 100 and Long.MAX_VALUE. + */ + private Long recordTtl; + + /** + * Request timeout in milliseconds. Must be between 100 and 600000. + */ + private Long requestTimeout; + + /** + * Temporary directory path for the producer. + */ + private String tempDirectory; + + /** + * Whether to verify SSL certificates. + */ + private Boolean verifyCertificate; + + /** + * Proxy host. + */ + private String proxyHost; + + /** + * Proxy port. Must be between 1 and 65535. + */ + private Long proxyPort; + + /** + * Proxy username. + */ + private String proxyUserName; + + /** + * Proxy password. + */ + private String proxyPassword; + + /** + * STS endpoint. Must match the regex: ^([A-Za-z0-9-\\.]+)?$ + */ + private String stsEndpoint; + + /** + * STS port. Must be between 1 and 65535. + */ + private Long stsPort; + + /** + * Threading model for the producer. + */ + private KinesisProducerConfiguration.ThreadingModel threadingModel; + + /** + * Thread pool size. Must be greater than or equal to 0. + */ + private Integer threadPoolSize; + + /** + * Timeout in milliseconds for user records. Must be greater than or equal to 0. + */ + private Long userRecordTimeoutInMillis; + + public Boolean getAggregationEnabled() { + return aggregationEnabled; + } + + public void setAggregationEnabled(Boolean aggregationEnabled) { + this.aggregationEnabled = aggregationEnabled; + } + + public Long getAggregationMaxCount() { + return aggregationMaxCount; + } + + public void setAggregationMaxCount(Long aggregationMaxCount) { + this.aggregationMaxCount = aggregationMaxCount; + } + + public Long getAggregationMaxSize() { + return aggregationMaxSize; + } + + public void setAggregationMaxSize(Long aggregationMaxSize) { + this.aggregationMaxSize = aggregationMaxSize; + } + + public String getCloudwatchEndpoint() { + return cloudwatchEndpoint; + } + + public void setCloudwatchEndpoint(String cloudwatchEndpoint) { + this.cloudwatchEndpoint = cloudwatchEndpoint; + } + + public Long getCloudwatchPort() { + return cloudwatchPort; + } + + public void setCloudwatchPort(Long cloudwatchPort) { + this.cloudwatchPort = cloudwatchPort; + } + + public Long getCollectionMaxCount() { + return collectionMaxCount; + } + + public void setCollectionMaxCount(Long collectionMaxCount) { + this.collectionMaxCount = collectionMaxCount; + } + + public Long getCollectionMaxSize() { + return collectionMaxSize; + } + + public void setCollectionMaxSize(Long collectionMaxSize) { + this.collectionMaxSize = collectionMaxSize; + } + + public Long getConnectTimeout() { + return connectTimeout; + } + + public void setConnectTimeout(Long connectTimeout) { + this.connectTimeout = connectTimeout; + } + + public Long getCredentialsRefreshDelay() { + return credentialsRefreshDelay; + } + + public void setCredentialsRefreshDelay(Long credentialsRefreshDelay) { + this.credentialsRefreshDelay = credentialsRefreshDelay; + } + + public Boolean getEnableCoreDumps() { + return enableCoreDumps; + } + + public void setEnableCoreDumps(Boolean enableCoreDumps) { + this.enableCoreDumps = enableCoreDumps; + } + + public Boolean getFailIfThrottled() { + return failIfThrottled; + } + + public void setFailIfThrottled(Boolean failIfThrottled) { + this.failIfThrottled = failIfThrottled; + } + + public String getLogLevel() { + return logLevel; + } + + public void setLogLevel(String logLevel) { + this.logLevel = logLevel; + } + + public Long getMaxConnections() { + return maxConnections; + } + + public void setMaxConnections(Long maxConnections) { + this.maxConnections = maxConnections; + } + + public String getMetricsGranularity() { + return metricsGranularity; + } + + public void setMetricsGranularity(String metricsGranularity) { + this.metricsGranularity = metricsGranularity; + } + + public String getMetricsLevel() { + return metricsLevel; + } + + public void setMetricsLevel(String metricsLevel) { + this.metricsLevel = metricsLevel; + } + + public String getMetricsNamespace() { + return metricsNamespace; + } + + public void setMetricsNamespace(String metricsNamespace) { + this.metricsNamespace = metricsNamespace; + } + + public Long getMetricsUploadDelay() { + return metricsUploadDelay; + } + + public void setMetricsUploadDelay(Long metricsUploadDelay) { + this.metricsUploadDelay = metricsUploadDelay; + } + + public Long getMinConnections() { + return minConnections; + } + + public void setMinConnections(Long minConnections) { + this.minConnections = minConnections; + } + + public String getNativeExecutable() { + return nativeExecutable; + } + + public void setNativeExecutable(String nativeExecutable) { + this.nativeExecutable = nativeExecutable; + } + + public Long getRateLimit() { + return rateLimit; + } + + public void setRateLimit(Long rateLimit) { + this.rateLimit = rateLimit; + } + + public Long getRecordMaxBufferedTime() { + return recordMaxBufferedTime; + } + + public void setRecordMaxBufferedTime(Long recordMaxBufferedTime) { + this.recordMaxBufferedTime = recordMaxBufferedTime; + } + + public Long getRecordTtl() { + return recordTtl; + } + + public void setRecordTtl(Long recordTtl) { + this.recordTtl = recordTtl; + } + + public Long getRequestTimeout() { + return requestTimeout; + } + + public void setRequestTimeout(Long requestTimeout) { + this.requestTimeout = requestTimeout; + } + + public String getTempDirectory() { + return tempDirectory; + } + + public void setTempDirectory(String tempDirectory) { + this.tempDirectory = tempDirectory; + } + + public Boolean getVerifyCertificate() { + return verifyCertificate; + } + + public void setVerifyCertificate(Boolean verifyCertificate) { + this.verifyCertificate = verifyCertificate; + } + + public String getProxyHost() { + return proxyHost; + } + + public void setProxyHost(String proxyHost) { + this.proxyHost = proxyHost; + } + + public Long getProxyPort() { + return proxyPort; + } + + public void setProxyPort(Long proxyPort) { + this.proxyPort = proxyPort; + } + + public String getProxyUserName() { + return proxyUserName; + } + + public void setProxyUserName(String proxyUserName) { + this.proxyUserName = proxyUserName; + } + + public String getProxyPassword() { + return proxyPassword; + } + + public void setProxyPassword(String proxyPassword) { + this.proxyPassword = proxyPassword; + } + + public String getStsEndpoint() { + return stsEndpoint; + } + + public void setStsEndpoint(String stsEndpoint) { + this.stsEndpoint = stsEndpoint; + } + + public Long getStsPort() { + return stsPort; + } + + public void setStsPort(Long stsPort) { + this.stsPort = stsPort; + } + + public KinesisProducerConfiguration.ThreadingModel getThreadingModel() { + return threadingModel; + } + + public void setThreadingModel(KinesisProducerConfiguration.ThreadingModel threadingModel) { + this.threadingModel = threadingModel; + } + + public Integer getThreadPoolSize() { + return threadPoolSize; + } + + public void setThreadPoolSize(Integer threadPoolSize) { + this.threadPoolSize = threadPoolSize; + } + + public Long getUserRecordTimeoutInMillis() { + return userRecordTimeoutInMillis; + } + + public void setUserRecordTimeoutInMillis(Long userRecordTimeoutInMillis) { + this.userRecordTimeoutInMillis = userRecordTimeoutInMillis; + } +} diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java index a809a0e18..73c5cc163 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java @@ -19,6 +19,7 @@ import io.awspring.cloud.autoconfigure.AwsClientProperties; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.lang.Nullable; /** * Properties related to KinesisClient @@ -32,4 +33,15 @@ public class KinesisProperties extends AwsClientProperties { * The prefix used for AWS Kinesis configuration. */ public static final String PREFIX = "spring.cloud.aws.kinesis"; + + @Nullable + private KinesisProducerProperties producer = new KinesisProducerProperties(); + + public KinesisProducerProperties getProducer() { + return producer; + } + + public void setProducer(KinesisProducerProperties producer) { + this.producer = producer; + } } From e7e60009ad80c3237846a37b06e979b21aaa099e Mon Sep 17 00:00:00 2001 From: matejnedic Date: Sun, 19 Oct 2025 14:35:42 +0200 Subject: [PATCH 07/11] rename --- .../cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java index 2ab8613c1..eb75e2480 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java @@ -58,7 +58,7 @@ public KinesisAsyncClient kinesisAsyncClient(KinesisProperties properties, @ConditionalOnClass(name = { "com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration", "com.amazonaws.services.kinesis.producer.KinesisProducer" }) @Configuration - public static class KinesisKplAndKclConfiguration { + public static class KinesisProducerAutoConfiguration { @ConditionalOnMissingBean @Bean From 3ad0c0eeb460eef2ff2438f4c4380641bdf22e9a Mon Sep 17 00:00:00 2001 From: matejnedic Date: Wed, 22 Oct 2025 19:28:54 +0200 Subject: [PATCH 08/11] init --- .../kinesis/KinesisAutoConfiguration.java | 70 +++------------ .../KinesisProducerAutoConfiguration.java | 89 +++++++++++++++++++ .../kinesis/KinesisProducerProperties.java | 20 ++++- 3 files changed, 119 insertions(+), 60 deletions(-) create mode 100644 spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java index eb75e2480..9b62e3e96 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java @@ -57,66 +57,18 @@ public KinesisAsyncClient kinesisAsyncClient(KinesisProperties properties, @ConditionalOnClass(name = { "com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration", "com.amazonaws.services.kinesis.producer.KinesisProducer" }) - @Configuration public static class KinesisProducerAutoConfiguration { + } - @ConditionalOnMissingBean - @Bean - public KinesisProducerConfiguration kinesisProducerConfiguration(KinesisProperties kinesisProperties, - AwsRegionProvider awsRegionProvider, ObjectProvider connectionDetails) { - PropertyMapper propertyMapper = PropertyMapper.get(); - KinesisProducerConfiguration config = new KinesisProducerConfiguration(); - KinesisProducerProperties prop = kinesisProperties.getProducer(); - propertyMapper.from(prop::getAggregationEnabled).whenNonNull().to(config::setAggregationEnabled); - propertyMapper.from(prop::getAggregationMaxCount).whenNonNull().to(config::setAggregationMaxCount); - propertyMapper.from(prop::getAggregationMaxSize).whenNonNull().to(config::setAggregationMaxSize); - propertyMapper.from(prop::getCloudwatchEndpoint).whenHasText().to(config::setCloudwatchEndpoint); - propertyMapper.from(prop::getCloudwatchPort).whenNonNull().to(config::setCloudwatchPort); - propertyMapper.from(prop::getCollectionMaxCount).whenNonNull().to(config::setCollectionMaxCount); - propertyMapper.from(prop::getCollectionMaxSize).whenNonNull().to(config::setCollectionMaxSize); - propertyMapper.from(prop::getConnectTimeout).whenNonNull().to(config::setConnectTimeout); - propertyMapper.from(prop::getCredentialsRefreshDelay).whenNonNull().to(config::setCredentialsRefreshDelay); - propertyMapper.from(prop::getEnableCoreDumps).whenNonNull().to(config::setEnableCoreDumps); - propertyMapper.from(prop::getFailIfThrottled).whenNonNull().to(config::setFailIfThrottled); - propertyMapper.from(prop::getLogLevel).whenHasText().to(config::setLogLevel); - propertyMapper.from(prop::getMaxConnections).whenNonNull().to(config::setMaxConnections); - propertyMapper.from(prop::getMetricsGranularity).whenHasText().to(config::setMetricsGranularity); - propertyMapper.from(prop::getMetricsLevel).whenHasText().to(config::setMetricsLevel); - propertyMapper.from(prop::getMetricsNamespace).whenHasText().to(config::setMetricsNamespace); - propertyMapper.from(prop::getMetricsUploadDelay).whenNonNull().to(config::setMetricsUploadDelay); - propertyMapper.from(prop::getMinConnections).whenNonNull().to(config::setMinConnections); - propertyMapper.from(prop::getNativeExecutable).whenNonNull().to(config::setNativeExecutable); - propertyMapper.from(prop::getRateLimit).whenNonNull().to(config::setRateLimit); - propertyMapper.from(prop::getRecordMaxBufferedTime).whenNonNull().to(config::setRecordMaxBufferedTime); - propertyMapper.from(prop::getRecordTtl).whenNonNull().to(config::setRecordTtl); - propertyMapper.from(prop::getRequestTimeout).whenNonNull().to(config::setRequestTimeout); - propertyMapper.from(prop::getTempDirectory).whenNonNull().to(config::setTempDirectory); - propertyMapper.from(prop::getVerifyCertificate).whenNonNull().to(config::setVerifyCertificate); - propertyMapper.from(prop.getProxyHost()).whenNonNull().to(config::setProxyHost); - propertyMapper.from(prop.getProxyPort()).whenNonNull().to(config::setProxyPort); - propertyMapper.from(prop.getProxyUserName()).whenHasText().to(config::setProxyUserName); - propertyMapper.from(prop.getProxyPassword()).whenHasText().to(config::setProxyPassword); - propertyMapper.from(prop.getStsEndpoint()).whenHasText().to(config::setStsEndpoint); - propertyMapper.from(prop.getStsPort()).whenNonNull().to(config::setStsPort); - propertyMapper.from(prop.getThreadingModel()).whenNonNull().to(config::setThreadingModel); - propertyMapper.from(prop.getThreadPoolSize()).whenNonNull().to(config::setThreadPoolSize); - propertyMapper.from(prop.getUserRecordTimeoutInMillis()).whenNonNull() - .to(config::setUserRecordTimeoutInMillis); - - config.setRegion(AwsClientBuilderConfigurer - .resolveRegion(kinesisProperties, connectionDetails.getIfAvailable(), awsRegionProvider) - .toString()); - connectionDetails.ifAvailable(cd -> { - config.setKinesisPort(cd.getEndpoint().getPort()); - config.setKinesisEndpoint(cd.getEndpoint().getHost()); - }); - return config; - } + // In your configs + ConfigsBuilder configsBuilder = new ConfigsBuilder( + streamName, + applicationName, + KinesisClientUtil.createKinesisAsyncClient(kinesisAsyncClient), // <-- Use your bean + dynamoDbClient, + cloudWatchClient, + workerId, + recordProcessorFactory + ); - @ConditionalOnMissingBean - @Bean - public KinesisProducer kinesisProducer(KinesisProducerConfiguration kinesisProducerConfiguration) { - return new KinesisProducer(kinesisProducerConfiguration); - } - } } diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java new file mode 100644 index 000000000..cf5b9f088 --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java @@ -0,0 +1,89 @@ +package io.awspring.cloud.autoconfigure.kinesis; + + +import com.amazonaws.services.kinesis.producer.KinesisProducer; +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; +import io.awspring.cloud.autoconfigure.core.AwsClientBuilderConfigurer; +import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails; +import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration; +import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.context.properties.PropertyMapper; +import org.springframework.context.annotation.Bean; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.regions.providers.AwsRegionProvider; + +@AutoConfiguration +@ConditionalOnClass({ KinesisProducer.class }) +@EnableConfigurationProperties({ KinesisProducerProperties.class }) +@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class }) +@ConditionalOnProperty(value = "spring.cloud.aws.kinesis.producer.enabled", havingValue = "true", matchIfMissing = true) +public class KinesisProducerAutoConfiguration { + + @ConditionalOnMissingBean + @Bean + public KinesisProducerConfiguration kinesisProducerConfiguration(KinesisProperties kinesisProperties, + AwsCredentialsProvider credentialsProvider, + AwsRegionProvider awsRegionProvider, ObjectProvider connectionDetails) { + PropertyMapper propertyMapper = PropertyMapper.get(); + KinesisProducerConfiguration config = new KinesisProducerConfiguration(); + KinesisProducerProperties prop = kinesisProperties.getProducer(); + propertyMapper.from(prop::getAggregationEnabled).whenNonNull().to(config::setAggregationEnabled); + propertyMapper.from(prop::getAggregationMaxCount).whenNonNull().to(config::setAggregationMaxCount); + propertyMapper.from(prop::getAggregationMaxSize).whenNonNull().to(config::setAggregationMaxSize); + propertyMapper.from(prop::getCloudwatchEndpoint).whenHasText().to(config::setCloudwatchEndpoint); + propertyMapper.from(prop::getCloudwatchPort).whenNonNull().to(config::setCloudwatchPort); + propertyMapper.from(prop::getCollectionMaxCount).whenNonNull().to(config::setCollectionMaxCount); + propertyMapper.from(prop::getCollectionMaxSize).whenNonNull().to(config::setCollectionMaxSize); + propertyMapper.from(prop::getConnectTimeout).whenNonNull().to(config::setConnectTimeout); + propertyMapper.from(prop::getCredentialsRefreshDelay).whenNonNull().to(config::setCredentialsRefreshDelay); + propertyMapper.from(prop::getEnableCoreDumps).whenNonNull().to(config::setEnableCoreDumps); + propertyMapper.from(prop::getFailIfThrottled).whenNonNull().to(config::setFailIfThrottled); + propertyMapper.from(prop::getLogLevel).whenHasText().to(config::setLogLevel); + propertyMapper.from(prop::getMaxConnections).whenNonNull().to(config::setMaxConnections); + propertyMapper.from(prop::getMetricsGranularity).whenHasText().to(config::setMetricsGranularity); + propertyMapper.from(prop::getMetricsLevel).whenHasText().to(config::setMetricsLevel); + propertyMapper.from(prop::getMetricsNamespace).whenHasText().to(config::setMetricsNamespace); + propertyMapper.from(prop::getMetricsUploadDelay).whenNonNull().to(config::setMetricsUploadDelay); + propertyMapper.from(prop::getMinConnections).whenNonNull().to(config::setMinConnections); + propertyMapper.from(prop::getNativeExecutable).whenNonNull().to(config::setNativeExecutable); + propertyMapper.from(prop::getRateLimit).whenNonNull().to(config::setRateLimit); + propertyMapper.from(prop::getRecordMaxBufferedTime).whenNonNull().to(config::setRecordMaxBufferedTime); + propertyMapper.from(prop::getRecordTtl).whenNonNull().to(config::setRecordTtl); + propertyMapper.from(prop::getRequestTimeout).whenNonNull().to(config::setRequestTimeout); + propertyMapper.from(prop::getTempDirectory).whenNonNull().to(config::setTempDirectory); + propertyMapper.from(prop::getVerifyCertificate).whenNonNull().to(config::setVerifyCertificate); + propertyMapper.from(prop.getProxyHost()).whenNonNull().to(config::setProxyHost); + propertyMapper.from(prop.getProxyPort()).whenNonNull().to(config::setProxyPort); + propertyMapper.from(prop.getProxyUserName()).whenHasText().to(config::setProxyUserName); + propertyMapper.from(prop.getProxyPassword()).whenHasText().to(config::setProxyPassword); + propertyMapper.from(prop.getStsEndpoint()).whenHasText().to(config::setStsEndpoint); + propertyMapper.from(prop.getStsPort()).whenNonNull().to(config::setStsPort); + propertyMapper.from(prop.getThreadingModel()).whenNonNull().to(config::setThreadingModel); + propertyMapper.from(prop.getThreadPoolSize()).whenNonNull().to(config::setThreadPoolSize); + propertyMapper.from(prop.getUserRecordTimeoutInMillis()).whenNonNull() + .to(config::setUserRecordTimeoutInMillis); + + config.setCredentialsProvider() + config.setRegion(AwsClientBuilderConfigurer + .resolveRegion(kinesisProperties, connectionDetails.getIfAvailable(), awsRegionProvider) + .toString()); + connectionDetails.ifAvailable(cd -> { + config.setKinesisPort(cd.getEndpoint().getPort()); + config.setKinesisEndpoint(cd.getEndpoint().getHost()); + }); + return config; + } + + @ConditionalOnMissingBean + @Bean + public KinesisProducer kinesisProducer(KinesisProducerConfiguration kinesisProducerConfiguration) { + return new KinesisProducer(kinesisProducerConfiguration); + } +} diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java index 646760854..4a15c224e 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java @@ -16,8 +16,26 @@ package io.awspring.cloud.autoconfigure.kinesis; import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; +import io.awspring.cloud.autoconfigure.AwsClientProperties; +import org.springframework.boot.context.properties.ConfigurationProperties; + +import static io.awspring.cloud.autoconfigure.kinesis.KinesisProperties.PREFIX; + + +/** + * Properties related to KinesisProducer + * + * @author Matej Nedic + * @since 4.0.0 + */ +@ConfigurationProperties(prefix = PREFIX) +public class KinesisProducerProperties extends AwsClientProperties { + + /** + * The prefix used for AWS Kinesis configuration. + */ + public static final String PREFIX = "spring.cloud.aws.kinesis.producer"; -public class KinesisProducerProperties { /** * Whether aggregation of user records is enabled. */ From 83994beaf82c0efaa59ffaa0dc637c6d218b8fb9 Mon Sep 17 00:00:00 2001 From: matejnedic Date: Wed, 22 Oct 2025 19:44:52 +0200 Subject: [PATCH 09/11] init --- pom.xml | 2 +- spring-cloud-aws-autoconfigure/pom.xml | 2 +- spring-cloud-aws-dependencies/pom.xml | 18 ++---------------- .../spring-cloud-aws-starter-kinesis/pom.xml | 2 +- 4 files changed, 5 insertions(+), 19 deletions(-) diff --git a/pom.xml b/pom.xml index 781d7b365..16827e90f 100644 --- a/pom.xml +++ b/pom.xml @@ -45,11 +45,11 @@ spring-cloud-aws-sqs spring-cloud-aws-dynamodb spring-cloud-aws-kinesis - spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis spring-cloud-aws-s3 spring-cloud-aws-testcontainers spring-cloud-aws-starters/spring-cloud-aws-starter spring-cloud-aws-starters/spring-cloud-aws-starter-dynamodb + spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis spring-cloud-aws-starters/spring-cloud-aws-starter-integration-dynamodb spring-cloud-aws-starters/spring-cloud-aws-starter-metrics spring-cloud-aws-starters/spring-cloud-aws-starter-parameter-store diff --git a/spring-cloud-aws-autoconfigure/pom.xml b/spring-cloud-aws-autoconfigure/pom.xml index 60b8ba319..c12004be6 100644 --- a/spring-cloud-aws-autoconfigure/pom.xml +++ b/spring-cloud-aws-autoconfigure/pom.xml @@ -187,7 +187,7 @@ true - com.amazonaws + software.amazon.kinesis amazon-kinesis-producer true diff --git a/spring-cloud-aws-dependencies/pom.xml b/spring-cloud-aws-dependencies/pom.xml index ac6361a81..f1e113d22 100644 --- a/spring-cloud-aws-dependencies/pom.xml +++ b/spring-cloud-aws-dependencies/pom.xml @@ -35,8 +35,6 @@ 2.0.3 2.0.0-M2 2.3.0 - 2.7.0 - 0.15.12 @@ -72,13 +70,6 @@ true - - software.amazon.kinesis - amazon-kinesis-client - ${amazon.kinesis.client} - true - - software.amazon.encryption.s3 amazon-s3-encryption-client-java @@ -86,13 +77,6 @@ true - - com.amazonaws - amazon-kinesis-producer - ${amazon.kinesis.producer} - true - - software.amazon.s3.accessgrants aws-s3-accessgrants-java-plugin @@ -281,11 +265,13 @@ software.amazon.kinesis amazon-kinesis-client ${kcl.version} + true software.amazon.kinesis amazon-kinesis-producer ${kpl.version} + true diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml index ac9bd3a8e..6cccc82c2 100644 --- a/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml @@ -30,7 +30,7 @@ amazon-kinesis-client - com.amazonaws + software.amazon.kinesis amazon-kinesis-producer From 9b2758870ae385e1bda2b4ff373c8bed41b1a440 Mon Sep 17 00:00:00 2001 From: matejnedic Date: Thu, 23 Oct 2025 18:33:26 +0200 Subject: [PATCH 10/11] Not yet done --- pom.xml | 2 + spring-cloud-aws-autoconfigure/pom.xml | 10 ++--- .../kinesis/KinesisAutoConfiguration.java | 22 ----------- ...KinesisClientLibraryAutoConfiguration.java | 38 +++++++++++++++++++ .../KinesisClientLibraryProperties.java | 31 +++++++++++++++ .../KinesisProducerAutoConfiguration.java | 13 +++---- .../kinesis/KinesisProducerProperties.java | 6 +-- .../kinesis/KinesisProperties.java | 11 ------ ...ot.autoconfigure.AutoConfiguration.imports | 2 + .../AbstractPollingMessageSourceTests.java | 9 ++--- .../pom.xml | 33 ++++++++++++++++ .../pom.xml | 29 ++++++++++++++ .../spring-cloud-aws-starter-kinesis/pom.xml | 8 ---- 13 files changed, 152 insertions(+), 62 deletions(-) create mode 100644 spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryAutoConfiguration.java create mode 100644 spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryProperties.java create mode 100644 spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-client-library/pom.xml create mode 100644 spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-producer/pom.xml diff --git a/pom.xml b/pom.xml index 16827e90f..ab0213e11 100644 --- a/pom.xml +++ b/pom.xml @@ -62,6 +62,8 @@ spring-cloud-aws-starters/spring-cloud-aws-starter-sqs spring-cloud-aws-starters/spring-cloud-aws-starter-integration-sqs spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis + spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-producer + spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-client-library spring-cloud-aws-samples spring-cloud-aws-test spring-cloud-aws-modulith diff --git a/spring-cloud-aws-autoconfigure/pom.xml b/spring-cloud-aws-autoconfigure/pom.xml index c12004be6..1e50b2692 100644 --- a/spring-cloud-aws-autoconfigure/pom.xml +++ b/spring-cloud-aws-autoconfigure/pom.xml @@ -182,18 +182,18 @@ true - software.amazon.kinesis - amazon-kinesis-client + software.amazon.awssdk + kinesis true software.amazon.kinesis - amazon-kinesis-producer + amazon-kinesis-client true - software.amazon.awssdk - kinesis + software.amazon.kinesis + amazon-kinesis-producer true diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java index 9b62e3e96..2a221a3b3 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java @@ -15,8 +15,6 @@ */ package io.awspring.cloud.autoconfigure.kinesis; -import com.amazonaws.services.kinesis.producer.KinesisProducer; -import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; import io.awspring.cloud.autoconfigure.AwsAsyncClientCustomizer; import io.awspring.cloud.autoconfigure.core.AwsClientBuilderConfigurer; import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails; @@ -29,10 +27,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import software.amazon.awssdk.regions.providers.AwsRegionProvider; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; @AutoConfiguration @@ -54,21 +49,4 @@ public KinesisAsyncClient kinesisAsyncClient(KinesisProperties properties, kinesisAsyncClientCustomizer.orderedStream(), awsSyncClientCustomizers.orderedStream()) .build(); } - - @ConditionalOnClass(name = { "com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration", - "com.amazonaws.services.kinesis.producer.KinesisProducer" }) - public static class KinesisProducerAutoConfiguration { - } - - // In your configs - ConfigsBuilder configsBuilder = new ConfigsBuilder( - streamName, - applicationName, - KinesisClientUtil.createKinesisAsyncClient(kinesisAsyncClient), // <-- Use your bean - dynamoDbClient, - cloudWatchClient, - workerId, - recordProcessorFactory - ); - } diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryAutoConfiguration.java new file mode 100644 index 000000000..1c59098c0 --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryAutoConfiguration.java @@ -0,0 +1,38 @@ +package io.awspring.cloud.autoconfigure.kinesis; + + +import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration; +import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.kinesis.coordinator.Scheduler; +import software.amazon.kinesis.processor.ShardRecordProcessorFactory; + +import java.util.UUID; + +@AutoConfiguration +@ConditionalOnClass({ KinesisAsyncClient.class, Scheduler.class }) +@EnableConfigurationProperties({ KinesisClientLibraryProperties.class }) +@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class, KinesisAutoConfiguration.class }) +@ConditionalOnProperty(value = "spring.cloud.aws.kinesis.client.library.enabled", havingValue = "true", matchIfMissing = true) +public class KinesisClientLibraryAutoConfiguration { + + + + @ConditionalOnMissingBean + @Bean + public Scheduler scheduler(ObjectProvider dynamoDbClient, ObjectProvider cloudWatchClient, + KinesisAsyncClient kinesisAsyncClient, KinesisClientLibraryProperties properties, + ShardRecordProcessorFactory processorFactory) { + return null; + } +} diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryProperties.java new file mode 100644 index 000000000..5dfade220 --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryProperties.java @@ -0,0 +1,31 @@ +package io.awspring.cloud.autoconfigure.kinesis; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +import static io.awspring.cloud.autoconfigure.kinesis.KinesisClientLibraryProperties.PREFIX; + + +@ConfigurationProperties(prefix = PREFIX) +public class KinesisClientLibraryProperties { + + public static final String PREFIX = "spring.cloud.aws.kinesis.client.library"; + + private String streamName; + private String applicationName; + + public String getStreamName() { + return streamName; + } + + public void setStreamName(String streamName) { + this.streamName = streamName; + } + + public String getApplicationName() { + return applicationName; + } + + public void setApplicationName(String applicationName) { + this.applicationName = applicationName; + } +} diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java index cf5b9f088..749f1ba1b 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java @@ -1,8 +1,8 @@ package io.awspring.cloud.autoconfigure.kinesis; -import com.amazonaws.services.kinesis.producer.KinesisProducer; -import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; +import software.amazon.kinesis.producer.KinesisProducer; +import software.amazon.kinesis.producer.KinesisProducerConfiguration; import io.awspring.cloud.autoconfigure.core.AwsClientBuilderConfigurer; import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails; import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration; @@ -20,7 +20,7 @@ import software.amazon.awssdk.regions.providers.AwsRegionProvider; @AutoConfiguration -@ConditionalOnClass({ KinesisProducer.class }) +@ConditionalOnClass({ KinesisProducer.class, KinesisProducerConfiguration.class }) @EnableConfigurationProperties({ KinesisProducerProperties.class }) @AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class }) @ConditionalOnProperty(value = "spring.cloud.aws.kinesis.producer.enabled", havingValue = "true", matchIfMissing = true) @@ -28,12 +28,11 @@ public class KinesisProducerAutoConfiguration { @ConditionalOnMissingBean @Bean - public KinesisProducerConfiguration kinesisProducerConfiguration(KinesisProperties kinesisProperties, + public KinesisProducerConfiguration kinesisProducerConfiguration(KinesisProducerProperties prop, AwsCredentialsProvider credentialsProvider, AwsRegionProvider awsRegionProvider, ObjectProvider connectionDetails) { PropertyMapper propertyMapper = PropertyMapper.get(); KinesisProducerConfiguration config = new KinesisProducerConfiguration(); - KinesisProducerProperties prop = kinesisProperties.getProducer(); propertyMapper.from(prop::getAggregationEnabled).whenNonNull().to(config::setAggregationEnabled); propertyMapper.from(prop::getAggregationMaxCount).whenNonNull().to(config::setAggregationMaxCount); propertyMapper.from(prop::getAggregationMaxSize).whenNonNull().to(config::setAggregationMaxSize); @@ -70,9 +69,9 @@ public KinesisProducerConfiguration kinesisProducerConfiguration(KinesisProperti propertyMapper.from(prop.getUserRecordTimeoutInMillis()).whenNonNull() .to(config::setUserRecordTimeoutInMillis); - config.setCredentialsProvider() + config.setCredentialsProvider(credentialsProvider); config.setRegion(AwsClientBuilderConfigurer - .resolveRegion(kinesisProperties, connectionDetails.getIfAvailable(), awsRegionProvider) + .resolveRegion(prop, connectionDetails.getIfAvailable(), awsRegionProvider) .toString()); connectionDetails.ifAvailable(cd -> { config.setKinesisPort(cd.getEndpoint().getPort()); diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java index 4a15c224e..2a47f0b87 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java @@ -15,12 +15,12 @@ */ package io.awspring.cloud.autoconfigure.kinesis; -import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; + import io.awspring.cloud.autoconfigure.AwsClientProperties; import org.springframework.boot.context.properties.ConfigurationProperties; +import software.amazon.kinesis.producer.KinesisProducerConfiguration; -import static io.awspring.cloud.autoconfigure.kinesis.KinesisProperties.PREFIX; - +import static io.awspring.cloud.autoconfigure.kinesis.KinesisProducerProperties.PREFIX; /** * Properties related to KinesisProducer diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java index 73c5cc163..ed289bd89 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java @@ -33,15 +33,4 @@ public class KinesisProperties extends AwsClientProperties { * The prefix used for AWS Kinesis configuration. */ public static final String PREFIX = "spring.cloud.aws.kinesis"; - - @Nullable - private KinesisProducerProperties producer = new KinesisProducerProperties(); - - public KinesisProducerProperties getProducer() { - return producer; - } - - public void setProducer(KinesisProducerProperties producer) { - this.producer = producer; - } } diff --git a/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 882c58028..11ac764f7 100644 --- a/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -17,3 +17,5 @@ io.awspring.cloud.autoconfigure.config.parameterstore.ParameterStoreReloadAutoCo io.awspring.cloud.autoconfigure.config.parameterstore.ParameterStoreAutoConfiguration io.awspring.cloud.autoconfigure.config.s3.S3ReloadAutoConfiguration io.awspring.cloud.autoconfigure.kinesis.KinesisAutoConfiguration +io.awspring.cloud.autoconfigure.kinesis.KinesisProducerAutoConfiguration +io.awspring.cloud.autoconfigure.kinesis.KinesisClientLibraryAutoConfiguration diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSourceTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSourceTests.java index fb240cffa..a3f850ede 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSourceTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSourceTests.java @@ -24,8 +24,6 @@ import io.awspring.cloud.sqs.MessageExecutionThreadFactory; import io.awspring.cloud.sqs.listener.BackPressureMode; -import io.awspring.cloud.sqs.listener.ContainerOptions; -import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder; import io.awspring.cloud.sqs.listener.SqsContainerOptions; import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementCallback; import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementProcessor; @@ -442,10 +440,9 @@ else if (currentPoll.compareAndSet(2, 3)) { void shouldRemovePollingFutureOnException() throws InterruptedException { String testName = "shouldClearPollingFuturesOnException"; - BackPressureHandler backPressureHandler = BackPressureHandlerFactories - .adaptiveThroughputBackPressureHandler() - .createBackPressureHandler(SqsContainerOptions.builder() - .maxDelayBetweenPolls(Duration.ofMillis(100)).backPressureMode(BackPressureMode.ALWAYS_POLL_MAX_MESSAGES).build()); + BackPressureHandler backPressureHandler = BackPressureHandlerFactories.adaptiveThroughputBackPressureHandler() + .createBackPressureHandler(SqsContainerOptions.builder().maxDelayBetweenPolls(Duration.ofMillis(100)) + .backPressureMode(BackPressureMode.ALWAYS_POLL_MAX_MESSAGES).build()); AbstractPollingMessageSource source = new AbstractPollingMessageSource<>() { @Override diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-client-library/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-client-library/pom.xml new file mode 100644 index 000000000..ef90ec96b --- /dev/null +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-client-library/pom.xml @@ -0,0 +1,33 @@ + + + + + spring-cloud-aws + io.awspring.cloud + 4.0.0-SNAPSHOT + ../../pom.xml + + + 4.0.0 + + spring-cloud-aws-starter-kinesis-client-library + Spring Cloud AWS Kinesis Client Library + Spring Cloud AWS Kinesis Client Library + + + + io.awspring.cloud + spring-cloud-aws-starter + + + software.amazon.kinesis + amazon-kinesis-client + + + software.amazon.awssdk + kinesis + + + diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-producer/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-producer/pom.xml new file mode 100644 index 000000000..92fa24ae8 --- /dev/null +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-producer/pom.xml @@ -0,0 +1,29 @@ + + + + + spring-cloud-aws + io.awspring.cloud + 4.0.0-SNAPSHOT + ../../pom.xml + + + 4.0.0 + + spring-cloud-aws-starter-kinesis-producer + Spring Cloud AWS Kinesis Producer Starter + Spring Cloud AWS Kinesis Producer Starter + + + + io.awspring.cloud + spring-cloud-aws-starter + + + software.amazon.kinesis + amazon-kinesis-producer + + + diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml index 6cccc82c2..a9f02f837 100644 --- a/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml @@ -25,13 +25,5 @@ software.amazon.awssdk kinesis - - software.amazon.kinesis - amazon-kinesis-client - - - software.amazon.kinesis - amazon-kinesis-producer - From a01456055ef9365a41b8dbf1bb05c348dc2bb4f9 Mon Sep 17 00:00:00 2001 From: matejnedic Date: Thu, 23 Oct 2025 18:40:37 +0200 Subject: [PATCH 11/11] Apply spottless --- .../kinesis/KinesisAsyncClientCustomizer.java | 2 +- ...KinesisClientLibraryAutoConfiguration.java | 29 ++++++++++++------ .../KinesisClientLibraryProperties.java | 18 +++++++++-- .../KinesisProducerAutoConfiguration.java | 30 +++++++++++++------ .../kinesis/KinesisProducerProperties.java | 3 +- .../kinesis/KinesisProperties.java | 1 - .../integration/KinesisIntegrationTests.java | 6 ++-- 7 files changed, 62 insertions(+), 27 deletions(-) diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java index 2c16d1aa0..60bdb3d74 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2022 the original author or authors. + * Copyright 2013-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryAutoConfiguration.java index 1c59098c0..9e690d1c9 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryAutoConfiguration.java @@ -1,6 +1,20 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.awspring.cloud.autoconfigure.kinesis; - import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration; import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration; import org.springframework.beans.factory.ObjectProvider; @@ -17,22 +31,19 @@ import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; -import java.util.UUID; - @AutoConfiguration @ConditionalOnClass({ KinesisAsyncClient.class, Scheduler.class }) @EnableConfigurationProperties({ KinesisClientLibraryProperties.class }) -@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class, KinesisAutoConfiguration.class }) +@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class, + KinesisAutoConfiguration.class }) @ConditionalOnProperty(value = "spring.cloud.aws.kinesis.client.library.enabled", havingValue = "true", matchIfMissing = true) public class KinesisClientLibraryAutoConfiguration { - - @ConditionalOnMissingBean @Bean - public Scheduler scheduler(ObjectProvider dynamoDbClient, ObjectProvider cloudWatchClient, - KinesisAsyncClient kinesisAsyncClient, KinesisClientLibraryProperties properties, - ShardRecordProcessorFactory processorFactory) { + public Scheduler scheduler(ObjectProvider dynamoDbClient, + ObjectProvider cloudWatchClient, KinesisAsyncClient kinesisAsyncClient, + KinesisClientLibraryProperties properties, ShardRecordProcessorFactory processorFactory) { return null; } } diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryProperties.java index 5dfade220..c31cc6798 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryProperties.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryProperties.java @@ -1,9 +1,23 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.awspring.cloud.autoconfigure.kinesis; -import org.springframework.boot.context.properties.ConfigurationProperties; - import static io.awspring.cloud.autoconfigure.kinesis.KinesisClientLibraryProperties.PREFIX; +import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties(prefix = PREFIX) public class KinesisClientLibraryProperties { diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java index 749f1ba1b..5ab585915 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java @@ -1,8 +1,20 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.awspring.cloud.autoconfigure.kinesis; - -import software.amazon.kinesis.producer.KinesisProducer; -import software.amazon.kinesis.producer.KinesisProducerConfiguration; import io.awspring.cloud.autoconfigure.core.AwsClientBuilderConfigurer; import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails; import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration; @@ -18,6 +30,8 @@ import org.springframework.context.annotation.Bean; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.regions.providers.AwsRegionProvider; +import software.amazon.kinesis.producer.KinesisProducer; +import software.amazon.kinesis.producer.KinesisProducerConfiguration; @AutoConfiguration @ConditionalOnClass({ KinesisProducer.class, KinesisProducerConfiguration.class }) @@ -29,8 +43,8 @@ public class KinesisProducerAutoConfiguration { @ConditionalOnMissingBean @Bean public KinesisProducerConfiguration kinesisProducerConfiguration(KinesisProducerProperties prop, - AwsCredentialsProvider credentialsProvider, - AwsRegionProvider awsRegionProvider, ObjectProvider connectionDetails) { + AwsCredentialsProvider credentialsProvider, AwsRegionProvider awsRegionProvider, + ObjectProvider connectionDetails) { PropertyMapper propertyMapper = PropertyMapper.get(); KinesisProducerConfiguration config = new KinesisProducerConfiguration(); propertyMapper.from(prop::getAggregationEnabled).whenNonNull().to(config::setAggregationEnabled); @@ -66,13 +80,11 @@ public KinesisProducerConfiguration kinesisProducerConfiguration(KinesisProducer propertyMapper.from(prop.getStsPort()).whenNonNull().to(config::setStsPort); propertyMapper.from(prop.getThreadingModel()).whenNonNull().to(config::setThreadingModel); propertyMapper.from(prop.getThreadPoolSize()).whenNonNull().to(config::setThreadPoolSize); - propertyMapper.from(prop.getUserRecordTimeoutInMillis()).whenNonNull() - .to(config::setUserRecordTimeoutInMillis); + propertyMapper.from(prop.getUserRecordTimeoutInMillis()).whenNonNull().to(config::setUserRecordTimeoutInMillis); config.setCredentialsProvider(credentialsProvider); config.setRegion(AwsClientBuilderConfigurer - .resolveRegion(prop, connectionDetails.getIfAvailable(), awsRegionProvider) - .toString()); + .resolveRegion(prop, connectionDetails.getIfAvailable(), awsRegionProvider).toString()); connectionDetails.ifAvailable(cd -> { config.setKinesisPort(cd.getEndpoint().getPort()); config.setKinesisEndpoint(cd.getEndpoint().getHost()); diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java index 2a47f0b87..5d90fde67 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java @@ -15,13 +15,12 @@ */ package io.awspring.cloud.autoconfigure.kinesis; +import static io.awspring.cloud.autoconfigure.kinesis.KinesisProducerProperties.PREFIX; import io.awspring.cloud.autoconfigure.AwsClientProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import software.amazon.kinesis.producer.KinesisProducerConfiguration; -import static io.awspring.cloud.autoconfigure.kinesis.KinesisProducerProperties.PREFIX; - /** * Properties related to KinesisProducer * diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java index ed289bd89..a809a0e18 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java @@ -19,7 +19,6 @@ import io.awspring.cloud.autoconfigure.AwsClientProperties; import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.lang.Nullable; /** * Properties related to KinesisClient diff --git a/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisIntegrationTests.java b/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisIntegrationTests.java index 599862add..4fa957a7b 100644 --- a/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisIntegrationTests.java +++ b/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisIntegrationTests.java @@ -23,7 +23,6 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; - import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -112,13 +111,14 @@ void kinesisInboundOutbound() throws InterruptedException { .contains("Channel 'kinesisReceiveChannel' expected one of the following data types " + "[class java.util.Date], but received [class java.lang.String]"); - String errorSequenceNumber = errorMessage.getHeaders().get(KinesisHeaders.RAW_RECORD, Record.class).sequenceNumber(); + String errorSequenceNumber = errorMessage.getHeaders().get(KinesisHeaders.RAW_RECORD, Record.class) + .sequenceNumber(); // Second exception for the same record since we have requested via bubbling exception up to the consumer errorMessage = this.errorChannel.receive(30_000); assertThat(errorMessage).isNotNull(); assertThat(errorMessage.getHeaders().get(KinesisHeaders.RAW_RECORD, Record.class).sequenceNumber()) - .isEqualTo(errorSequenceNumber); + .isEqualTo(errorSequenceNumber); for (int i = 0; i < 2; i++) { this.kinesisSendChannel