By Arround The Web

Apache Kafka Streams Branch Transformations

In Kafka Streams, a branch or substream refers to a subset of data which is created from a single source stream based on some condition. Branches allow us to split a single stream into multiple substreams, each representing a section of the original data and matching a specific condition.

For example, suppose a stream contains product information. We can use branches to split it into two substreams based on price, such as one for products priced above a specified threshold and one for products priced at or below it.

As you might guess, Kafka Streams provides a transformation for exactly this operation.

In this tutorial, you will learn the basics of using the Kafka Streams branch transformation to split a stream into multiple substreams.

What Is a Kafka Streams Branch Transformation?

As the name suggests, the branch transformation allows us to create multiple substreams from a single stream source based on a given condition. Each branch holds a subset of the original data that matches the defined parameters.

Kafka Streams Branch Method

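We can use the branch() method of KStream to perform a branch transformation. Note that branch() has been deprecated since Kafka 2.8 in favor of split(), although it remains a simple way to illustrate the concept. The method definition is as follows: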

public KStream<K, V>[] branch(Predicate<? super K, ? super V>... predicates)

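The method takes a variable number of predicates. These are instances of the Kafka Streams Predicate interface (org.apache.kafka.streams.kstream.Predicate), not java.util.function.Predicate, and each one evaluates to true or false for a given record.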

The Predicate interface is a functional interface whose test() method takes a key of type K and a value of type V and returns a boolean that indicates whether the record matches the condition.
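To make the shape of a predicate concrete, here is a minimal plain-Java sketch. It uses java.util.function.BiPredicate as a stand-in for the Kafka Streams Predicate, since both expose a test(key, value) method that returns a boolean. The Product class and the threshold of 500 are assumptions for illustration and are not part of any Kafka API.

```java
import java.util.function.BiPredicate;

// Hypothetical value type used only for this illustration.
class Product {
    private final String name;
    private final double price;

    Product(String name, double price) {
        this.name = name;
        this.price = price;
    }

    String getName() { return name; }
    double getPrice() { return price; }
}

class PredicateSketch {
    // Stand-in for a Kafka Streams Predicate<String, Product>:
    // returns true when the product costs more than the assumed threshold of 500.
    static final BiPredicate<String, Product> IS_HIGH_PRICE =
        (key, product) -> product.getPrice() > 500;

    public static void main(String[] args) {
        Product laptop = new Product("Laptop", 1200.0);
        Product mouse = new Product("Mouse", 25.0);
        System.out.println(IS_HIGH_PRICE.test("p1", laptop)); // true
        System.out.println(IS_HIGH_PRICE.test("p2", mouse));  // false
    }
}
```

The key is ignored here because the condition depends only on the value, which is a common pattern when branching on record contents.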

The return value of the branch() method is an array of KStream<K, V>, where each element is a substream containing the records that matched the predicate at the same position in the argument list.

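When a new record arrives in the source stream, it is evaluated against the predicates in the order in which they are passed to branch(). The record is placed in the branch of the first predicate that returns true, and evaluation stops there. A record that matches none of the predicates is dropped.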
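The ordering rule is easier to see in a small simulation. The following plain-Java sketch is not the Kafka Streams API. It only imitates the routing behavior with lists, and the sample records, predicates, and class name are made up for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Plain-Java simulation of first-match routing; this is not the Kafka Streams API.
class BranchSemanticsSketch {

    // Returns one list per predicate, in predicate order.
    static <K, V> List<List<Map.Entry<K, V>>> route(
            List<Map.Entry<K, V>> records,
            List<BiPredicate<K, V>> predicates) {
        List<List<Map.Entry<K, V>>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            branches.add(new ArrayList<>());
        }
        for (Map.Entry<K, V> entry : records) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(entry.getKey(), entry.getValue())) {
                    branches.get(i).add(entry);
                    break; // first matching predicate wins
                }
            }
            // An entry that matched no predicate is not added to any branch.
        }
        return branches;
    }

    // Routes four sample prices and returns the keys that landed in each branch.
    static List<List<String>> demo() {
        List<Map.Entry<String, Integer>> records = new ArrayList<>();
        records.add(new SimpleEntry<>("laptop", 1200));
        records.add(new SimpleEntry<>("mouse", 25));
        records.add(new SimpleEntry<>("phone", 800));
        records.add(new SimpleEntry<>("cable", 5));

        List<BiPredicate<String, Integer>> predicates = new ArrayList<>();
        predicates.add((key, price) -> price > 1000);
        predicates.add((key, price) -> price > 500);

        List<List<String>> keys = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> branch : route(records, predicates)) {
            List<String> branchKeys = new ArrayList<>();
            for (Map.Entry<String, Integer> entry : branch) {
                branchKeys.add(entry.getKey());
            }
            keys.add(branchKeys);
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [[laptop], [phone]]
    }
}
```

The laptop satisfies both predicates but appears only in the first branch, while the mouse and the cable match neither predicate and are dropped. If every record must be kept, the last predicate should act as a catch-all.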

Example Application:

The following shows a simple Java application that splits the products into two substreams based on the product price:

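import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class ProductPriceSorter {
  public static void main(String[] args) {
    // Assumed settings: adjust the application ID and broker address for your environment
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-price-sorter");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    StreamsBuilder builder = new StreamsBuilder();
    // Product (with getPrice() and getName()) and ProductSerde are hypothetical
    // classes that are assumed to be defined elsewhere in the project
    KStream<String, Product> productsStream =
        builder.stream("products", Consumed.with(Serdes.String(), new ProductSerde()));

    // Create two substreams based on the product price
    KStream<String, Product>[] priceBranches = productsStream.branch(
        (key, product) -> product.getPrice() > 500,
        (key, product) -> product.getPrice() <= 500
    );

    // Process the high-price products substream
    priceBranches[0].mapValues(product -> "High-price product: " + product.getName())
        .to("high-price-products", Produced.with(Serdes.String(), Serdes.String()));

    // Process the low-price products substream
    priceBranches[1].mapValues(product -> "Low-price product: " + product.getName())
        .to("low-price-products", Produced.with(Serdes.String(), Serdes.String()));

    KafkaStreams streams = new KafkaStreams(builder.build(), config);
    streams.start();
  }
}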

In this example, we process each substream separately by mapping each product record to a string that indicates whether the product is high-priced or low-priced. We then write the resulting values to either the high-price-products topic or the low-price-products topic.
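Because branch() has been deprecated since Kafka 2.8, newer code would typically use split() instead. The following is a rough sketch of a comparable topology, assuming Kafka Streams 2.8 or later on the classpath. To stay self-contained it uses a plain Double price as the record value instead of a Product, and the input topic name product-prices, the branch names, and the class name are assumptions for illustration.

```java
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;

class SplitSketch {

    // Builds the topology only; no broker connection is needed for this step.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Assumed input: product name as the key, price as the value.
        KStream<String, Double> prices =
            builder.stream("product-prices", Consumed.with(Serdes.String(), Serdes.Double()));

        // The keys of the returned map are the Named prefix plus each Branched name.
        Map<String, KStream<String, Double>> branches = prices
            .split(Named.as("price-"))
            .branch((name, price) -> price > 500, Branched.as("high"))
            .defaultBranch(Branched.as("low")); // records that matched no predicate

        branches.get("price-high")
            .to("high-price-products", Produced.with(Serdes.String(), Serdes.Double()));
        branches.get("price-low")
            .to("low-price-products", Produced.with(Serdes.String(), Serdes.Double()));

        return builder.build();
    }
}
```

Compared with the array returned by branch(), the named map avoids positional indexing, and defaultBranch() makes the handling of unmatched records explicit instead of silently dropping them.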

Conclusion

We briefly explored how to use the Kafka Streams branch transformation to split the data from a source stream into multiple substreams based on a condition.

Source: linuxhint.com