by Arround The Web

Apache Kafka Streams Count Transformations

Kafka Streams is a lightweight and distributed stream processing framework for building real-time, fault-tolerant, and scalable applications. It is a part of the Apache Kafka ecosystem.

With Kafka Streams, we can build stream processing applications that use a high-level DSL to process and analyze data in real time.

Properties of Kafka Streams

The following are some points that you may need to understand about the Apache Kafka Streams platform:

Kafka Streams is essentially a client library that is embedded in your own application rather than in a Kafka broker. Like the broker, it runs on the JVM, and it uses Kafka itself as its storage and messaging layer.

Kafka Streams provides the Processor API and the Streams DSL. The Processor API gives us access to a low-level interface for creating custom processors and connecting them to Kafka topics. The Streams DSL provides a high-level interface for operations such as filtering, joining, and reducing.

Another feature of Kafka Streams is windowing support. Windowing allows us to group and process the records that fall within a specific time interval. Windows are defined in terms of time (for example tumbling, hopping, sliding, or session windows) rather than a fixed number of records.
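
To make the idea of a time window concrete, here is a minimal plain-Java sketch of how a record timestamp could be mapped to a fixed-size, non-overlapping (tumbling) window. It is an illustration only and not Kafka Streams code; the class name, the method name, and the five-minute window size are assumptions made for this example.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java illustration only; this is not Kafka Streams code.
class TumblingWindowSketch {

    // Returns the start (epoch millis) of the fixed-size window that contains the timestamp.
    static long windowStart(long timestampMs, Duration windowSize) {
        long sizeMs = windowSize.toMillis();
        // Round the timestamp down to the nearest multiple of the window size.
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofMinutes(5); // hypothetical window size
        long ts = Instant.parse("2023-01-01T00:07:30Z").toEpochMilli();
        // A record stamped 00:07:30 falls into the window that starts at 00:05:00.
        System.out.println(Instant.ofEpochMilli(windowStart(ts, size)));
    }
}
```

Every record whose timestamp maps to the same window start is processed together, so an aggregation such as a count is kept per key and per window.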

Last but not least are the processing semantics. Kafka Streams supports at-least-once and exactly-once processing. At-least-once ensures that every record is processed at least once, so a record may be reprocessed after a failure, while exactly-once ensures that the effect of each record is reflected in the results only once.
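
The processing guarantee is selected through configuration. The following is a minimal sketch that assumes a Kafka Streams 3.0 or later client (with brokers at version 2.5 or later), where the "processing.guarantee" setting accepts "at_least_once" (the default) and "exactly_once_v2". The class and method names are hypothetical, and the raw string key is used so that the snippet compiles without the Kafka libraries.

```java
import java.util.Properties;

// Hypothetical helper class, used only for this illustration.
class ProcessingGuaranteeSketch {

    static Properties exactlyOnceProps() {
        Properties props = new Properties();
        // "processing.guarantee" is the key behind StreamsConfig.PROCESSING_GUARANTEE_CONFIG.
        // The default value is "at_least_once".
        props.put("processing.guarantee", "exactly_once_v2");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(exactlyOnceProps());
    }
}
```

In a real application, this entry would be added to the same Properties object that is passed to the KafkaStreams constructor.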

In this tutorial, we will explore using one of Kafka Streams’ transformation features to process the stream data in a specific format.

Kafka Streams Count Transformation

As mentioned, Kafka Streams provides a set of transformations that we can apply to the data streams. One such transformation is the count transformation.

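The count transformation allows us to count the records in a given stream or table. It is not called on a KStream or KTable directly: we first group the records with groupByKey() or groupBy(), which yields a KGroupedStream or KGroupedTable, and then call count() on that grouped object.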

The count transformation then generates a KTable object which contains the count of records for every key in the input stream/table.
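
The per-key behavior can be sketched in plain Java. This is an illustration of the semantics only, not Kafka Streams code: a HashMap stands in for the state store, and the class name, method name, and record keys are made up for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of per-key counting; this is not Kafka Streams code.
class CountSemanticsSketch {

    // Returns one "key=count" update per input record, in arrival order.
    static List<String> countUpdates(List<String> recordKeys) {
        Map<String, Long> counts = new HashMap<>(); // stands in for the state store
        List<String> updates = new ArrayList<>();
        for (String key : recordKeys) {
            // Increment the running count for this key and record the new value.
            long newCount = counts.merge(key, 1L, Long::sum);
            updates.add(key + "=" + newCount);
        }
        return updates;
    }

    public static void main(String[] args) {
        // Hypothetical record keys arriving on a topic.
        System.out.println(countUpdates(List.of("alice", "bob", "alice")));
        // prints [alice=1, bob=1, alice=2]
    }
}
```

Each incoming record updates the count for its key, which is why the resulting table behaves like a changelog of counts. In Kafka Streams itself, record caching may merge several consecutive updates for the same key before they are emitted downstream.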

Kafka Streams Count Method

We use the count() method to apply the count transformation to a grouped stream/table. The method definition is as shown in the following:

KTable<K, Long> count();

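The method takes no arguments; K is the type of the record key, which is inherited from the grouped stream or table. It returns a KTable<K, Long> that maps each key to its count of records. Overloads also accept a Named and/or a Materialized argument to name the operation and to configure the underlying state store.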

Example Application:

The following sample application demonstrates how to use the count transformation using Java:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.util.Properties;

public class CountUsersApp {

    public static void main(String[] args) {

        // set up the properties for the Kafka Streams application
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "count-users-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // create a new StreamsBuilder instance
        StreamsBuilder builder = new StreamsBuilder();

        // create a KStream from the "users" topic (String keys and values)
        KStream<String, String> usersStream = builder.stream("users");

        // group the stream by key and count the number of records for each key
        // (groupByKey() would group the same way without forcing a repartition)
        KTable<String, Long> countTable = usersStream
                .groupBy((key, value) -> key)
                .count();

        // print each updated count to the console
        countTable.toStream().foreach((key, value) -> System.out.println("Count for key " + key + " is " + value));

        // create a new KafkaStreams instance using the properties and the StreamsBuilder
        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // start the KafkaStreams instance
        streams.start();

        // add shutdown hook to close the KafkaStreams instance gracefully
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

In the provided example, we start by setting the properties of the Kafka Streams app.

We then create a new StreamsBuilder instance, create a KStream from the “users” topic, and group the stream by key using the groupBy method.

Using the count() method, we count the number of records for each key.

Finally, we convert the resulting KTable to a stream with toStream() and print each updated count to the console using foreach.


We explored the fundamentals of working with Kafka Streams transformations by learning about the count transformation.


Source: linuxhint.com
