
Kafka Producer & Consumer in Java

This is a step-by-step tutorial on using Java and Gradle to produce and consume messages with Kafka.

Prerequisites

Before starting, you should have the following installed:

  • Java 8 or later
  • Gradle
  • Kafka

Step 1: Set up the project

First, create a new Gradle project and add the Kafka dependencies to your build.gradle file:
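A minimal build.gradle might look like the following. The kafka-clients version and the main class name are illustrative assumptions; use the release that matches your broker and your own package names:

```groovy
plugins {
    id 'java'
    id 'application'
}

repositories {
    mavenCentral()
}

dependencies {
    // Kafka client library; pick the version that matches your broker
    implementation 'org.apache.kafka:kafka-clients:3.7.0'
    // Optional: a simple logging backend for the Kafka client's SLF4J output
    runtimeOnly 'org.slf4j:slf4j-simple:2.0.13'
}

application {
    // Hypothetical main class; adjust to your package
    mainClass = 'com.example.ProducerExample'
}
```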

You can also add other dependencies as needed, such as logging frameworks or serialization libraries.

Step 2: Produce messages

To produce messages to a Kafka topic, you need to create a KafkaProducer instance and send ProducerRecords to it. Here’s an example:
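A minimal producer sketch, assuming a broker listening on localhost:9092 and a topic named my-topic (it requires the kafka-clients dependency and a running broker):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // try-with-resources ensures the producer flushes and closes cleanly
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("my-topic", "key-" + i, "message-" + i));
                System.out.println("Sent message " + i);
            }
        }
    }
}
```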

In this example, we set up the producer configuration with the bootstrap servers and the key and value serializer classes. We then create a KafkaProducer instance and use it to send ten messages to the my-topic topic.

Step 3: Consume messages

To consume messages from a Kafka topic, you need to create a KafkaConsumer instance and subscribe to the topic. Here’s an example:
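A minimal consumer sketch, again assuming localhost:9092, a topic named my-topic, and a group ID of my-group:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("group.id", "my-group");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                // Poll for up to one second, then print each record's metadata
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("key=%s value=%s partition=%d offset=%d%n",
                            record.key(), record.value(), record.partition(), record.offset());
                }
            }
        }
    }
}
```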

In this example, we set up the consumer configuration with the bootstrap servers, key and value deserializer classes, and the group ID. We then create a KafkaConsumer instance and subscribe to the my-topic topic. A while loop keeps polling for messages every second, and for each batch of messages received we print each record's key, value, partition, and offset.

Step 4: Run the examples

To run the producer example, you can use the Gradle run task:
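Assuming the application plugin's mainClass points at the producer class (a project-specific assumption):

```shell
./gradlew run
```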

To run the consumer example, you can use the same run task with the consumer argument:
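One way to do this (an assumption, since the project layout isn't shown, and it requires a main class that dispatches on its first argument) is to pass the mode via --args:

```shell
./gradlew run --args='consumer'
```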

You should see the producer outputting ten messages to the console, and the consumer outputting the same messages along with their metadata.

Step 5: Configure Kafka with properties

In the examples above, we hard-coded some of the configuration parameters, such as the topic name, bootstrap servers, and group ID. In a real-world application, you might want to externalize these parameters to a configuration file or environment variables, so that you can easily modify them without changing the code.

Here’s an example of how to use a properties file to configure Kafka:
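The sketch below loads kafka.properties from the classpath and uses it to build the consumer; the file name, the topic key, and the class name are assumptions:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConfiguredConsumer {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        // Load kafka.properties from src/main/resources (on the classpath at runtime)
        try (InputStream in = ConfiguredConsumer.class.getResourceAsStream("/kafka.properties")) {
            props.load(in);
        }

        // Fall back to a default topic name if the property is absent
        String topic = props.getProperty("topic", "my-topic");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            // ... poll loop as in Step 3 ...
        }
    }
}
```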

In this example, we define a kafka.properties file in the resources folder with the following contents:
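The values below are examples matching the earlier steps; adjust them to your environment:

```properties
bootstrap.servers=localhost:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=my-group
topic=my-topic
```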

We then load these properties into a Properties object and use them to configure the Kafka consumer instance. You can use the same approach to configure the Kafka producer or any other Kafka components.

Additional Configuration

In addition to the basic configuration shown in the examples above, there are many other properties you can set on the producer and consumer to fine-tune their behavior. Here are some examples:

Producer configuration

  • acks: Controls how many acknowledgments the producer requires from the broker before considering a message sent. Valid values are 0 for no acknowledgments, 1 for acknowledging the leader only, and -1 (or all) for acknowledging all in-sync replicas. The default is 1 in Kafka versions before 3.0 and all from 3.0 onward.
  • retries: Controls how many times the producer should retry sending a message in case of transient failures. The default is 0 in Kafka versions before 2.1; from 2.1 onward it defaults to Integer.MAX_VALUE, with retrying bounded in practice by delivery.timeout.ms.
  • max.in.flight.requests.per.connection: Controls how many messages can be sent to the broker without receiving a response. The default value is 5.
  • compression.type: Controls the compression codec to use for messages. Valid values are none, gzip, snappy, lz4, and (since Kafka 2.1) zstd. The default value is none.
  • batch.size: Controls the maximum size of a batch of messages sent to the broker. The default value is 16384 bytes.
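These producer settings are applied the same way as the basic configuration. The values below are illustrative, not tuning recommendations:

```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");                              // wait for all in-sync replicas
props.put("retries", 3);
props.put("max.in.flight.requests.per.connection", 1); // preserve ordering across retries
props.put("compression.type", "gzip");
props.put("batch.size", 32768);                        // batch up to 32 KB per partition
```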

Consumer configuration

  • auto.offset.reset: Controls what happens when a new consumer group is created or when a consumer needs to reset its position in the topic. Valid values are earliest for starting from the beginning of the topic, latest for starting from the end of the topic, and none for throwing an exception if no offset is found. The default value is latest.
  • max.poll.records: Controls how many records the consumer will fetch in a single call to poll(). The default value is 500.
  • fetch.max.bytes: Controls the maximum size of data the broker will return in a single response. The default value is 52428800 bytes.
  • fetch.min.bytes: Controls the minimum size of data the broker will return before sending a response. The default value is 1 byte.
  • heartbeat.interval.ms: Controls how often the consumer will send heartbeats to the broker. The default value is 3000 milliseconds.
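Likewise for the consumer; the values below are illustrative examples of these properties, not recommended settings:

```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("auto.offset.reset", "earliest"); // start from the beginning if no committed offset
props.put("max.poll.records", 100);         // cap records returned per poll()
props.put("fetch.min.bytes", 1024);         // let the broker wait for at least 1 KB
props.put("heartbeat.interval.ms", 1000);
```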