Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ release.properties
*.iml
.classpath
.project
.factorypath
.settings/
.vscode/
.attach_pid*
Expand Down
1 change: 1 addition & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
<module>spring-reactive</module>
<module>spring-rsocket</module>
<module>spring-function</module>
<module>spring-kafka</module>
</modules>

</project>
32 changes: 32 additions & 0 deletions examples/spring-kafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Spring Kafka + CloudEvents sample

## Build

```shell
mvn package
```

## Start Consumer

```shell
mvn spring-boot:run
```

You can try sending a request using any kafka client, or using the intergration tests in this project. You send to the "in" topic and it echos back a cloud event on the "out" topic. The listener is implemented like this (the request and response are modelled directly as a `CloudEvent`):

```java
@KafkaListener(id = "listener", topics = "in", clientIdPrefix = "demo")
@SendTo("out")
public CloudEvent listen(CloudEvent event) {
return ...;
}
```

and to make that work we need to install the Kafka message converter as a `@Bean`:

```java
@Bean
public CloudEventRecordMessageConverter recordMessageConverter() {
return new CloudEventRecordMessageConverter();
}
```
82 changes: 82 additions & 0 deletions examples/spring-kafka/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloudevents-examples</artifactId>
<groupId>io.cloudevents</groupId>
<version>2.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>cloudevents-spring-kafka-example</artifactId>

<properties>
<spring-boot.version>2.4.3</spring-boot.version>
<testcontainers.version>1.15.2</testcontainers.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-bom</artifactId>
<version>${testcontainers.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-spring</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-json-jackson</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.cloudevents.examples.spring;

import java.net.URI;
import java.util.UUID;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.messaging.handler.annotation.SendTo;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.spring.kafka.CloudEventRecordMessageConverter;

@SpringBootApplication
public class DemoApplication {

public static void main(String[] args) throws Exception {
SpringApplication.run(DemoApplication.class, args);
}

@KafkaListener(id = "listener", topics = "in", clientIdPrefix = "demo")
@SendTo("out")
public CloudEvent listen(CloudEvent event) {
System.err.println("Echo: " + event);
return CloudEventBuilder.from(event).withId(UUID.randomUUID().toString())
.withSource(URI.create("https://spring.io/foos")).withType("io.spring.event.Foo")
.withData(event.getData().toBytes()).build();
}

@Bean
public NewTopic topicOut() {
return TopicBuilder.name("out").build();
}

@Bean
public NewTopic topicIn() {
return TopicBuilder.name("in").build();
}

@Configuration
public static class CloudEventMessageConverterConfiguration {
/**
* Configure a RecordMessageConverter for Spring Kafka to pick up and use to
* convert to and from CloudEvent and Message.
*/
@Bean
public CloudEventRecordMessageConverter recordMessageConverter() {
return new CloudEventRecordMessageConverter();
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.examples.spring;

class Foo {

private String value;

public Foo() {
}

public Foo(String value) {
this.value = value;
}

public String getValue() {
return this.value;
}

public void setValue(String value) {
this.value = value;
}

@Override
public String toString() {
return "Foo [value=" + this.value + "]";
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
Loading