By Arround The Web

Apache Kafka Streams Inverse Filter Transformations

Apache Kafka Streams provides various transformations that we can use to process and transform the data within a given source stream. One such transformation is the filter() transformation. It keeps only the records that satisfy a given condition and removes all the others from the stream.

However, in some cases, we may want to do the opposite: remove the records that satisfy the condition and keep the ones that do not. This is where an inverse filter transformation comes into play.

Kafka Streams has no transformation named inverseFilter() at the time of writing, so in this tutorial we will learn how to implement an inverse filter with the tools that the Kafka Streams DSL does provide.

What Is an Inverse Filter Transformation?

Let us start at the basics and explain an inverse filter transformation. We can define an inverse filter transformation as a Kafka stream transformation that returns a new stream which contains all the records from the input stream that do not match the given criteria.

Unlike a filter transformation, which keeps the records that match a given condition, the inverse filter does the exact opposite and keeps only the records that do not match.
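
To make the difference concrete, here is a minimal plain-Java sketch that uses no Kafka classes. It applies the same condition to a small list of values, once as a filter and once as an inverse filter. The class name, the method names, and the sample values are made up for illustration and are not part of any Kafka API.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class InverseFilterSketch {
    // Keeps the values that satisfy the condition (the behavior of a filter).
    public static List<Integer> filter(List<Integer> values, Predicate<Integer> condition) {
        return values.stream().filter(condition).collect(Collectors.toList());
    }

    // Keeps the values that do NOT satisfy the condition (the inverse filter).
    public static List<Integer> inverseFilter(List<Integer> values, Predicate<Integer> condition) {
        return values.stream().filter(condition.negate()).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> values = List.of(120, 500, 80, 999);
        Predicate<Integer> atLeast500 = v -> v >= 500;

        System.out.println(filter(values, atLeast500));        // [500, 999]
        System.out.println(inverseFilter(values, atLeast500)); // [120, 80]
    }
}
```

Every input value ends up in exactly one of the two results, which is why the two transformations are described as opposites.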

Inverse Filter Transform in Kafka

As mentioned, Kafka Streams has no method named inverseFilter(). Instead, the KStream interface provides the filterNot() method, which is the opposite of the filter() transformation.

The filterNot() transformation removes the records that match a given condition and leaves the ones that do not match, which is exactly the inverse filter transformation described above.

Kafka Streams filterNot() Method

The following shows the definition for the filterNot() method in Kafka Streams DSL:

public KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate)

The method takes a single argument, a Kafka Streams Predicate (org.apache.kafka.streams.kstream.Predicate, not java.util.function.Predicate), which defines the condition that is used to filter the records from the stream. The predicate takes two arguments: the record key of type K and the record value of type V.

The method then returns a new KStream instance which represents the filtered stream. The new stream contains all the records from the original stream that do not match the condition as specified in the input predicate.
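
Because the predicate receives both the key and the value, the condition can look at either one or at both. The following plain-Java sketch models that behavior without any Kafka dependency. KeyValuePredicate is a hypothetical stand-in with the same shape as the Kafka Streams Predicate, and a Map stands in for the stream purely for brevity (a real stream may carry several records with the same key).

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KeyValuePredicateSketch {
    // Hypothetical stand-in that mirrors the two-argument shape of Kafka Streams' Predicate<K, V>.
    @FunctionalInterface
    public interface KeyValuePredicate<K, V> {
        boolean test(K key, V value);
    }

    // Plain-Java model of filterNot(): drop every entry for which the predicate returns true.
    public static <K, V> Map<K, V> filterNot(Map<K, V> records,
                                             KeyValuePredicate<? super K, ? super V> predicate) {
        Map<K, V> kept = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : records.entrySet()) {
            if (!predicate.test(entry.getKey(), entry.getValue())) {
                kept.put(entry.getKey(), entry.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, Integer> records = new LinkedHashMap<>();
        records.put("test-1", 10);
        records.put("prod-1", 700);
        records.put("prod-2", 20);

        // The condition uses the key and the value together.
        System.out.println(filterNot(records,
                (key, value) -> key.startsWith("test-") || value >= 500));
        // prints {prod-2=20}
    }
}
```

The input map is left untouched and a new map is returned, which loosely mirrors how filterNot() returns a new KStream instead of modifying the original one.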

Example Application

Consider the following example source code which demonstrates a basic Kafka Stream application that uses the filterNot() method to perform an inverse filter transformation:

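import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

// Product (with a getPrice() method) and ProductSerde (a Serde<Product>) are
// application classes that are assumed to exist; they are not shown here.
public class ProductPriceInverseFilterApplication {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-price-inverse-filter");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();

        // Read the "products" topic with explicit key and value serdes so that
        // the stream is typed and product.getPrice() compiles.
        KStream<String, Product> productsStream =
                builder.stream("products", Consumed.with(Serdes.String(), new ProductSerde()));

        // Drop every record whose price is 500 or more and keep the rest.
        KStream<String, Product> inverseFilteredStream =
                productsStream.filterNot((productId, product) -> product.getPrice() >= 500);

        inverseFilteredStream.to("inverse-filtered-products", Produced.with(Serdes.String(), new ProductSerde()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Close the Streams client cleanly when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}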

In the previous example, we use the filterNot() method to apply an inverse filter on the productsStream. We pass a lambda expression to the filterNot() method that checks whether the product’s price is greater than or equal to 500. If it is, the record is excluded from the inverse filtered stream.

This means that the resulting inverseFilteredStream holds only the records from the productsStream with a price that is less than 500. We also write the resulting stream to a new topic called “inverse-filtered-products”.
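
The example does not show the Product class, so the following sketch assumes a simple one with a getPrice() method that returns a double. That shape is a guess made for illustration, not the original author's class. The sketch isolates the condition from the lambda and checks it around the 500 boundary without needing a running Kafka cluster.

```java
public class ProductPriceBoundarySketch {
    // Hypothetical value type; the Product class used in the example is not shown.
    public static final class Product {
        private final String name;
        private final double price;

        public Product(String name, double price) {
            this.name = name;
            this.price = price;
        }

        public String getName() { return name; }
        public double getPrice() { return price; }
    }

    // Same condition as the lambda passed to filterNot() in the example:
    // true means that the record is dropped from the output stream.
    public static boolean isExcluded(Product product) {
        return product.getPrice() >= 500;
    }

    public static void main(String[] args) {
        System.out.println(isExcluded(new Product("mouse", 499.99)));   // false: kept
        System.out.println(isExcluded(new Product("monitor", 500.00))); // true: dropped
        System.out.println(isExcluded(new Product("laptop", 1200.00))); // true: dropped
    }
}
```

A product priced at exactly 500 is dropped because the condition uses >= instead of >.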

Conclusion

We explored an inverse filter transform in Apache Kafka Streams, how it works, and how we can use it to exclude the results that match a given condition. We hope you enjoyed this tutorial. Feel free to explore the other Kafka Transformation articles to learn more.


Source: linuxhint.com
