This step-by-step tutorial shows how to use Java and Gradle to produce messages to and consume messages from Kafka.
Prerequisites
Before starting, you should have the following installed:
- Java 8 or later
- Gradle
- Kafka (this tutorial assumes a broker running on localhost:9092)
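The examples below use a topic named `my-topic`. Most broker setups auto-create topics on first use, but if auto-creation is disabled you can create the topic up front with the `kafka-topics` tool shipped with Kafka (assuming a broker on `localhost:9092`):

```
$ bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092
```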
Step 1: Set up the project
First, create a new Gradle project and add the Kafka client dependency to your `build.gradle` file:
```groovy
dependencies {
    implementation 'org.apache.kafka:kafka-clients:3.4.0'
}
```
You can also add other dependencies as needed, such as logging frameworks or serialization libraries.
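If you intend to launch the examples with Gradle's `run` task (see Step 4), also apply the `application` plugin and point it at a main class. A minimal sketch, assuming a dispatcher class named `KafkaExample` (introduced in Step 4):

```groovy
plugins {
    id 'java'
    id 'application'
}

application {
    // Hypothetical entry point; see the dispatcher sketch in Step 4
    mainClass = 'KafkaExample'
}
```

On Gradle versions before 6.4, use `mainClassName = 'KafkaExample'` instead.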
Step 2: Produce messages
To produce messages to a Kafka topic, you need to create a `KafkaProducer` instance and send `ProducerRecord`s to it. Here’s an example:
```java
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerExample {

    private static final String TOPIC_NAME = "my-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        // Set up the producer configuration
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // Create the KafkaProducer instance
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Send some messages
        for (int i = 0; i < 10; i++) {
            String key = "key-" + i;
            String value = "value-" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, key, value);
            producer.send(record);
        }

        // Close the producer (flushes any buffered records)
        producer.close();
    }
}
```
In this example, we set up the producer configuration with the bootstrap servers and the key and value serializer classes. We then create a `KafkaProducer` instance and use it to send ten messages to the `my-topic` topic.
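Note that `producer.send()` is asynchronous: it returns immediately, and failures only surface through the returned `Future` or a callback. A minimal sketch of the callback form, reusing the `record` variable from the loop above (the log messages are illustrative):

```java
// Pass a callback so delivery failures are not silently dropped
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        System.err.println("Send failed: " + exception.getMessage());
    } else {
        System.out.printf("Sent to partition=%d at offset=%d%n",
                metadata.partition(), metadata.offset());
    }
});
```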
Step 3: Consume messages
To consume messages from a Kafka topic, you need to create a `KafkaConsumer` instance and subscribe to the topic. Here’s an example:
```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {

    private static final String TOPIC_NAME = "my-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String GROUP_ID = "my-group";

    public static void main(String[] args) {
        // Set up the consumer configuration
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);

        // Create the KafkaConsumer instance and subscribe to the topic
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton(TOPIC_NAME));

        // Start polling for messages
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d%n",
                        record.key(), record.value(), record.partition(), record.offset());
            }
        }
    }
}
```
In this example, we set up the consumer configuration with the bootstrap servers, key and value deserializer classes, and the group ID. We then create a `KafkaConsumer` instance and subscribe to the `my-topic` topic. We use a `while` loop to keep polling for messages every second, and for each batch of records received we print the key, value, partition, and offset.
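One caveat: the `while (true)` loop above never closes the consumer. A common shutdown pattern, sketched here around the same `consumer` instance, is to call `consumer.wakeup()` from a JVM shutdown hook, which makes a blocked `poll()` throw `WakeupException`, and then close the consumer in a `finally` block:

```java
// Register a shutdown hook that interrupts poll() via wakeup()
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();
    try {
        mainThread.join(); // wait for the polling loop to finish cleanly
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("Received message: key=%s, value=%s%n", record.key(), record.value());
        }
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // Expected during shutdown; fall through to close
} finally {
    consumer.close();
}
```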
Step 4: Run the examples
To run the producer example, you can use the Gradle `run` task (this assumes the `application` plugin is configured as in Step 1, with a small dispatcher main class; a sketch follows after this step):

```
$ gradle run --args='producer'
```
To run the consumer example, use the same `run` task with the `consumer` argument:

```
$ gradle run --args='consumer'
```
You should see the producer outputting ten messages to the console, and the consumer outputting the same messages along with their metadata.
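The `--args` flag implies a single entry point that picks which example to run. Here is a minimal sketch of such a dispatcher; the class name `KafkaExample` is an assumption and must match the `mainClass` setting suggested in Step 1:

```java
// Hypothetical dispatcher so one Gradle run task can launch either example
public class KafkaExample {

    public static void main(String[] args) {
        if (args.length == 0) {
            System.err.println("Usage: gradle run --args='producer|consumer'");
            System.exit(1);
        }
        switch (args[0]) {
            case "producer":
                KafkaProducerExample.main(new String[0]);
                break;
            case "consumer":
                KafkaConsumerExample.main(new String[0]);
                break;
            default:
                System.err.println("Unknown mode: " + args[0]);
        }
    }
}
```

Alternatively, you can skip the dispatcher and define two separate `JavaExec` tasks, one per example class.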
Step 5: Configure Kafka with properties
In the examples above, we hard-coded some of the configuration parameters, such as the topic name, bootstrap servers, and group ID. In a real-world application, you might want to externalize these parameters to a configuration file or environment variables, so that you can easily modify them without changing the code.
Here’s an example of how to use a properties file to configure Kafka:
```java
import org.apache.kafka.clients.consumer.*;

import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {

    private static final String PROPERTIES_FILE = "kafka.properties";

    public static void main(String[] args) throws IOException {
        // Load the properties from the file
        Properties props = new Properties();
        try (InputStream input = KafkaConsumerExample.class.getClassLoader()
                .getResourceAsStream(PROPERTIES_FILE)) {
            props.load(input);
        }

        // Create the KafkaConsumer instance and subscribe to the topic
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton(props.getProperty("topic")));

        // Start polling for messages
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d%n",
                        record.key(), record.value(), record.partition(), record.offset());
            }
        }
    }
}
```
In this example, we define a `kafka.properties` file in the resources folder with the following contents:
```properties
bootstrap.servers=localhost:9092
group.id=my-group
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
topic=my-topic
```
We then load these properties into a `Properties` object and use them to configure the Kafka consumer instance. Note that `topic` is not a standard Kafka client setting; the application reads it itself, and the client will just log a warning about the unknown property. You can use the same approach to configure the Kafka producer or any other Kafka components.
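To combine the file with environment variables, one simple option is to overlay the loaded properties with values from the environment. A minimal sketch, placed after `props.load(input)`; the `KAFKA_BOOTSTRAP_SERVERS` variable name is an assumption:

```java
// Environment variables win over the properties file
String bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");
if (bootstrapServers != null) {
    props.put("bootstrap.servers", bootstrapServers);
}
```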
Additional Configuration
In addition to the basic configuration shown in the examples above, there are many other properties you can set on the producer and consumer to fine-tune their behavior. Here are some examples:
Producer configuration

- `acks`: Controls how many acknowledgments the producer requires the broker to receive before considering a message as sent. Valid values are `0` for no acknowledgments, `1` for acknowledging the leader only, and `-1` (or `all`) for acknowledging all in-sync replicas. Since Kafka 3.0 the default value is `all`.
- `retries`: Controls how many times the producer retries sending a message after transient failures. In recent client versions the default is `2147483647`, effectively unlimited and bounded in practice by `delivery.timeout.ms`.
- `max.in.flight.requests.per.connection`: Controls how many requests can be sent to the broker without receiving a response. The default value is `5`.
- `compression.type`: Controls the compression codec used for messages. Valid values are `none`, `gzip`, `snappy`, `lz4`, and `zstd`. The default value is `none`.
- `batch.size`: Controls the maximum size of a batch of messages sent to the broker. The default value is `16384` bytes.

A short sketch of setting these properties in code follows this list.
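A minimal sketch of applying some of these settings, extending the `Properties` setup from Step 2 (the values are illustrative, not recommendations):

```java
// Producer tuning; add to the props built in Step 2
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
```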
Consumer configuration

- `auto.offset.reset`: Controls where the consumer starts when a new consumer group is created or when no committed offset exists. Valid values are `earliest` for starting from the beginning of the topic, `latest` for starting from the end of the topic, and `none` for throwing an exception if no offset is found. The default value is `latest`.
- `max.poll.records`: Controls how many records the consumer will fetch in a single call to `poll()`. The default value is `500`.
- `fetch.max.bytes`: Controls the maximum amount of data the broker will return in a single response. The default value is `52428800` bytes (50 MB).
- `fetch.min.bytes`: Controls the minimum amount of data the broker will return before sending a response. The default value is `1` byte.
- `heartbeat.interval.ms`: Controls how often the consumer sends heartbeats to the group coordinator. The default value is `3000` milliseconds.

A matching consumer sketch follows this list.
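And a matching sketch for the consumer, extending the `Properties` setup from Step 3 (again, illustrative values):

```java
// Consumer tuning; add to the props built in Step 3
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1048576);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
```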