diff --git a/pom.xml b/pom.xml index bb7cb54c4..ab0213e11 100644 --- a/pom.xml +++ b/pom.xml @@ -45,11 +45,11 @@ spring-cloud-aws-sqs spring-cloud-aws-dynamodb spring-cloud-aws-kinesis - spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis spring-cloud-aws-s3 spring-cloud-aws-testcontainers spring-cloud-aws-starters/spring-cloud-aws-starter spring-cloud-aws-starters/spring-cloud-aws-starter-dynamodb + spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis spring-cloud-aws-starters/spring-cloud-aws-starter-integration-dynamodb spring-cloud-aws-starters/spring-cloud-aws-starter-metrics spring-cloud-aws-starters/spring-cloud-aws-starter-parameter-store @@ -61,6 +61,9 @@ spring-cloud-aws-starters/spring-cloud-aws-starter-integration-sns spring-cloud-aws-starters/spring-cloud-aws-starter-sqs spring-cloud-aws-starters/spring-cloud-aws-starter-integration-sqs + spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis + spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-producer + spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-client-library spring-cloud-aws-samples spring-cloud-aws-test spring-cloud-aws-modulith diff --git a/spring-cloud-aws-autoconfigure/pom.xml b/spring-cloud-aws-autoconfigure/pom.xml index 8d7e8e4ab..1e50b2692 100644 --- a/spring-cloud-aws-autoconfigure/pom.xml +++ b/spring-cloud-aws-autoconfigure/pom.xml @@ -181,6 +181,21 @@ kms true + + software.amazon.awssdk + kinesis + true + + + software.amazon.kinesis + amazon-kinesis-client + true + + + software.amazon.kinesis + amazon-kinesis-producer + true + software.amazon.encryption.s3 amazon-s3-encryption-client-java diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java new file mode 100644 index 000000000..60bdb3d74 --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java @@ -0,0 +1,29 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.autoconfigure.kinesis; + +import io.awspring.cloud.autoconfigure.AwsClientCustomizer; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder; + +/** + * Callback interface that can be used to customize a {@link KinesisAsyncClientBuilder}. + * + * @author Matej Nedic + * @since 4.0.0 + */ +@FunctionalInterface +public interface KinesisAsyncClientCustomizer extends AwsClientCustomizer { +} diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java new file mode 100644 index 000000000..2a221a3b3 --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java @@ -0,0 +1,52 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.autoconfigure.kinesis; + +import io.awspring.cloud.autoconfigure.AwsAsyncClientCustomizer; +import io.awspring.cloud.autoconfigure.core.AwsClientBuilderConfigurer; +import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails; +import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration; +import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; + +@AutoConfiguration +@ConditionalOnClass({ KinesisAsyncClient.class }) +@EnableConfigurationProperties({ KinesisProperties.class }) +@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class }) +@ConditionalOnProperty(value = "spring.cloud.aws.kinesis.enabled", havingValue = "true", matchIfMissing = true) +public class KinesisAutoConfiguration { + + @ConditionalOnMissingBean + @Bean + public KinesisAsyncClient kinesisAsyncClient(KinesisProperties properties, + AwsClientBuilderConfigurer awsClientBuilderConfigurer, + ObjectProvider connectionDetails, + ObjectProvider kinesisAsyncClientCustomizer, + ObjectProvider awsSyncClientCustomizers) { + return awsClientBuilderConfigurer + .configureAsyncClient(KinesisAsyncClient.builder(), properties, connectionDetails.getIfAvailable(), + kinesisAsyncClientCustomizer.orderedStream(), awsSyncClientCustomizers.orderedStream()) + .build(); + } +} diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryAutoConfiguration.java new file mode 100644 index 000000000..9e690d1c9 --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryAutoConfiguration.java @@ -0,0 +1,49 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.autoconfigure.kinesis; + +import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration; +import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.kinesis.coordinator.Scheduler; +import software.amazon.kinesis.processor.ShardRecordProcessorFactory; + +@AutoConfiguration +@ConditionalOnClass({ KinesisAsyncClient.class, Scheduler.class }) +@EnableConfigurationProperties({ KinesisClientLibraryProperties.class }) +@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class, + KinesisAutoConfiguration.class }) +@ConditionalOnProperty(value = "spring.cloud.aws.kinesis.client.library.enabled", havingValue = "true", matchIfMissing = true) +public class KinesisClientLibraryAutoConfiguration { + + @ConditionalOnMissingBean + @Bean + public Scheduler scheduler(ObjectProvider dynamoDbClient, + ObjectProvider cloudWatchClient, KinesisAsyncClient kinesisAsyncClient, + KinesisClientLibraryProperties properties, ShardRecordProcessorFactory processorFactory) { + return null; + } +} diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryProperties.java new file mode 100644 index 000000000..c31cc6798 --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientLibraryProperties.java @@ -0,0 +1,45 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.autoconfigure.kinesis; + +import static io.awspring.cloud.autoconfigure.kinesis.KinesisClientLibraryProperties.PREFIX; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties(prefix = PREFIX) +public class KinesisClientLibraryProperties { + + public static final String PREFIX = "spring.cloud.aws.kinesis.client.library"; + + private String streamName; + private String applicationName; + + public String getStreamName() { + return streamName; + } + + public void setStreamName(String streamName) { + this.streamName = streamName; + } + + public String getApplicationName() { + return applicationName; + } + + public void setApplicationName(String applicationName) { + this.applicationName = applicationName; + } +} diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java new file mode 100644 index 000000000..5ab585915 --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java @@ -0,0 +1,100 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.autoconfigure.kinesis; + +import io.awspring.cloud.autoconfigure.core.AwsClientBuilderConfigurer; +import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails; +import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration; +import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.context.properties.PropertyMapper; +import org.springframework.context.annotation.Bean; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.regions.providers.AwsRegionProvider; +import software.amazon.kinesis.producer.KinesisProducer; +import software.amazon.kinesis.producer.KinesisProducerConfiguration; + +@AutoConfiguration +@ConditionalOnClass({ KinesisProducer.class, KinesisProducerConfiguration.class }) +@EnableConfigurationProperties({ KinesisProducerProperties.class }) +@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class }) +@ConditionalOnProperty(value = "spring.cloud.aws.kinesis.producer.enabled", havingValue = "true", matchIfMissing = true) +public class KinesisProducerAutoConfiguration { + + @ConditionalOnMissingBean + @Bean + public KinesisProducerConfiguration kinesisProducerConfiguration(KinesisProducerProperties prop, + AwsCredentialsProvider credentialsProvider, AwsRegionProvider awsRegionProvider, + ObjectProvider connectionDetails) { + PropertyMapper propertyMapper = PropertyMapper.get(); + KinesisProducerConfiguration config = new KinesisProducerConfiguration(); + propertyMapper.from(prop::getAggregationEnabled).whenNonNull().to(config::setAggregationEnabled); + propertyMapper.from(prop::getAggregationMaxCount).whenNonNull().to(config::setAggregationMaxCount); + propertyMapper.from(prop::getAggregationMaxSize).whenNonNull().to(config::setAggregationMaxSize); + propertyMapper.from(prop::getCloudwatchEndpoint).whenHasText().to(config::setCloudwatchEndpoint); + propertyMapper.from(prop::getCloudwatchPort).whenNonNull().to(config::setCloudwatchPort); + propertyMapper.from(prop::getCollectionMaxCount).whenNonNull().to(config::setCollectionMaxCount); + propertyMapper.from(prop::getCollectionMaxSize).whenNonNull().to(config::setCollectionMaxSize); + propertyMapper.from(prop::getConnectTimeout).whenNonNull().to(config::setConnectTimeout); + propertyMapper.from(prop::getCredentialsRefreshDelay).whenNonNull().to(config::setCredentialsRefreshDelay); + propertyMapper.from(prop::getEnableCoreDumps).whenNonNull().to(config::setEnableCoreDumps); + propertyMapper.from(prop::getFailIfThrottled).whenNonNull().to(config::setFailIfThrottled); + propertyMapper.from(prop::getLogLevel).whenHasText().to(config::setLogLevel); + propertyMapper.from(prop::getMaxConnections).whenNonNull().to(config::setMaxConnections); + propertyMapper.from(prop::getMetricsGranularity).whenHasText().to(config::setMetricsGranularity); + propertyMapper.from(prop::getMetricsLevel).whenHasText().to(config::setMetricsLevel); + propertyMapper.from(prop::getMetricsNamespace).whenHasText().to(config::setMetricsNamespace); + propertyMapper.from(prop::getMetricsUploadDelay).whenNonNull().to(config::setMetricsUploadDelay); + propertyMapper.from(prop::getMinConnections).whenNonNull().to(config::setMinConnections); + propertyMapper.from(prop::getNativeExecutable).whenNonNull().to(config::setNativeExecutable); + propertyMapper.from(prop::getRateLimit).whenNonNull().to(config::setRateLimit); + propertyMapper.from(prop::getRecordMaxBufferedTime).whenNonNull().to(config::setRecordMaxBufferedTime); + propertyMapper.from(prop::getRecordTtl).whenNonNull().to(config::setRecordTtl); + propertyMapper.from(prop::getRequestTimeout).whenNonNull().to(config::setRequestTimeout); + propertyMapper.from(prop::getTempDirectory).whenNonNull().to(config::setTempDirectory); + propertyMapper.from(prop::getVerifyCertificate).whenNonNull().to(config::setVerifyCertificate); + propertyMapper.from(prop.getProxyHost()).whenNonNull().to(config::setProxyHost); + propertyMapper.from(prop.getProxyPort()).whenNonNull().to(config::setProxyPort); + propertyMapper.from(prop.getProxyUserName()).whenHasText().to(config::setProxyUserName); + propertyMapper.from(prop.getProxyPassword()).whenHasText().to(config::setProxyPassword); + propertyMapper.from(prop.getStsEndpoint()).whenHasText().to(config::setStsEndpoint); + propertyMapper.from(prop.getStsPort()).whenNonNull().to(config::setStsPort); + propertyMapper.from(prop.getThreadingModel()).whenNonNull().to(config::setThreadingModel); + propertyMapper.from(prop.getThreadPoolSize()).whenNonNull().to(config::setThreadPoolSize); + propertyMapper.from(prop.getUserRecordTimeoutInMillis()).whenNonNull().to(config::setUserRecordTimeoutInMillis); + + config.setCredentialsProvider(credentialsProvider); + config.setRegion(AwsClientBuilderConfigurer + .resolveRegion(prop, connectionDetails.getIfAvailable(), awsRegionProvider).toString()); + connectionDetails.ifAvailable(cd -> { + config.setKinesisPort(cd.getEndpoint().getPort()); + config.setKinesisEndpoint(cd.getEndpoint().getHost()); + }); + return config; + } + + @ConditionalOnMissingBean + @Bean + public KinesisProducer kinesisProducer(KinesisProducerConfiguration kinesisProducerConfiguration) { + return new KinesisProducer(kinesisProducerConfiguration); + } +} diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java new file mode 100644 index 000000000..5d90fde67 --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java @@ -0,0 +1,479 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.autoconfigure.kinesis; + +import static io.awspring.cloud.autoconfigure.kinesis.KinesisProducerProperties.PREFIX; + +import io.awspring.cloud.autoconfigure.AwsClientProperties; +import org.springframework.boot.context.properties.ConfigurationProperties; +import software.amazon.kinesis.producer.KinesisProducerConfiguration; + +/** + * Properties related to KinesisProducer + * + * @author Matej Nedic + * @since 4.0.0 + */ +@ConfigurationProperties(prefix = PREFIX) +public class KinesisProducerProperties extends AwsClientProperties { + + /** + * The prefix used for AWS Kinesis configuration. + */ + public static final String PREFIX = "spring.cloud.aws.kinesis.producer"; + + /** + * Whether aggregation of user records is enabled. + */ + private Boolean aggregationEnabled; + + /** + * Maximum number of user records to aggregate. Must be between 1 and Long.MAX_VALUE. + */ + private Long aggregationMaxCount; + + /** + * Maximum size in bytes of aggregated records. Must be between 64 and 1048576. + */ + private Long aggregationMaxSize; + + /** + * CloudWatch endpoint. Must match the regex: ^([A-Za-z0-9-\\.]+)?$ + */ + private String cloudwatchEndpoint; + + /** + * CloudWatch port. Must be between 1 and 65535. + */ + private Long cloudwatchPort; + + /** + * Maximum number of records to collect before sending a batch. Must be between 1 and 500. + */ + private Long collectionMaxCount; + + /** + * Maximum size in bytes of collected records before sending. Must be between 52224 and Long.MAX_VALUE. + */ + private Long collectionMaxSize; + + /** + * Connection timeout in milliseconds. Must be between 100 and 300000. + */ + private Long connectTimeout; + + /** + * Delay in milliseconds for credentials refresh. Must be between 1 and 300000. + */ + private Long credentialsRefreshDelay; + + /** + * Whether core dumps are enabled. + */ + private Boolean enableCoreDumps; + + /** + * Whether to fail if throttled by Kinesis. + */ + private Boolean failIfThrottled; + + /** + * Log level for the producer. Allowed values: trace, debug, info, warning, error. + */ + private String logLevel; + + /** + * Maximum number of connections. Must be between 1 and 256. + */ + private Long maxConnections; + + /** + * Metrics granularity. Allowed values: global, stream, shard. + */ + private String metricsGranularity; + + /** + * Metrics level. Allowed values: none, summary, detailed. + */ + private String metricsLevel; + + /** + * Metrics namespace. Must match the regex: (?!AWS/).{1,255} + */ + private String metricsNamespace; + + /** + * Delay in milliseconds for uploading metrics. Must be between 1 and 60000. + */ + private Long metricsUploadDelay; + + /** + * Minimum number of connections. Must be between 1 and 16. + */ + private Long minConnections; + + /** + * Native executable path. + */ + private String nativeExecutable; + + /** + * Rate limit for records per second. Must be between 1 and Long.MAX_VALUE. + */ + private Long rateLimit; + + /** + * Maximum buffered time for records in milliseconds. Must be between 0 and Long.MAX_VALUE. + */ + private Long recordMaxBufferedTime; + + /** + * Time to live for records in milliseconds. Must be between 100 and Long.MAX_VALUE. + */ + private Long recordTtl; + + /** + * Request timeout in milliseconds. Must be between 100 and 600000. + */ + private Long requestTimeout; + + /** + * Temporary directory path for the producer. + */ + private String tempDirectory; + + /** + * Whether to verify SSL certificates. + */ + private Boolean verifyCertificate; + + /** + * Proxy host. + */ + private String proxyHost; + + /** + * Proxy port. Must be between 1 and 65535. + */ + private Long proxyPort; + + /** + * Proxy username. + */ + private String proxyUserName; + + /** + * Proxy password. + */ + private String proxyPassword; + + /** + * STS endpoint. Must match the regex: ^([A-Za-z0-9-\\.]+)?$ + */ + private String stsEndpoint; + + /** + * STS port. Must be between 1 and 65535. + */ + private Long stsPort; + + /** + * Threading model for the producer. + */ + private KinesisProducerConfiguration.ThreadingModel threadingModel; + + /** + * Thread pool size. Must be greater than or equal to 0. + */ + private Integer threadPoolSize; + + /** + * Timeout in milliseconds for user records. Must be greater than or equal to 0. + */ + private Long userRecordTimeoutInMillis; + + public Boolean getAggregationEnabled() { + return aggregationEnabled; + } + + public void setAggregationEnabled(Boolean aggregationEnabled) { + this.aggregationEnabled = aggregationEnabled; + } + + public Long getAggregationMaxCount() { + return aggregationMaxCount; + } + + public void setAggregationMaxCount(Long aggregationMaxCount) { + this.aggregationMaxCount = aggregationMaxCount; + } + + public Long getAggregationMaxSize() { + return aggregationMaxSize; + } + + public void setAggregationMaxSize(Long aggregationMaxSize) { + this.aggregationMaxSize = aggregationMaxSize; + } + + public String getCloudwatchEndpoint() { + return cloudwatchEndpoint; + } + + public void setCloudwatchEndpoint(String cloudwatchEndpoint) { + this.cloudwatchEndpoint = cloudwatchEndpoint; + } + + public Long getCloudwatchPort() { + return cloudwatchPort; + } + + public void setCloudwatchPort(Long cloudwatchPort) { + this.cloudwatchPort = cloudwatchPort; + } + + public Long getCollectionMaxCount() { + return collectionMaxCount; + } + + public void setCollectionMaxCount(Long collectionMaxCount) { + this.collectionMaxCount = collectionMaxCount; + } + + public Long getCollectionMaxSize() { + return collectionMaxSize; + } + + public void setCollectionMaxSize(Long collectionMaxSize) { + this.collectionMaxSize = collectionMaxSize; + } + + public Long getConnectTimeout() { + return connectTimeout; + } + + public void setConnectTimeout(Long connectTimeout) { + this.connectTimeout = connectTimeout; + } + + public Long getCredentialsRefreshDelay() { + return credentialsRefreshDelay; + } + + public void setCredentialsRefreshDelay(Long credentialsRefreshDelay) { + this.credentialsRefreshDelay = credentialsRefreshDelay; + } + + public Boolean getEnableCoreDumps() { + return enableCoreDumps; + } + + public void setEnableCoreDumps(Boolean enableCoreDumps) { + this.enableCoreDumps = enableCoreDumps; + } + + public Boolean getFailIfThrottled() { + return failIfThrottled; + } + + public void setFailIfThrottled(Boolean failIfThrottled) { + this.failIfThrottled = failIfThrottled; + } + + public String getLogLevel() { + return logLevel; + } + + public void setLogLevel(String logLevel) { + this.logLevel = logLevel; + } + + public Long getMaxConnections() { + return maxConnections; + } + + public void setMaxConnections(Long maxConnections) { + this.maxConnections = maxConnections; + } + + public String getMetricsGranularity() { + return metricsGranularity; + } + + public void setMetricsGranularity(String metricsGranularity) { + this.metricsGranularity = metricsGranularity; + } + + public String getMetricsLevel() { + return metricsLevel; + } + + public void setMetricsLevel(String metricsLevel) { + this.metricsLevel = metricsLevel; + } + + public String getMetricsNamespace() { + return metricsNamespace; + } + + public void setMetricsNamespace(String metricsNamespace) { + this.metricsNamespace = metricsNamespace; + } + + public Long getMetricsUploadDelay() { + return metricsUploadDelay; + } + + public void setMetricsUploadDelay(Long metricsUploadDelay) { + this.metricsUploadDelay = metricsUploadDelay; + } + + public Long getMinConnections() { + return minConnections; + } + + public void setMinConnections(Long minConnections) { + this.minConnections = minConnections; + } + + public String getNativeExecutable() { + return nativeExecutable; + } + + public void setNativeExecutable(String nativeExecutable) { + this.nativeExecutable = nativeExecutable; + } + + public Long getRateLimit() { + return rateLimit; + } + + public void setRateLimit(Long rateLimit) { + this.rateLimit = rateLimit; + } + + public Long getRecordMaxBufferedTime() { + return recordMaxBufferedTime; + } + + public void setRecordMaxBufferedTime(Long recordMaxBufferedTime) { + this.recordMaxBufferedTime = recordMaxBufferedTime; + } + + public Long getRecordTtl() { + return recordTtl; + } + + public void setRecordTtl(Long recordTtl) { + this.recordTtl = recordTtl; + } + + public Long getRequestTimeout() { + return requestTimeout; + } + + public void setRequestTimeout(Long requestTimeout) { + this.requestTimeout = requestTimeout; + } + + public String getTempDirectory() { + return tempDirectory; + } + + public void setTempDirectory(String tempDirectory) { + this.tempDirectory = tempDirectory; + } + + public Boolean getVerifyCertificate() { + return verifyCertificate; + } + + public void setVerifyCertificate(Boolean verifyCertificate) { + this.verifyCertificate = verifyCertificate; + } + + public String getProxyHost() { + return proxyHost; + } + + public void setProxyHost(String proxyHost) { + this.proxyHost = proxyHost; + } + + public Long getProxyPort() { + return proxyPort; + } + + public void setProxyPort(Long proxyPort) { + this.proxyPort = proxyPort; + } + + public String getProxyUserName() { + return proxyUserName; + } + + public void setProxyUserName(String proxyUserName) { + this.proxyUserName = proxyUserName; + } + + public String getProxyPassword() { + return proxyPassword; + } + + public void setProxyPassword(String proxyPassword) { + this.proxyPassword = proxyPassword; + } + + public String getStsEndpoint() { + return stsEndpoint; + } + + public void setStsEndpoint(String stsEndpoint) { + this.stsEndpoint = stsEndpoint; + } + + public Long getStsPort() { + return stsPort; + } + + public void setStsPort(Long stsPort) { + this.stsPort = stsPort; + } + + public KinesisProducerConfiguration.ThreadingModel getThreadingModel() { + return threadingModel; + } + + public void setThreadingModel(KinesisProducerConfiguration.ThreadingModel threadingModel) { + this.threadingModel = threadingModel; + } + + public Integer getThreadPoolSize() { + return threadPoolSize; + } + + public void setThreadPoolSize(Integer threadPoolSize) { + this.threadPoolSize = threadPoolSize; + } + + public Long getUserRecordTimeoutInMillis() { + return userRecordTimeoutInMillis; + } + + public void setUserRecordTimeoutInMillis(Long userRecordTimeoutInMillis) { + this.userRecordTimeoutInMillis = userRecordTimeoutInMillis; + } +} diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java new file mode 100644 index 000000000..a809a0e18 --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java @@ -0,0 +1,35 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.autoconfigure.kinesis; + +import static io.awspring.cloud.autoconfigure.kinesis.KinesisProperties.PREFIX; + +import io.awspring.cloud.autoconfigure.AwsClientProperties; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * Properties related to KinesisClient + * + * @author Matej Nedic + * @since 4.0.0 + */ +@ConfigurationProperties(prefix = PREFIX) +public class KinesisProperties extends AwsClientProperties { + /** + * The prefix used for AWS Kinesis configuration. + */ + public static final String PREFIX = "spring.cloud.aws.kinesis"; +} diff --git a/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 685d3a75a..11ac764f7 100644 --- a/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -16,3 +16,6 @@ io.awspring.cloud.autoconfigure.config.secretsmanager.SecretsManagerAutoConfigur io.awspring.cloud.autoconfigure.config.parameterstore.ParameterStoreReloadAutoConfiguration io.awspring.cloud.autoconfigure.config.parameterstore.ParameterStoreAutoConfiguration io.awspring.cloud.autoconfigure.config.s3.S3ReloadAutoConfiguration +io.awspring.cloud.autoconfigure.kinesis.KinesisAutoConfiguration +io.awspring.cloud.autoconfigure.kinesis.KinesisProducerAutoConfiguration +io.awspring.cloud.autoconfigure.kinesis.KinesisClientLibraryAutoConfiguration diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfigurationTest.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfigurationTest.java new file mode 100644 index 000000000..03fdbf00e --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfigurationTest.java @@ -0,0 +1,59 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.autoconfigure.kinesis; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.awspring.cloud.autoconfigure.ConfiguredAwsClient; +import io.awspring.cloud.autoconfigure.core.AwsAutoConfiguration; +import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration; +import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration; +import java.net.URI; +import org.junit.jupiter.api.Test; +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; + +/** + * Tests for {@link KinesisAutoConfiguration}. + * + * @author Matej Nedic + */ +class KinesisAutoConfigurationTest { + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withPropertyValues("spring.cloud.aws.region.static:eu-west-1", + "spring.cloud.aws.credentials.access-key:noop", "spring.cloud.aws.credentials.secret-key:noop") + .withConfiguration(AutoConfigurations.of(AwsAutoConfiguration.class, RegionProviderAutoConfiguration.class, + CredentialsProviderAutoConfiguration.class, KinesisAutoConfiguration.class)); + + @Test + void disableKinesisIntegration() { + this.contextRunner.withPropertyValues("spring.cloud.aws.kinesis.enabled:false").run(context -> { + assertThat(context).doesNotHaveBean(KinesisAsyncClient.class); + }); + } + + @Test + void withCustomEndpoint() { + this.contextRunner.withPropertyValues("spring.cloud.aws.kinesis.endpoint:http://localhost:8090") + .run(context -> { + ConfiguredAwsClient client = new ConfiguredAwsClient(context.getBean(KinesisAsyncClient.class)); + assertThat(client.getEndpoint()).isEqualTo(URI.create("http://localhost:8090")); + assertThat(client.isEndpointOverridden()).isTrue(); + }); + } +} diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientCustomizerTests.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientCustomizerTests.java new file mode 100644 index 000000000..a4043f14f --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientCustomizerTests.java @@ -0,0 +1,72 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.autoconfigure.kinesis; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.awspring.cloud.autoconfigure.ConfiguredAwsClient; +import io.awspring.cloud.autoconfigure.core.AwsAutoConfiguration; +import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration; +import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration; +import java.time.Duration; +import org.junit.jupiter.api.Test; +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; + +public class KinesisClientCustomizerTests { + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withPropertyValues("spring.cloud.aws.region.static:eu-west-1", + "spring.cloud.aws.credentials.access-key:noop", "spring.cloud.aws.credentials.secret-key:noop") + .withConfiguration(AutoConfigurations.of(AwsAutoConfiguration.class, RegionProviderAutoConfiguration.class, + CredentialsProviderAutoConfiguration.class, KinesisAutoConfiguration.class)); + + @Test + void customKinesisClientCustomizer() { + contextRunner.withUserConfiguration(KinesisClientCustomizerTests.CustomizerConfig.class).run(context -> { + ConfiguredAwsClient kinesisAsyncClient = new ConfiguredAwsClient(context.getBean(KinesisAsyncClient.class)); + assertThat(kinesisAsyncClient.getApiCallTimeout()).describedAs("sets property from first customizer") + .isEqualTo(Duration.ofMillis(2001)); + assertThat(kinesisAsyncClient.getApiCallAttemptTimeout()) + .describedAs("sets property from second customizer").isEqualTo(Duration.ofMillis(2002)); + }); + } + + @Configuration(proxyBeanMethods = false) + static class CustomizerConfig { + + @Bean + KinesisAsyncClientCustomizer kinesisClientCustomizer() { + return builder -> { + builder.overrideConfiguration(builder.overrideConfiguration().copy(c -> { + c.apiCallTimeout(Duration.ofMillis(2001)); + })); + }; + } + + @Bean + KinesisAsyncClientCustomizer kinesisClientCustomizer2() { + return builder -> { + builder.overrideConfiguration(builder.overrideConfiguration().copy(c -> { + c.apiCallAttemptTimeout(Duration.ofMillis(2002)); + })); + }; + } + } +} diff --git a/spring-cloud-aws-dependencies/pom.xml b/spring-cloud-aws-dependencies/pom.xml index eea04bba6..f1e113d22 100644 --- a/spring-cloud-aws-dependencies/pom.xml +++ b/spring-cloud-aws-dependencies/pom.xml @@ -265,11 +265,13 @@ software.amazon.kinesis amazon-kinesis-client ${kcl.version} + true software.amazon.kinesis amazon-kinesis-producer ${kpl.version} + true diff --git a/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisIntegrationTests.java b/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisIntegrationTests.java index 599862add..4fa957a7b 100644 --- a/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisIntegrationTests.java +++ b/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisIntegrationTests.java @@ -23,7 +23,6 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; - import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -112,13 +111,14 @@ void kinesisInboundOutbound() throws InterruptedException { .contains("Channel 'kinesisReceiveChannel' expected one of the following data types " + "[class java.util.Date], but received [class java.lang.String]"); - String errorSequenceNumber = errorMessage.getHeaders().get(KinesisHeaders.RAW_RECORD, Record.class).sequenceNumber(); + String errorSequenceNumber = errorMessage.getHeaders().get(KinesisHeaders.RAW_RECORD, Record.class) + .sequenceNumber(); // Second exception for the same record since we have requested via bubbling exception up to the consumer errorMessage = this.errorChannel.receive(30_000); assertThat(errorMessage).isNotNull(); assertThat(errorMessage.getHeaders().get(KinesisHeaders.RAW_RECORD, Record.class).sequenceNumber()) - .isEqualTo(errorSequenceNumber); + .isEqualTo(errorSequenceNumber); for (int i = 0; i < 2; i++) { this.kinesisSendChannel diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSourceTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSourceTests.java index fb240cffa..a3f850ede 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSourceTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSourceTests.java @@ -24,8 +24,6 @@ import io.awspring.cloud.sqs.MessageExecutionThreadFactory; import io.awspring.cloud.sqs.listener.BackPressureMode; -import io.awspring.cloud.sqs.listener.ContainerOptions; -import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder; import io.awspring.cloud.sqs.listener.SqsContainerOptions; import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementCallback; import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementProcessor; @@ -442,10 +440,9 @@ else if (currentPoll.compareAndSet(2, 3)) { void shouldRemovePollingFutureOnException() throws InterruptedException { String testName = "shouldClearPollingFuturesOnException"; - BackPressureHandler backPressureHandler = BackPressureHandlerFactories - .adaptiveThroughputBackPressureHandler() - .createBackPressureHandler(SqsContainerOptions.builder() - .maxDelayBetweenPolls(Duration.ofMillis(100)).backPressureMode(BackPressureMode.ALWAYS_POLL_MAX_MESSAGES).build()); + BackPressureHandler backPressureHandler = BackPressureHandlerFactories.adaptiveThroughputBackPressureHandler() + .createBackPressureHandler(SqsContainerOptions.builder().maxDelayBetweenPolls(Duration.ofMillis(100)) + .backPressureMode(BackPressureMode.ALWAYS_POLL_MAX_MESSAGES).build()); AbstractPollingMessageSource source = new AbstractPollingMessageSource<>() { @Override diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-client-library/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-client-library/pom.xml new file mode 100644 index 000000000..ef90ec96b --- /dev/null +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-client-library/pom.xml @@ -0,0 +1,33 @@ + + + + + spring-cloud-aws + io.awspring.cloud + 4.0.0-SNAPSHOT + ../../pom.xml + + + 4.0.0 + + spring-cloud-aws-starter-kinesis-client-library + Spring Cloud AWS Kinesis Client Library + Spring Cloud AWS Kinesis Client Library + + + + io.awspring.cloud + spring-cloud-aws-starter + + + software.amazon.kinesis + amazon-kinesis-client + + + software.amazon.awssdk + kinesis + + + diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-producer/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-producer/pom.xml new file mode 100644 index 000000000..92fa24ae8 --- /dev/null +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-producer/pom.xml @@ -0,0 +1,29 @@ + + + + + spring-cloud-aws + io.awspring.cloud + 4.0.0-SNAPSHOT + ../../pom.xml + + + 4.0.0 + + spring-cloud-aws-starter-kinesis-producer + Spring Cloud AWS Kinesis Producer Starter + Spring Cloud AWS Kinesis Producer Starter + + + + io.awspring.cloud + spring-cloud-aws-starter + + + software.amazon.kinesis + amazon-kinesis-producer + + + diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml new file mode 100644 index 000000000..a9f02f837 --- /dev/null +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml @@ -0,0 +1,29 @@ + + + + + spring-cloud-aws + io.awspring.cloud + 4.0.0-SNAPSHOT + ../../pom.xml + + + 4.0.0 + + spring-cloud-aws-starter-kinesis + Spring Cloud AWS Kinesis Starter + Spring Cloud AWS Kinesis Starter + + + + io.awspring.cloud + spring-cloud-aws-starter + + + software.amazon.awssdk + kinesis + + +