Saturday 22 July 2023

Example of a simple Kafka producer and consumer using Spring Boot with the Spring Kafka library

 Sure! Below is an example of a simple Kafka producer and consumer using Spring Boot with the Spring Kafka library.


1. First, make sure you have the necessary dependencies in your `pom.xml` (Maven) or `build.gradle` (Gradle) file:


Maven:

```xml

<dependencies>

    <dependency>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-web</artifactId>

    </dependency>


    <dependency>

        <groupId>org.springframework.kafka</groupId>

        <artifactId>spring-kafka</artifactId>

    </dependency>

</dependencies>

```


Gradle:

```gradle

dependencies {

    implementation 'org.springframework.boot:spring-boot-starter-web'

    implementation 'org.springframework.kafka:spring-kafka'

}

```


2. Create a Kafka configuration class to define the necessary Kafka properties:


```java

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.core.DefaultKafkaProducerFactory;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.core.ProducerFactory;


import java.util.HashMap;

import java.util.Map;


@Configuration

public class KafkaConfig {

    private static final String KAFKA_BROKER = "localhost:9092";

    private static final String TOPIC = "my-topic";


    @Bean

    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();

        props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER);

        props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);

        props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);

        return props;

    }


    @Bean

    public ProducerFactory<String, String> producerFactory() {

        return new DefaultKafkaProducerFactory<>(producerConfigs());

    }


    @Bean

    public KafkaTemplate<String, String> kafkaTemplate() {

        return new KafkaTemplate<>(producerFactory());

    }


    @Bean

    public String topic() {

        return TOPIC;

    }

}

```


3. Create a Kafka producer to send messages:


```java

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.stereotype.Service;


@Service

public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    private final String topic;


    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate, String topic) {

        this.kafkaTemplate = kafkaTemplate;

        this.topic = topic;

    }


    public void sendMessage(String message) {

        kafkaTemplate.send(topic, message);

    }

}

```


4. Create a Kafka consumer to receive messages:


```java

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;


@Component

public class KafkaConsumerService {


    @KafkaListener(topics = "my-topic", groupId = "my-group")

    public void receiveMessage(String message) {

        System.out.println("Received message: " + message);

    }

}

```


5. Run the Spring Boot application:


```java

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication

public class KafkaApplication {

    public static void main(String[] args) {

        SpringApplication.run(KafkaApplication.class, args);

    }

}

```


Now, when you run the Spring Boot application, it will act as both a Kafka producer and consumer. When you send a message using the `KafkaProducerService.sendMessage()` method, the Kafka consumer (`KafkaConsumerService.receiveMessage()`) will receive and print the message.


Make sure you have Kafka running locally on the default port (9092) before running the Spring Boot application.


Please note that this example uses a simple setup with a single topic and group ID. In a real-world scenario, you may need more advanced configurations and error handling based on your use case.

No comments:

Post a Comment