Saturday 22 July 2023

Example of a simple Kafka producer and consumer using Spring Boot with the Spring Kafka library

Below is an example of a simple Kafka producer and consumer using Spring Boot with the Spring Kafka library.


1. First, make sure you have the necessary dependencies in your `pom.xml` (Maven) or `build.gradle` (Gradle) file:


Maven:

```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
```


Gradle:

```gradle
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'
}
```


2. Create a Kafka configuration class to define the necessary Kafka properties:


```java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {
    private static final String KAFKA_BROKER = "localhost:9092";
    private static final String TOPIC = "my-topic";

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Exposes the topic name as a bean so it can be injected into the producer service below.
    @Bean
    public String topic() {
        return TOPIC;
    }
}
```
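Optionally, if you are on Spring Kafka 2.3 or later, Spring Boot's auto-configured `KafkaAdmin` will create any `NewTopic` beans at application startup, so the topic does not have to exist beforehand. A minimal sketch; the partition and replica counts are illustrative:

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // Created at startup by the auto-configured KafkaAdmin if it does not already exist.
    @Bean
    public NewTopic myTopic() {
        return TopicBuilder.name("my-topic")
                .partitions(1)
                .replicas(1)
                .build();
    }
}
```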


3. Create a Kafka producer to send messages:


```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final String topic;

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate, String topic) {
        this.kafkaTemplate = kafkaTemplate;
        this.topic = topic; // the String "topic" bean defined in KafkaConfig
    }

    public void sendMessage(String message) {
        kafkaTemplate.send(topic, message);
    }
}
```


4. Create a Kafka consumer to receive messages:


```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerService {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}
```
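With the Spring Boot starter on the classpath, the listener above is backed by an auto-configured consumer (driven by `spring.kafka.consumer.*` properties, with bootstrap servers defaulting to `localhost:9092`). If you prefer to configure the consumer side explicitly in code, a minimal sketch mirroring the producer configuration might look like this:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // @KafkaListener methods use the bean named "kafkaListenerContainerFactory"
    // by default, so this wires the listener above to the factory defined here.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
```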


5. Run the Spring Boot application:


```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}
```


Now, when you run the Spring Boot application, it will act as both a Kafka producer and consumer. When you send a message using the `KafkaProducerService.sendMessage()` method, the Kafka consumer (`KafkaConsumerService.receiveMessage()`) will receive and print the message.
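To try it out end to end, one option is a small REST endpoint that delegates to the producer service. This is a minimal sketch for illustration only; the `MessageController` class and `/send` path are hypothetical names, and it assumes the controller lives alongside `KafkaProducerService`:

```java
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical test controller: POST a message body to /send
// and watch the consumer print it.
@RestController
public class MessageController {
    private final KafkaProducerService producerService;

    public MessageController(KafkaProducerService producerService) {
        this.producerService = producerService;
    }

    @PostMapping("/send")
    public String send(@RequestBody String message) {
        producerService.sendMessage(message);
        return "Sent: " + message;
    }
}
```

With the application running, `curl -X POST localhost:8080/send -d "hello"` should cause the consumer to log `Received message: hello`.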


Make sure you have Kafka running locally on the default port (9092) before running the Spring Boot application.


Please note that this example uses a simple setup with a single topic and group ID. In a real-world scenario, you may need more advanced configuration and error handling based on your use case; one common building block, handling the result of an asynchronous send, is sketched below.
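`KafkaTemplate.send()` is asynchronous, so delivery failures surface on the returned future rather than at the call site. A minimal sketch of a method you could add to `KafkaProducerService`, assuming a Spring Kafka version (3.0+) where `send()` returns a `CompletableFuture` (older versions return a `ListenableFuture` with an `addCallback` method):

```java
import java.util.concurrent.CompletableFuture;

import org.springframework.kafka.support.SendResult;

public void sendMessageWithCallback(String message) {
    CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
    future.whenComplete((result, ex) -> {
        if (ex != null) {
            // Delivery failed: log it here; in production you might retry
            // or route the message to a dead-letter topic.
            System.err.println("Failed to send: " + ex.getMessage());
        } else {
            System.out.println("Sent to partition " + result.getRecordMetadata().partition()
                    + " at offset " + result.getRecordMetadata().offset());
        }
    });
}
```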

Common configuration details you might need to set when configuring a Kafka client

To configure a Kafka client, you provide the settings needed to connect to the Kafka broker(s), plus additional properties based on your application's requirements. The exact configuration details depend on the programming language and Kafka client library you are using.


Here are the common configuration details you might need to set when configuring a Kafka client:


1. **Bootstrap Servers:**

   The list of Kafka brokers that the client can initially connect to. The client will use this list to discover the full set of available brokers in the Kafka cluster. The format is a comma-separated list of broker addresses in the form `hostname:port`. For example, `broker1:9092,broker2:9092,broker3:9092`.


2. **Topic Name:**

   The name of the Kafka topic to which you want to produce or consume messages.


3. **Client ID:**

   A unique identifier for the Kafka client. It helps Kafka track client activity and can be useful for monitoring and debugging.


4. **Producer/Consumer Configuration:**

   Depending on whether you are building a Kafka producer or consumer, you will need to set specific configurations related to producing or consuming messages. For example, the producer configuration may include settings for the message key and value serializers, compression, batch size, etc. The consumer configuration may include settings for the group ID, offset behavior, etc.


5. **Security Settings (Optional):**

   If your Kafka cluster is secured, you might need to provide security-related settings such as SSL certificates, username/password, or authentication tokens.


6. **Other Application-Specific Settings:**

   Depending on your application's requirements, you might need to set other properties like the message key, message value, partitioning strategy, etc.


Here's an example of how you might configure a Kafka producer in Java using the Apache Kafka client library (KafkaProducer API):


```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "my-kafka-producer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        String topicName = "my-topic";

        // try-with-resources closes the producer (flushing buffered records) on exit
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                producer.send(new ProducerRecord<>(topicName, message));
                System.out.println("Sent: " + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```


This example configures a Kafka producer with the required settings, such as `BOOTSTRAP_SERVERS_CONFIG`, `KEY_SERIALIZER_CLASS_CONFIG`, and `VALUE_SERIALIZER_CLASS_CONFIG`. It then sends ten messages to the topic named "my-topic."
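A consumer built with the same client library looks similar. Below is a minimal sketch; the group ID, offset reset policy, and the commented-out security properties are illustrative values you would adapt to your cluster:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // Start from the earliest offset when the group has no committed offset.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // For a secured cluster, you would also set properties such as
        // "security.protocol", "sasl.mechanism", and "sasl.jaas.config".

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received: " + record.value() + " (offset " + record.offset() + ")");
                }
            }
        }
    }
}
```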


The configuration details and properties may vary depending on the Kafka client library and language you are using. Always refer to the official documentation of the Kafka client library you are using for detailed information on configuration options and settings.

Running Kafka in the background on Ubuntu

To run Kafka in the background on Ubuntu, use the following steps:


1. Open a terminal.


2. Navigate to the Kafka installation directory. For example, if you have installed Kafka in the `/opt/kafka` directory, use the following command:


```bash
cd /opt/kafka
```


3. Start ZooKeeper in the background:


```bash
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &
```


The `nohup` command keeps the process running even after you close the terminal, the trailing `&` puts it in the background, and the redirection sends all output to `zookeeper.log`.


4. Start Kafka in the background:


```bash
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
```


This command starts Kafka in the background, and any output or error messages are redirected to `kafka.log`.


Kafka and ZooKeeper are now running in the background. To verify that they're running, you can check the contents of the log files:


```bash
tail -f zookeeper.log
tail -f kafka.log
```


The `tail -f` command allows you to continuously monitor the log files for any new output.


If you want to stop Kafka and ZooKeeper, you can use the following commands:


```bash
bin/kafka-server-stop.sh
bin/zookeeper-server-stop.sh
```


These commands will stop Kafka and ZooKeeper gracefully.


With Kafka running in the background, you can now produce and consume messages from your topics as needed, and the Kafka server will continue to run even if you close the terminal session.


For example, if you have the `kafkacat` utility installed, you can check that a Kafka broker is up and list its topic metadata (replace the address with your broker's host and port):

```bash
kafkacat -b 18.215.201.15:9092 -L
```
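If you would rather check from Java, the Kafka `AdminClient` can describe the cluster. A minimal sketch; the broker address is an assumption, so point it at your own broker:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

import java.util.Properties;

public class BrokerCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed address; replace with your broker's host and port.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // get() blocks and throws if the broker cannot be reached in time.
            for (Node node : admin.describeCluster().nodes().get()) {
                System.out.println("Broker up: " + node.host() + ":" + node.port());
            }
        }
    }
}
```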


Installing Kafka on Ubuntu

Installing Kafka on Ubuntu involves several steps: installing Java, downloading and extracting Kafka, and configuring it. Here's a step-by-step guide:


Step 1: Install Java

Kafka requires Java to run. Install a Java runtime with the following commands:


```bash
sudo apt update
sudo apt install default-jre
```


Step 2: Download Kafka

Go to the Apache Kafka downloads page (https://kafka.apache.org/downloads) and pick a stable release. This guide uses 2.8.0 (the Scala 2.13 build); substitute the current version as needed. You can use `wget` to download Kafka directly to your server:


```bash
wget https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz
```


Step 3: Extract Kafka

Extract the downloaded Kafka archive:


```bash
tar -xzf kafka_2.13-2.8.0.tgz
```


Step 4: Move Kafka Directory

Move the extracted Kafka directory to a location of your choice (e.g., /opt):


```bash
sudo mv kafka_2.13-2.8.0 /opt/kafka
```


Step 5: Configure Kafka Environment Variables (Optional)

You can set Kafka-related environment variables, such as KAFKA_HOME, by adding the following lines to your ~/.bashrc file:


```bash
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
```


Run the following command to reload the bashrc file:


```bash
source ~/.bashrc
```


Step 6: Start ZooKeeper

Kafka uses ZooKeeper for distributed coordination. Start ZooKeeper using the following command:


```bash
cd /opt/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
```


Step 7: Start Kafka Server

Now, in a second terminal (the ZooKeeper process from Step 6 runs in the foreground), start the Kafka server:


```bash
bin/kafka-server-start.sh config/server.properties
```


Kafka should now be up and running on your Ubuntu system. By default, Kafka will run on port 9092 for broker communication and 2181 for ZooKeeper communication.


You can test Kafka by creating topics and producing/consuming messages. For example, to create a topic named "test-topic," use the following command:


```bash
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
```


To produce a message to the "test-topic," run:


```bash
bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
```


To consume messages from the "test-topic," run:


```bash
bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server localhost:9092 --from-beginning
```


This completes the installation of Apache Kafka on Ubuntu, and you can now use it to publish and consume messages from different topics.