diff --git a/pom.xml b/pom.xml
index bb7cb54c4..ab0213e11 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,11 +45,11 @@
spring-cloud-aws-sqs
spring-cloud-aws-dynamodb
spring-cloud-aws-kinesis
- spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis
spring-cloud-aws-s3
spring-cloud-aws-testcontainers
spring-cloud-aws-starters/spring-cloud-aws-starter
spring-cloud-aws-starters/spring-cloud-aws-starter-dynamodb
+ spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis
spring-cloud-aws-starters/spring-cloud-aws-starter-integration-dynamodb
spring-cloud-aws-starters/spring-cloud-aws-starter-metrics
spring-cloud-aws-starters/spring-cloud-aws-starter-parameter-store
@@ -61,6 +61,9 @@
spring-cloud-aws-starters/spring-cloud-aws-starter-integration-sns
spring-cloud-aws-starters/spring-cloud-aws-starter-sqs
spring-cloud-aws-starters/spring-cloud-aws-starter-integration-sqs
+ spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis
+ spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-producer
+ spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-client-library
spring-cloud-aws-samples
spring-cloud-aws-test
spring-cloud-aws-modulith
diff --git a/spring-cloud-aws-autoconfigure/pom.xml b/spring-cloud-aws-autoconfigure/pom.xml
index 8d7e8e4ab..1e50b2692 100644
--- a/spring-cloud-aws-autoconfigure/pom.xml
+++ b/spring-cloud-aws-autoconfigure/pom.xml
@@ -181,6 +181,21 @@
kms
true
+
+ software.amazon.awssdk
+ kinesis
+ true
+
+
+ software.amazon.kinesis
+ amazon-kinesis-client
+ true
+
+
+ software.amazon.kinesis
+ amazon-kinesis-producer
+ true
+
software.amazon.encryption.s3
amazon-s3-encryption-client-java
diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java
new file mode 100644
index 000000000..60bdb3d74
--- /dev/null
+++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.autoconfigure.kinesis;
+
+import io.awspring.cloud.autoconfigure.AwsClientCustomizer;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
+
+/**
+ * Callback interface that can be used to customize a {@link KinesisAsyncClientBuilder}.
+ *
+ * @author Matej Nedic
+ * @since 4.0.0
+ */
+@FunctionalInterface
+public interface KinesisAsyncClientCustomizer extends AwsClientCustomizer {
+}
diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java
new file mode 100644
index 000000000..2a221a3b3
--- /dev/null
+++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.autoconfigure.kinesis;
+
+import io.awspring.cloud.autoconfigure.AwsAsyncClientCustomizer;
+import io.awspring.cloud.autoconfigure.core.AwsClientBuilderConfigurer;
+import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails;
+import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration;
+import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration;
+import org.springframework.beans.factory.ObjectProvider;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.AutoConfigureAfter;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+
+@AutoConfiguration
+@ConditionalOnClass({ KinesisAsyncClient.class })
+@EnableConfigurationProperties({ KinesisProperties.class })
+@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class })
+@ConditionalOnProperty(value = "spring.cloud.aws.kinesis.enabled", havingValue = "true", matchIfMissing = true)
+public class KinesisAutoConfiguration {
+
+ @ConditionalOnMissingBean
+ @Bean
+ public KinesisAsyncClient kinesisAsyncClient(KinesisProperties properties,
+ AwsClientBuilderConfigurer awsClientBuilderConfigurer,
+ ObjectProvider connectionDetails,
+ ObjectProvider kinesisAsyncClientCustomizer,
+ ObjectProvider awsSyncClientCustomizers) {
+ return awsClientBuilderConfigurer
+ .configureAsyncClient(KinesisAsyncClient.builder(), properties, connectionDetails.getIfAvailable(),
+ kinesisAsyncClientCustomizer.orderedStream(), awsSyncClientCustomizers.orderedStream())
+ .build();
+ }
+}
diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryAutoConfiguration.java
new file mode 100644
index 000000000..9e690d1c9
--- /dev/null
+++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryAutoConfiguration.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.autoconfigure.kinesis;
+
+import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration;
+import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration;
+import org.springframework.beans.factory.ObjectProvider;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.AutoConfigureAfter;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.kinesis.coordinator.Scheduler;
+import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
+
+@AutoConfiguration
+@ConditionalOnClass({ KinesisAsyncClient.class, Scheduler.class })
+@EnableConfigurationProperties({ KinesisClientLibraryProperties.class })
+@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class,
+ KinesisAutoConfiguration.class })
+@ConditionalOnProperty(value = "spring.cloud.aws.kinesis.client.library.enabled", havingValue = "true", matchIfMissing = true)
+public class KinesisClientLibraryAutoConfiguration {
+
+ @ConditionalOnMissingBean
+ @Bean
+ public Scheduler scheduler(ObjectProvider dynamoDbClient,
+ ObjectProvider cloudWatchClient, KinesisAsyncClient kinesisAsyncClient,
+ KinesisClientLibraryProperties properties, ShardRecordProcessorFactory processorFactory) {
+ return null;
+ }
+}
diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryProperties.java
new file mode 100644
index 000000000..c31cc6798
--- /dev/null
+++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryProperties.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.autoconfigure.kinesis;
+
+import static io.awspring.cloud.autoconfigure.kinesis.KinesisClientLibraryProperties.PREFIX;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties(prefix = PREFIX)
+public class KinesisClientLibraryProperties {
+
+ public static final String PREFIX = "spring.cloud.aws.kinesis.client.library";
+
+ private String streamName;
+ private String applicationName;
+
+ public String getStreamName() {
+ return streamName;
+ }
+
+ public void setStreamName(String streamName) {
+ this.streamName = streamName;
+ }
+
+ public String getApplicationName() {
+ return applicationName;
+ }
+
+ public void setApplicationName(String applicationName) {
+ this.applicationName = applicationName;
+ }
+}
diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java
new file mode 100644
index 000000000..5ab585915
--- /dev/null
+++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.autoconfigure.kinesis;
+
+import io.awspring.cloud.autoconfigure.core.AwsClientBuilderConfigurer;
+import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails;
+import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration;
+import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration;
+import org.springframework.beans.factory.ObjectProvider;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.AutoConfigureAfter;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.boot.context.properties.PropertyMapper;
+import org.springframework.context.annotation.Bean;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.regions.providers.AwsRegionProvider;
+import software.amazon.kinesis.producer.KinesisProducer;
+import software.amazon.kinesis.producer.KinesisProducerConfiguration;
+
+@AutoConfiguration
+@ConditionalOnClass({ KinesisProducer.class, KinesisProducerConfiguration.class })
+@EnableConfigurationProperties({ KinesisProducerProperties.class })
+@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class })
+@ConditionalOnProperty(value = "spring.cloud.aws.kinesis.producer.enabled", havingValue = "true", matchIfMissing = true)
+public class KinesisProducerAutoConfiguration {
+
+ @ConditionalOnMissingBean
+ @Bean
+ public KinesisProducerConfiguration kinesisProducerConfiguration(KinesisProducerProperties prop,
+ AwsCredentialsProvider credentialsProvider, AwsRegionProvider awsRegionProvider,
+ ObjectProvider connectionDetails) {
+ PropertyMapper propertyMapper = PropertyMapper.get();
+ KinesisProducerConfiguration config = new KinesisProducerConfiguration();
+ propertyMapper.from(prop::getAggregationEnabled).whenNonNull().to(config::setAggregationEnabled);
+ propertyMapper.from(prop::getAggregationMaxCount).whenNonNull().to(config::setAggregationMaxCount);
+ propertyMapper.from(prop::getAggregationMaxSize).whenNonNull().to(config::setAggregationMaxSize);
+ propertyMapper.from(prop::getCloudwatchEndpoint).whenHasText().to(config::setCloudwatchEndpoint);
+ propertyMapper.from(prop::getCloudwatchPort).whenNonNull().to(config::setCloudwatchPort);
+ propertyMapper.from(prop::getCollectionMaxCount).whenNonNull().to(config::setCollectionMaxCount);
+ propertyMapper.from(prop::getCollectionMaxSize).whenNonNull().to(config::setCollectionMaxSize);
+ propertyMapper.from(prop::getConnectTimeout).whenNonNull().to(config::setConnectTimeout);
+ propertyMapper.from(prop::getCredentialsRefreshDelay).whenNonNull().to(config::setCredentialsRefreshDelay);
+ propertyMapper.from(prop::getEnableCoreDumps).whenNonNull().to(config::setEnableCoreDumps);
+ propertyMapper.from(prop::getFailIfThrottled).whenNonNull().to(config::setFailIfThrottled);
+ propertyMapper.from(prop::getLogLevel).whenHasText().to(config::setLogLevel);
+ propertyMapper.from(prop::getMaxConnections).whenNonNull().to(config::setMaxConnections);
+ propertyMapper.from(prop::getMetricsGranularity).whenHasText().to(config::setMetricsGranularity);
+ propertyMapper.from(prop::getMetricsLevel).whenHasText().to(config::setMetricsLevel);
+ propertyMapper.from(prop::getMetricsNamespace).whenHasText().to(config::setMetricsNamespace);
+ propertyMapper.from(prop::getMetricsUploadDelay).whenNonNull().to(config::setMetricsUploadDelay);
+ propertyMapper.from(prop::getMinConnections).whenNonNull().to(config::setMinConnections);
+ propertyMapper.from(prop::getNativeExecutable).whenNonNull().to(config::setNativeExecutable);
+ propertyMapper.from(prop::getRateLimit).whenNonNull().to(config::setRateLimit);
+ propertyMapper.from(prop::getRecordMaxBufferedTime).whenNonNull().to(config::setRecordMaxBufferedTime);
+ propertyMapper.from(prop::getRecordTtl).whenNonNull().to(config::setRecordTtl);
+ propertyMapper.from(prop::getRequestTimeout).whenNonNull().to(config::setRequestTimeout);
+ propertyMapper.from(prop::getTempDirectory).whenNonNull().to(config::setTempDirectory);
+ propertyMapper.from(prop::getVerifyCertificate).whenNonNull().to(config::setVerifyCertificate);
+ propertyMapper.from(prop.getProxyHost()).whenNonNull().to(config::setProxyHost);
+ propertyMapper.from(prop.getProxyPort()).whenNonNull().to(config::setProxyPort);
+ propertyMapper.from(prop.getProxyUserName()).whenHasText().to(config::setProxyUserName);
+ propertyMapper.from(prop.getProxyPassword()).whenHasText().to(config::setProxyPassword);
+ propertyMapper.from(prop.getStsEndpoint()).whenHasText().to(config::setStsEndpoint);
+ propertyMapper.from(prop.getStsPort()).whenNonNull().to(config::setStsPort);
+ propertyMapper.from(prop.getThreadingModel()).whenNonNull().to(config::setThreadingModel);
+ propertyMapper.from(prop.getThreadPoolSize()).whenNonNull().to(config::setThreadPoolSize);
+ propertyMapper.from(prop.getUserRecordTimeoutInMillis()).whenNonNull().to(config::setUserRecordTimeoutInMillis);
+
+ config.setCredentialsProvider(credentialsProvider);
+ config.setRegion(AwsClientBuilderConfigurer
+ .resolveRegion(prop, connectionDetails.getIfAvailable(), awsRegionProvider).toString());
+ connectionDetails.ifAvailable(cd -> {
+ config.setKinesisPort(cd.getEndpoint().getPort());
+ config.setKinesisEndpoint(cd.getEndpoint().getHost());
+ });
+ return config;
+ }
+
+ @ConditionalOnMissingBean
+ @Bean
+ public KinesisProducer kinesisProducer(KinesisProducerConfiguration kinesisProducerConfiguration) {
+ return new KinesisProducer(kinesisProducerConfiguration);
+ }
+}
diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java
new file mode 100644
index 000000000..5d90fde67
--- /dev/null
+++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java
@@ -0,0 +1,479 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.autoconfigure.kinesis;
+
+import static io.awspring.cloud.autoconfigure.kinesis.KinesisProducerProperties.PREFIX;
+
+import io.awspring.cloud.autoconfigure.AwsClientProperties;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import software.amazon.kinesis.producer.KinesisProducerConfiguration;
+
+/**
+ * Properties related to KinesisProducer
+ *
+ * @author Matej Nedic
+ * @since 4.0.0
+ */
+@ConfigurationProperties(prefix = PREFIX)
+public class KinesisProducerProperties extends AwsClientProperties {
+
+ /**
+ * The prefix used for AWS Kinesis configuration.
+ */
+ public static final String PREFIX = "spring.cloud.aws.kinesis.producer";
+
+ /**
+ * Whether aggregation of user records is enabled.
+ */
+ private Boolean aggregationEnabled;
+
+ /**
+ * Maximum number of user records to aggregate. Must be between 1 and Long.MAX_VALUE.
+ */
+ private Long aggregationMaxCount;
+
+ /**
+ * Maximum size in bytes of aggregated records. Must be between 64 and 1048576.
+ */
+ private Long aggregationMaxSize;
+
+ /**
+ * CloudWatch endpoint. Must match the regex: ^([A-Za-z0-9-\\.]+)?$
+ */
+ private String cloudwatchEndpoint;
+
+ /**
+ * CloudWatch port. Must be between 1 and 65535.
+ */
+ private Long cloudwatchPort;
+
+ /**
+ * Maximum number of records to collect before sending a batch. Must be between 1 and 500.
+ */
+ private Long collectionMaxCount;
+
+ /**
+ * Maximum size in bytes of collected records before sending. Must be between 52224 and Long.MAX_VALUE.
+ */
+ private Long collectionMaxSize;
+
+ /**
+ * Connection timeout in milliseconds. Must be between 100 and 300000.
+ */
+ private Long connectTimeout;
+
+ /**
+ * Delay in milliseconds for credentials refresh. Must be between 1 and 300000.
+ */
+ private Long credentialsRefreshDelay;
+
+ /**
+ * Whether core dumps are enabled.
+ */
+ private Boolean enableCoreDumps;
+
+ /**
+ * Whether to fail if throttled by Kinesis.
+ */
+ private Boolean failIfThrottled;
+
+ /**
+ * Log level for the producer. Allowed values: trace, debug, info, warning, error.
+ */
+ private String logLevel;
+
+ /**
+ * Maximum number of connections. Must be between 1 and 256.
+ */
+ private Long maxConnections;
+
+ /**
+ * Metrics granularity. Allowed values: global, stream, shard.
+ */
+ private String metricsGranularity;
+
+ /**
+ * Metrics level. Allowed values: none, summary, detailed.
+ */
+ private String metricsLevel;
+
+ /**
+ * Metrics namespace. Must match the regex: (?!AWS/).{1,255}
+ */
+ private String metricsNamespace;
+
+ /**
+ * Delay in milliseconds for uploading metrics. Must be between 1 and 60000.
+ */
+ private Long metricsUploadDelay;
+
+ /**
+ * Minimum number of connections. Must be between 1 and 16.
+ */
+ private Long minConnections;
+
+ /**
+ * Native executable path.
+ */
+ private String nativeExecutable;
+
+ /**
+ * Rate limit for records per second. Must be between 1 and Long.MAX_VALUE.
+ */
+ private Long rateLimit;
+
+ /**
+ * Maximum buffered time for records in milliseconds. Must be between 0 and Long.MAX_VALUE.
+ */
+ private Long recordMaxBufferedTime;
+
+ /**
+ * Time to live for records in milliseconds. Must be between 100 and Long.MAX_VALUE.
+ */
+ private Long recordTtl;
+
+ /**
+ * Request timeout in milliseconds. Must be between 100 and 600000.
+ */
+ private Long requestTimeout;
+
+ /**
+ * Temporary directory path for the producer.
+ */
+ private String tempDirectory;
+
+ /**
+ * Whether to verify SSL certificates.
+ */
+ private Boolean verifyCertificate;
+
+ /**
+ * Proxy host.
+ */
+ private String proxyHost;
+
+ /**
+ * Proxy port. Must be between 1 and 65535.
+ */
+ private Long proxyPort;
+
+ /**
+ * Proxy username.
+ */
+ private String proxyUserName;
+
+ /**
+ * Proxy password.
+ */
+ private String proxyPassword;
+
+ /**
+ * STS endpoint. Must match the regex: ^([A-Za-z0-9-\\.]+)?$
+ */
+ private String stsEndpoint;
+
+ /**
+ * STS port. Must be between 1 and 65535.
+ */
+ private Long stsPort;
+
+ /**
+ * Threading model for the producer.
+ */
+ private KinesisProducerConfiguration.ThreadingModel threadingModel;
+
+ /**
+ * Thread pool size. Must be greater than or equal to 0.
+ */
+ private Integer threadPoolSize;
+
+ /**
+ * Timeout in milliseconds for user records. Must be greater than or equal to 0.
+ */
+ private Long userRecordTimeoutInMillis;
+
+ public Boolean getAggregationEnabled() {
+ return aggregationEnabled;
+ }
+
+ public void setAggregationEnabled(Boolean aggregationEnabled) {
+ this.aggregationEnabled = aggregationEnabled;
+ }
+
+ public Long getAggregationMaxCount() {
+ return aggregationMaxCount;
+ }
+
+ public void setAggregationMaxCount(Long aggregationMaxCount) {
+ this.aggregationMaxCount = aggregationMaxCount;
+ }
+
+ public Long getAggregationMaxSize() {
+ return aggregationMaxSize;
+ }
+
+ public void setAggregationMaxSize(Long aggregationMaxSize) {
+ this.aggregationMaxSize = aggregationMaxSize;
+ }
+
+ public String getCloudwatchEndpoint() {
+ return cloudwatchEndpoint;
+ }
+
+ public void setCloudwatchEndpoint(String cloudwatchEndpoint) {
+ this.cloudwatchEndpoint = cloudwatchEndpoint;
+ }
+
+ public Long getCloudwatchPort() {
+ return cloudwatchPort;
+ }
+
+ public void setCloudwatchPort(Long cloudwatchPort) {
+ this.cloudwatchPort = cloudwatchPort;
+ }
+
+ public Long getCollectionMaxCount() {
+ return collectionMaxCount;
+ }
+
+ public void setCollectionMaxCount(Long collectionMaxCount) {
+ this.collectionMaxCount = collectionMaxCount;
+ }
+
+ public Long getCollectionMaxSize() {
+ return collectionMaxSize;
+ }
+
+ public void setCollectionMaxSize(Long collectionMaxSize) {
+ this.collectionMaxSize = collectionMaxSize;
+ }
+
+ public Long getConnectTimeout() {
+ return connectTimeout;
+ }
+
+ public void setConnectTimeout(Long connectTimeout) {
+ this.connectTimeout = connectTimeout;
+ }
+
+ public Long getCredentialsRefreshDelay() {
+ return credentialsRefreshDelay;
+ }
+
+ public void setCredentialsRefreshDelay(Long credentialsRefreshDelay) {
+ this.credentialsRefreshDelay = credentialsRefreshDelay;
+ }
+
+ public Boolean getEnableCoreDumps() {
+ return enableCoreDumps;
+ }
+
+ public void setEnableCoreDumps(Boolean enableCoreDumps) {
+ this.enableCoreDumps = enableCoreDumps;
+ }
+
+ public Boolean getFailIfThrottled() {
+ return failIfThrottled;
+ }
+
+ public void setFailIfThrottled(Boolean failIfThrottled) {
+ this.failIfThrottled = failIfThrottled;
+ }
+
+ public String getLogLevel() {
+ return logLevel;
+ }
+
+ public void setLogLevel(String logLevel) {
+ this.logLevel = logLevel;
+ }
+
+ public Long getMaxConnections() {
+ return maxConnections;
+ }
+
+ public void setMaxConnections(Long maxConnections) {
+ this.maxConnections = maxConnections;
+ }
+
+ public String getMetricsGranularity() {
+ return metricsGranularity;
+ }
+
+ public void setMetricsGranularity(String metricsGranularity) {
+ this.metricsGranularity = metricsGranularity;
+ }
+
+ public String getMetricsLevel() {
+ return metricsLevel;
+ }
+
+ public void setMetricsLevel(String metricsLevel) {
+ this.metricsLevel = metricsLevel;
+ }
+
+ public String getMetricsNamespace() {
+ return metricsNamespace;
+ }
+
+ public void setMetricsNamespace(String metricsNamespace) {
+ this.metricsNamespace = metricsNamespace;
+ }
+
+ public Long getMetricsUploadDelay() {
+ return metricsUploadDelay;
+ }
+
+ public void setMetricsUploadDelay(Long metricsUploadDelay) {
+ this.metricsUploadDelay = metricsUploadDelay;
+ }
+
+ public Long getMinConnections() {
+ return minConnections;
+ }
+
+ public void setMinConnections(Long minConnections) {
+ this.minConnections = minConnections;
+ }
+
+ public String getNativeExecutable() {
+ return nativeExecutable;
+ }
+
+ public void setNativeExecutable(String nativeExecutable) {
+ this.nativeExecutable = nativeExecutable;
+ }
+
+ public Long getRateLimit() {
+ return rateLimit;
+ }
+
+ public void setRateLimit(Long rateLimit) {
+ this.rateLimit = rateLimit;
+ }
+
+ public Long getRecordMaxBufferedTime() {
+ return recordMaxBufferedTime;
+ }
+
+ public void setRecordMaxBufferedTime(Long recordMaxBufferedTime) {
+ this.recordMaxBufferedTime = recordMaxBufferedTime;
+ }
+
+ public Long getRecordTtl() {
+ return recordTtl;
+ }
+
+ public void setRecordTtl(Long recordTtl) {
+ this.recordTtl = recordTtl;
+ }
+
+ public Long getRequestTimeout() {
+ return requestTimeout;
+ }
+
+ public void setRequestTimeout(Long requestTimeout) {
+ this.requestTimeout = requestTimeout;
+ }
+
+ public String getTempDirectory() {
+ return tempDirectory;
+ }
+
+ public void setTempDirectory(String tempDirectory) {
+ this.tempDirectory = tempDirectory;
+ }
+
+ public Boolean getVerifyCertificate() {
+ return verifyCertificate;
+ }
+
+ public void setVerifyCertificate(Boolean verifyCertificate) {
+ this.verifyCertificate = verifyCertificate;
+ }
+
+ public String getProxyHost() {
+ return proxyHost;
+ }
+
+ public void setProxyHost(String proxyHost) {
+ this.proxyHost = proxyHost;
+ }
+
+ public Long getProxyPort() {
+ return proxyPort;
+ }
+
+ public void setProxyPort(Long proxyPort) {
+ this.proxyPort = proxyPort;
+ }
+
+ public String getProxyUserName() {
+ return proxyUserName;
+ }
+
+ public void setProxyUserName(String proxyUserName) {
+ this.proxyUserName = proxyUserName;
+ }
+
+ public String getProxyPassword() {
+ return proxyPassword;
+ }
+
+ public void setProxyPassword(String proxyPassword) {
+ this.proxyPassword = proxyPassword;
+ }
+
+ public String getStsEndpoint() {
+ return stsEndpoint;
+ }
+
+ public void setStsEndpoint(String stsEndpoint) {
+ this.stsEndpoint = stsEndpoint;
+ }
+
+ public Long getStsPort() {
+ return stsPort;
+ }
+
+ public void setStsPort(Long stsPort) {
+ this.stsPort = stsPort;
+ }
+
+ public KinesisProducerConfiguration.ThreadingModel getThreadingModel() {
+ return threadingModel;
+ }
+
+ public void setThreadingModel(KinesisProducerConfiguration.ThreadingModel threadingModel) {
+ this.threadingModel = threadingModel;
+ }
+
+ public Integer getThreadPoolSize() {
+ return threadPoolSize;
+ }
+
+ public void setThreadPoolSize(Integer threadPoolSize) {
+ this.threadPoolSize = threadPoolSize;
+ }
+
+ public Long getUserRecordTimeoutInMillis() {
+ return userRecordTimeoutInMillis;
+ }
+
+ public void setUserRecordTimeoutInMillis(Long userRecordTimeoutInMillis) {
+ this.userRecordTimeoutInMillis = userRecordTimeoutInMillis;
+ }
+}
diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java
new file mode 100644
index 000000000..a809a0e18
--- /dev/null
+++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.autoconfigure.kinesis;
+
+import static io.awspring.cloud.autoconfigure.kinesis.KinesisProperties.PREFIX;
+
+import io.awspring.cloud.autoconfigure.AwsClientProperties;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+/**
+ * Properties related to KinesisClient
+ *
+ * @author Matej Nedic
+ * @since 4.0.0
+ */
+@ConfigurationProperties(prefix = PREFIX)
+public class KinesisProperties extends AwsClientProperties {
+ /**
+ * The prefix used for AWS Kinesis configuration.
+ */
+ public static final String PREFIX = "spring.cloud.aws.kinesis";
+}
diff --git a/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
index 685d3a75a..11ac764f7 100644
--- a/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
+++ b/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -16,3 +16,6 @@ io.awspring.cloud.autoconfigure.config.secretsmanager.SecretsManagerAutoConfigur
io.awspring.cloud.autoconfigure.config.parameterstore.ParameterStoreReloadAutoConfiguration
io.awspring.cloud.autoconfigure.config.parameterstore.ParameterStoreAutoConfiguration
io.awspring.cloud.autoconfigure.config.s3.S3ReloadAutoConfiguration
+io.awspring.cloud.autoconfigure.kinesis.KinesisAutoConfiguration
+io.awspring.cloud.autoconfigure.kinesis.KinesisProducerAutoConfiguration
+io.awspring.cloud.autoconfigure.kinesis.KinesisClientLibraryAutoConfiguration
diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfigurationTest.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfigurationTest.java
new file mode 100644
index 000000000..03fdbf00e
--- /dev/null
+++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfigurationTest.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.autoconfigure.kinesis;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import io.awspring.cloud.autoconfigure.ConfiguredAwsClient;
+import io.awspring.cloud.autoconfigure.core.AwsAutoConfiguration;
+import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration;
+import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration;
+import java.net.URI;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.autoconfigure.AutoConfigurations;
+import org.springframework.boot.test.context.runner.ApplicationContextRunner;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+
+/**
+ * Tests for {@link KinesisAutoConfiguration}.
+ *
+ * @author Matej Nedic
+ */
+class KinesisAutoConfigurationTest {
+
+ private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
+ .withPropertyValues("spring.cloud.aws.region.static:eu-west-1",
+ "spring.cloud.aws.credentials.access-key:noop", "spring.cloud.aws.credentials.secret-key:noop")
+ .withConfiguration(AutoConfigurations.of(AwsAutoConfiguration.class, RegionProviderAutoConfiguration.class,
+ CredentialsProviderAutoConfiguration.class, KinesisAutoConfiguration.class));
+
+ @Test
+ void disableKinesisIntegration() {
+ this.contextRunner.withPropertyValues("spring.cloud.aws.kinesis.enabled:false").run(context -> {
+ assertThat(context).doesNotHaveBean(KinesisAsyncClient.class);
+ });
+ }
+
+ @Test
+ void withCustomEndpoint() {
+ this.contextRunner.withPropertyValues("spring.cloud.aws.kinesis.endpoint:http://localhost:8090")
+ .run(context -> {
+ ConfiguredAwsClient client = new ConfiguredAwsClient(context.getBean(KinesisAsyncClient.class));
+ assertThat(client.getEndpoint()).isEqualTo(URI.create("http://localhost:8090"));
+ assertThat(client.isEndpointOverridden()).isTrue();
+ });
+ }
+}
diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientCustomizerTests.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientCustomizerTests.java
new file mode 100644
index 000000000..a4043f14f
--- /dev/null
+++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientCustomizerTests.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.autoconfigure.kinesis;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import io.awspring.cloud.autoconfigure.ConfiguredAwsClient;
+import io.awspring.cloud.autoconfigure.core.AwsAutoConfiguration;
+import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration;
+import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration;
+import java.time.Duration;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.autoconfigure.AutoConfigurations;
+import org.springframework.boot.test.context.runner.ApplicationContextRunner;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+
+public class KinesisClientCustomizerTests {
+
+ private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
+ .withPropertyValues("spring.cloud.aws.region.static:eu-west-1",
+ "spring.cloud.aws.credentials.access-key:noop", "spring.cloud.aws.credentials.secret-key:noop")
+ .withConfiguration(AutoConfigurations.of(AwsAutoConfiguration.class, RegionProviderAutoConfiguration.class,
+ CredentialsProviderAutoConfiguration.class, KinesisAutoConfiguration.class));
+
+ @Test
+ void customKinesisClientCustomizer() {
+ contextRunner.withUserConfiguration(KinesisClientCustomizerTests.CustomizerConfig.class).run(context -> {
+ ConfiguredAwsClient kinesisAsyncClient = new ConfiguredAwsClient(context.getBean(KinesisAsyncClient.class));
+ assertThat(kinesisAsyncClient.getApiCallTimeout()).describedAs("sets property from first customizer")
+ .isEqualTo(Duration.ofMillis(2001));
+ assertThat(kinesisAsyncClient.getApiCallAttemptTimeout())
+ .describedAs("sets property from second customizer").isEqualTo(Duration.ofMillis(2002));
+ });
+ }
+
+ @Configuration(proxyBeanMethods = false)
+ static class CustomizerConfig {
+
+ @Bean
+ KinesisAsyncClientCustomizer kinesisClientCustomizer() {
+ return builder -> {
+ builder.overrideConfiguration(builder.overrideConfiguration().copy(c -> {
+ c.apiCallTimeout(Duration.ofMillis(2001));
+ }));
+ };
+ }
+
+ @Bean
+ KinesisAsyncClientCustomizer kinesisClientCustomizer2() {
+ return builder -> {
+ builder.overrideConfiguration(builder.overrideConfiguration().copy(c -> {
+ c.apiCallAttemptTimeout(Duration.ofMillis(2002));
+ }));
+ };
+ }
+ }
+}
diff --git a/spring-cloud-aws-dependencies/pom.xml b/spring-cloud-aws-dependencies/pom.xml
index eea04bba6..f1e113d22 100644
--- a/spring-cloud-aws-dependencies/pom.xml
+++ b/spring-cloud-aws-dependencies/pom.xml
@@ -265,11 +265,13 @@
software.amazon.kinesis
amazon-kinesis-client
${kcl.version}
+ true
software.amazon.kinesis
amazon-kinesis-producer
${kpl.version}
+ true
diff --git a/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisIntegrationTests.java b/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisIntegrationTests.java
index 599862add..4fa957a7b 100644
--- a/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisIntegrationTests.java
+++ b/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisIntegrationTests.java
@@ -23,7 +23,6 @@
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -112,13 +111,14 @@ void kinesisInboundOutbound() throws InterruptedException {
.contains("Channel 'kinesisReceiveChannel' expected one of the following data types "
+ "[class java.util.Date], but received [class java.lang.String]");
- String errorSequenceNumber = errorMessage.getHeaders().get(KinesisHeaders.RAW_RECORD, Record.class).sequenceNumber();
+ String errorSequenceNumber = errorMessage.getHeaders().get(KinesisHeaders.RAW_RECORD, Record.class)
+ .sequenceNumber();
// Second exception for the same record since we have requested via bubbling exception up to the consumer
errorMessage = this.errorChannel.receive(30_000);
assertThat(errorMessage).isNotNull();
assertThat(errorMessage.getHeaders().get(KinesisHeaders.RAW_RECORD, Record.class).sequenceNumber())
- .isEqualTo(errorSequenceNumber);
+ .isEqualTo(errorSequenceNumber);
for (int i = 0; i < 2; i++) {
this.kinesisSendChannel
diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSourceTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSourceTests.java
index fb240cffa..a3f850ede 100644
--- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSourceTests.java
+++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSourceTests.java
@@ -24,8 +24,6 @@
import io.awspring.cloud.sqs.MessageExecutionThreadFactory;
import io.awspring.cloud.sqs.listener.BackPressureMode;
-import io.awspring.cloud.sqs.listener.ContainerOptions;
-import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder;
import io.awspring.cloud.sqs.listener.SqsContainerOptions;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementCallback;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementProcessor;
@@ -442,10 +440,9 @@ else if (currentPoll.compareAndSet(2, 3)) {
void shouldRemovePollingFutureOnException() throws InterruptedException {
String testName = "shouldClearPollingFuturesOnException";
- BackPressureHandler backPressureHandler = BackPressureHandlerFactories
- .adaptiveThroughputBackPressureHandler()
- .createBackPressureHandler(SqsContainerOptions.builder()
- .maxDelayBetweenPolls(Duration.ofMillis(100)).backPressureMode(BackPressureMode.ALWAYS_POLL_MAX_MESSAGES).build());
+ BackPressureHandler backPressureHandler = BackPressureHandlerFactories.adaptiveThroughputBackPressureHandler()
+ .createBackPressureHandler(SqsContainerOptions.builder().maxDelayBetweenPolls(Duration.ofMillis(100))
+ .backPressureMode(BackPressureMode.ALWAYS_POLL_MAX_MESSAGES).build());
AbstractPollingMessageSource