by Arround The Web

Apache Kafka Streams Map Transformations

In Kafka Streams, transformations are operations that we can perform on the records of a Kafka stream. They allow us to modify the existing data and store the results in a new stream, print them to the console, or write them to a Kafka topic.

Transformations are among the most widely used features of Kafka Streams because they allow you to process and analyze data in real time.

Kafka Streams offers a wide range of transformations that let you perform many different operations on stream data.

One such transformation is a map transformation. This type of transformation allows you to apply a user-defined function to each record in the stream and return a new record with the transformed data.

In this tutorial, we will explore how the map transformation works, look at the types of map transformations in Kafka Streams, and learn how to use the map method of the Kafka Streams DSL API.

Types of Kafka Streams Map Transformations

The following are some common types of map transformation when working with Kafka Streams:

Map – As mentioned, the map transformation applies a given function to each record in an input stream and returns exactly one new record with the transformed data. Both the key and the value can change.

flatMap – The second type of Kafka map transformation is flatMap. It applies a function to each record in an input stream and returns zero or more records with the transformed data.

mapValues – The mapValues transformation applies a given function to the value of each record in the stream and produces a new record with the same key and the transformed value.

flatMapValues – Similarly, this transformation applies a function to the value of each record in the stream and returns zero or more records, each with the same key as the input record and a transformed value.

Filter – Finally, we have the filter transformation. Strictly speaking, it does not transform records; it applies a predicate function to each record in the stream and returns a new stream that contains only the records that satisfy the condition defined in the predicate. The predicate takes the key and the value of a record and returns a Boolean value.
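To make the differences between these five transformations concrete, the following is a minimal sketch that models their per-record behavior in plain Java over an in-memory list of key-value records. It is not the Kafka Streams API: the class `TransformationModel` and its methods are hypothetical, and `Map.Entry` stands in for a Kafka record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Function;

// Hypothetical plain-Java model of the per-record semantics of map, flatMap,
// mapValues, flatMapValues, and filter. NOT the Kafka Streams API.
// Map.entry rejects nulls, so this model assumes non-null keys and values.
public class TransformationModel {

    // map: exactly one output record per input record; key and value may change.
    public static <K, V, KR, VR> List<Map.Entry<KR, VR>> map(
            List<Map.Entry<K, V>> records, BiFunction<K, V, Map.Entry<KR, VR>> mapper) {
        List<Map.Entry<KR, VR>> out = new ArrayList<>();
        for (Map.Entry<K, V> r : records) {
            out.add(mapper.apply(r.getKey(), r.getValue()));
        }
        return out;
    }

    // flatMap: zero or more output records per input record.
    public static <K, V, KR, VR> List<Map.Entry<KR, VR>> flatMap(
            List<Map.Entry<K, V>> records, BiFunction<K, V, List<Map.Entry<KR, VR>>> mapper) {
        List<Map.Entry<KR, VR>> out = new ArrayList<>();
        for (Map.Entry<K, V> r : records) {
            out.addAll(mapper.apply(r.getKey(), r.getValue()));
        }
        return out;
    }

    // mapValues: the key is kept; only the value is transformed.
    public static <K, V, VR> List<Map.Entry<K, VR>> mapValues(
            List<Map.Entry<K, V>> records, Function<V, VR> mapper) {
        List<Map.Entry<K, VR>> out = new ArrayList<>();
        for (Map.Entry<K, V> r : records) {
            out.add(Map.entry(r.getKey(), mapper.apply(r.getValue())));
        }
        return out;
    }

    // flatMapValues: the key is kept; each value yields zero or more new values.
    public static <K, V, VR> List<Map.Entry<K, VR>> flatMapValues(
            List<Map.Entry<K, V>> records, Function<V, List<VR>> mapper) {
        List<Map.Entry<K, VR>> out = new ArrayList<>();
        for (Map.Entry<K, V> r : records) {
            for (VR value : mapper.apply(r.getValue())) {
                out.add(Map.entry(r.getKey(), value));
            }
        }
        return out;
    }

    // filter: keeps only the records for which the predicate returns true.
    public static <K, V> List<Map.Entry<K, V>> filter(
            List<Map.Entry<K, V>> records, BiPredicate<K, V> predicate) {
        List<Map.Entry<K, V>> out = new ArrayList<>();
        for (Map.Entry<K, V> r : records) {
            if (predicate.test(r.getKey(), r.getValue())) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, Integer>> input =
                List.of(Map.entry(1, 10), Map.entry(2, 25), Map.entry(3, 40));

        // map: re-key each record and double its value.
        System.out.println(map(input, (k, v) -> Map.entry("id-" + k, v * 2)));

        // flatMap: split values above 20 into two halves; drop the rest.
        BiFunction<Integer, Integer, List<Map.Entry<Integer, Integer>>> split =
                (k, v) -> v > 20 ? List.of(Map.entry(k, v / 2), Map.entry(k, v - v / 2)) : List.of();
        System.out.println(flatMap(input, split));

        // mapValues: keep the key, turn the value into a String.
        System.out.println(mapValues(input, v -> "value=" + v));

        // flatMapValues: keep the key, emit the value and its successor.
        System.out.println(flatMapValues(input, v -> List.of(v, v + 1)));

        // filter: keep only records whose value is greater than 20.
        System.out.println(filter(input, (k, v) -> v > 20));
    }
}
```

The key point the model shows is cardinality and key handling: map and mapValues always emit one record per input, flatMap and flatMapValues may emit none or several, and only map and flatMap are allowed to change the key.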

In this post, we will focus on the map transformation in Kafka. We have separate tutorials that describe the other transformations mentioned above; check them out to learn more.

Kafka Map Method

The following shows the definition of the map method in the KStream<K, V> interface of the Kafka Streams DSL API, where K and V are the type parameters of the stream itself:

<KR, VR> KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);

The method takes a KeyValueMapper function as the input and applies it to each record in the stream. It then returns a new stream with the transformed records as defined in the function.

The following are the type parameters and the argument of the map method:

  1. K refers to the key type of the input stream.
  2. V denotes the value type of the input stream.
  3. KR refers to the key type of the output stream.
  4. VR refers to the value type of the output stream.
  5. mapper is the KeyValueMapper, a functional interface that receives each key-value pair from the input stream and returns a new KeyValue with key type KR and value type VR.
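In the real API, KeyValueMapper<K, V, VR> declares a single method, apply(K key, V value), and a map lambda typically builds its result with KeyValue.pair(...). The sketch below mirrors only the shape of map() in plain Java so that K, V, KR, and VR can be seen with concrete types. The names `KV`, `Mapper`, and `MapSignatureSketch` are hypothetical stand-ins, not Kafka classes, and the wildcards of the real signature are omitted for readability.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java sketch of the shape of KStream<K, V>.map().
// KV and Mapper are stand-ins, NOT the Kafka Streams classes.
public class MapSignatureSketch {

    // Stand-in for KeyValue<K, V>: an immutable key-value pair.
    public record KV<K, V>(K key, V value) {}

    // Stand-in for KeyValueMapper<K, V, R>: receives a key and a value and
    // returns a result of type R.
    @FunctionalInterface
    public interface Mapper<K, V, R> {
        R apply(K key, V value);
    }

    // K, V: input key/value types; KR, VR: output key/value types.
    // Each input record produces exactly one output record.
    public static <K, V, KR, VR> List<KV<KR, VR>> map(
            List<KV<K, V>> input, Mapper<K, V, KV<KR, VR>> mapper) {
        List<KV<KR, VR>> output = new ArrayList<>();
        for (KV<K, V> record : input) {
            output.add(mapper.apply(record.key(), record.value()));
        }
        return output;
    }

    public static void main(String[] args) {
        // Input records: K = Integer, V = Integer.
        List<KV<Integer, Integer>> input = List.of(new KV<>(1, 10), new KV<>(2, 20));

        // Output records: KR = String, VR = String (both key and value change).
        List<KV<String, String>> output =
                map(input, (k, v) -> new KV<>("user-" + k, "score=" + v));

        System.out.println(output);
    }
}
```

Here K and V are both Integer, while the mapper turns each record into one with KR = String and VR = String, which is exactly the freedom that distinguishes map from mapValues.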

Example Demonstration:

The following example demonstrates how we can use the map method to convert the integer values of an input stream to strings and print the resulting records to the console:

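import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;

public class KafkaStreamsMapExample {
    public static void main(String[] args) {
        // Application ID, broker address, and default serdes.
        // Both the keys and the values in "int-topic" are read as Integers.
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-map");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Read the input topic as a stream of <Integer, Integer> records.
        KStream<Integer, Integer> inputStream = builder.stream("int-topic");

        // map() receives each key-value pair and returns a new KeyValue:
        // the key is kept and the Integer value is converted to a String.
        KStream<Integer, String> outputStream =
                inputStream.map((key, value) -> KeyValue.pair(key, value.toString()));

        // foreach() is a terminal operation; here it prints every record.
        outputStream.foreach((key, value) -> System.out.println("key=" + key + ", value=" + value));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        // Close the Streams client cleanly when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}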

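In the provided example, we use the Kafka Streams API to read the records from the int-topic topic. We then use the map method to apply the toString method to each value while keeping the key unchanged. Finally, we use the foreach operation to print each record to the console. Because the key does not change here, mapValues would produce the same result and, unlike map, does not mark the stream for repartitioning before later key-based operations; map is used simply to demonstrate the method.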
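Since the contents of int-topic are not shown, the following sketch models the example's two steps (the map lambda, then the foreach output) over hypothetical sample records in plain Java, so you can see the exact line format the application prints. It does not connect to Kafka, and `MapExampleModel`, `toStringValue`, and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the example's map + foreach steps.
// It does not use Kafka; the sample records below are made up.
public class MapExampleModel {

    // Mirrors the lambda passed to map(): keep the key, convert the
    // Integer value to its String form.
    public static Map.Entry<Integer, String> toStringValue(Integer key, Integer value) {
        return Map.entry(key, value.toString());
    }

    // Mirrors the foreach() step, but collects the lines instead of
    // printing them so that they can be inspected.
    public static List<String> run(List<Map.Entry<Integer, Integer>> records) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<Integer, Integer> record : records) {
            Map.Entry<Integer, String> mapped = toStringValue(record.getKey(), record.getValue());
            lines.add("key=" + mapped.getKey() + ", value=" + mapped.getValue());
        }
        return lines;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, Integer>> sample = List.of(Map.entry(1, 100), Map.entry(2, 200));
        // Prints:
        // key=1, value=100
        // key=2, value=200
        run(sample).forEach(System.out::println);
    }
}
```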

Conclusion

This tutorial explored the fundamentals of the map transformation in Kafka Streams. We also demonstrated how to use it to convert a stream of integer values to strings.


Source: linuxhint.com
