| by Arround The Web | No comments

Apache Kafka Streams Hopping Time Window Transformations

Hopping time window transformations in Kafka Streams are a type of windowed aggregation operation that allows us to process and aggregate the data over a fixed length and overlapping the time intervals known as windows.

To be more specific, hopping time windows slide over the stream at a fixed step size, and each window includes the data from the previous window as well as the current one which allows for continuous processing of the stream.

In a Kafka stream, we can define a hopping time window transformation using two main parameters:

  1. Window size
  2. Advance interval or hop

The window size parameter defines the length of the time that is covered by each window while the advance interval specifies how often the window moves forward. It is good to keep in mind that the advance interval can be smaller or larger than the defined window size. This depends on the overlap that is required between the windows.

For example, if we set the window size to five minutes and the advance interval to one minute, a new window is created every minute and each window covers the previous 5 minutes of the data plus the current minute.

This means that the windows will overlap by 4 minutes which allows for continuous processing of the stream.

Kafka Streams Hopping Time Window Transformation Example

The following example demonstrates how we can use the Kafka Streams API to perform a shopping time window transformation:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class KafkaStreamsHoppingTimeWindowExample {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hop-time-demo");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    StreamsBuilder builder = new StreamsBuilder();

    KStream input = builder.stream("users");
    KStream<Windowed, Long> windowedCounts = input
    KStream output = windowedCounts
        .map((key, value) -> new KeyValue(key.key(), value));
    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

In the provided example, we start by setting up the Kafka Streams properties including the bootstrap servers and default servers for keys and values.

We then create a Kafka Streams builder and use it to create a KStream from the “users” topic.

Next, we apply a hopping time window of 10 seconds with an advance interval of 5 seconds to the KStream using the windowedBy() method. This means that we emit a count of records for each window of 10 seconds, and the window will advance by 5 seconds at a time.

We then convert the windowed<String> key to a string for output, and write the output to a new topic using the to() method.

Finally, we create and start the Kafka Streams application, and add a shutdown hook to shut down the application gracefully when the JVM is shutting down.


We explored the basics of working with hopping time window transformations in Kafka Streams. Feel free to reference the document for more information on the Kafka Streams transformations API.

Share Button

Source: linuxhint.com

Leave a Reply