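Below is an example of a simple Kafka producer and consumer built with Spring Boot and the Spring Kafka library.
1. First, add the necessary dependencies to your `pom.xml` (Maven) or `build.gradle` (Gradle) file. Versions are omitted on the assumption that the project uses Spring Boot's dependency management (the starter parent POM or the Spring Boot Gradle plugin):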
Maven:
```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
```
Gradle:
```gradle
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'
}
```
2. Create a Kafka configuration class to define the necessary Kafka properties:
```java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

    private static final String KAFKA_BROKER = "localhost:9092";
    private static final String TOPIC = "my-topic";

    // Producer settings: broker address plus String serializers for keys and values.
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    // Creates Kafka producers from the settings above.
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    // High-level helper used by the producer service to send messages.
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Exposes the topic name as a bean so KafkaProducerService can inject it.
    @Bean
    public String topic() {
        return TOPIC;
    }
}
```
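The `ProducerConfig` constants used in `producerConfigs()` resolve to plain string keys, so the same map can be built and inspected without a running broker. The following is a minimal sketch using only the JDK. The class name `ProducerProps`, its `build` and `brokerOrDefault` methods, and the `KAFKA_BROKER` environment variable are hypothetical names chosen for illustration and are not part of Spring Kafka. It also passes the serializers as class-name strings rather than `Class` objects, which Kafka accepts as well.

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerProps {

    // Same default broker address as KAFKA_BROKER in KafkaConfig.
    public static final String DEFAULT_BROKER = "localhost:9092";

    // Fully qualified name of Kafka's String serializer.
    public static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";

    // Builds the producer settings using the raw keys behind the ProducerConfig constants.
    public static Map<String, Object> build(String broker) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", broker);           // BOOTSTRAP_SERVERS_CONFIG
        props.put("key.serializer", STRING_SERIALIZER);   // KEY_SERIALIZER_CLASS_CONFIG
        props.put("value.serializer", STRING_SERIALIZER); // VALUE_SERIALIZER_CLASS_CONFIG
        return props;
    }

    // Hypothetical helper: falls back to the local broker when nothing is configured.
    public static String brokerOrDefault(String configured) {
        return (configured == null || configured.trim().isEmpty())
                ? DEFAULT_BROKER
                : configured;
    }

    public static void main(String[] args) {
        // KAFKA_BROKER is an assumed environment variable name, not a Kafka convention.
        String broker = brokerOrDefault(System.getenv("KAFKA_BROKER"));
        System.out.println(build(broker));
    }
}
```

Reading the broker address from the environment is one way to avoid hard-coding `localhost:9092`; the map returned by `build` could be handed to `DefaultKafkaProducerFactory` in place of the one from `producerConfigs()`.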
3. Create a Kafka producer to send messages:
```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final String topic;

    // Both arguments are injected from the beans defined in KafkaConfig.
    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate, String topic) {
        this.kafkaTemplate = kafkaTemplate;
        this.topic = topic;
    }

    // Sends the message asynchronously; the returned future is ignored here.
    public void sendMessage(String message) {
        kafkaTemplate.send(topic, message);
    }
}
```
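4. Create a Kafka consumer to receive messages. No consumer configuration class is needed here, because Spring Boot's auto-configuration supplies a consumer factory that defaults to `localhost:9092` and String deserializers: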
```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerService {

    // The topic name must match the TOPIC constant in KafkaConfig.
    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}
```
5. Add the main application class and run it:
```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}
```
When you run the Spring Boot application, it acts as both a Kafka producer and a consumer. Nothing in this example calls `KafkaProducerService.sendMessage()` yet, so invoke it from your own code (for example, from a REST controller or a startup runner). The consumer (`KafkaConsumerService.receiveMessage()`) then receives and prints each message.
Make sure you have Kafka running locally on the default port (9092) before running the Spring Boot application.
Please note that this example uses a simple setup with a single topic and group ID. In a real-world scenario, you may need more advanced configurations and error handling based on your use case.
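As one illustration of the error handling mentioned above: `sendMessage` discards the value returned by `kafkaTemplate.send`. In Spring Kafka 3.x that value is a `CompletableFuture` (2.x returns a `ListenableFuture` instead), so one possible approach is to attach a completion callback. The sketch below shows only the callback pattern, using the JDK alone: a `CompletableFuture<String>` stands in for the real send result, and `SendOutcomeLogger` with its `describe` and `watch` methods are hypothetical names, not Spring Kafka API.

```java
import java.util.concurrent.CompletableFuture;

public class SendOutcomeLogger {

    // Turns the outcome of a send into a log line; error is null on success.
    public static String describe(String result, Throwable error) {
        return error == null
                ? "Sent: " + result
                : "Send failed: " + error.getMessage();
    }

    // Attaches the callback and returns a future holding the log line.
    // handle() runs on both success and failure, so the returned future
    // completes normally in either case.
    public static CompletableFuture<String> watch(CompletableFuture<String> sendFuture) {
        return sendFuture.handle(SendOutcomeLogger::describe);
    }

    public static void main(String[] args) {
        // Stand-in for a successful send (the string is made-up metadata).
        CompletableFuture<String> ok = CompletableFuture.completedFuture("my-topic-0@42");
        System.out.println(watch(ok).join());

        // Stand-in for a send that fails, e.g. because the broker is down.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        System.out.println(watch(failed).join());
    }
}
```

Assuming Spring Kafka 3.x, the same `handle` or `whenComplete` call could be chained onto the future returned by `kafkaTemplate.send(topic, message)`, so that failed sends are logged or retried rather than silently dropped.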