Kafka Streams is a client library for building stream processing applications on top of Apache Kafka. In this guide, we will use it to stream data from one Kafka topic to another.
Upstash Kafka Setup
Create a Kafka cluster using the Upstash Console or Upstash CLI by following Getting Started.
Create two topics by following the creating topic steps. Name the first topic "input", since we are going to stream it to the second topic, which we can name "output".
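If you prefer to create the topics programmatically instead of from the console, here is a minimal sketch using Kafka's AdminClient. Note that this assumes you run it after the project setup below, since it needs the kafka-clients library that the kafka-streams dependency pulls in transitively; the UPSTASH-KAFKA-* placeholders stand in for your cluster credentials:

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Arrays;
import java.util.Optional;
import java.util.Properties;

public class CreateTopics {
    public static void main(String[] args) throws Exception {
        Properties adminProps = new Properties();
        adminProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "UPSTASH-KAFKA-ENDPOINT:9092");
        adminProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        adminProps.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        adminProps.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"UPSTASH-KAFKA-USERNAME\" password=\"UPSTASH-KAFKA-PASSWORD\";");

        try (AdminClient admin = AdminClient.create(adminProps)) {
            // Optional.empty() leaves partition count and replication factor to the broker defaults.
            admin.createTopics(Arrays.asList(
                    new NewTopic("input", Optional.empty(), Optional.empty()),
                    new NewTopic("output", Optional.empty(), Optional.empty()))).all().get();
        }
    }
}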
Project Setup
Install Maven on your machine by following the Maven Installation Guide. Run mvn --version in a terminal or command prompt to make sure Maven is installed. It should print the version of Maven you have:
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: D:\apache-maven-3.6.3\apache-maven\bin\..
Java version: 1.8.0_232, vendor: AdoptOpenJDK, runtime: C:\Program Files\AdoptOpenJDK\jdk-8.0.232.09-hotspot\jre
Default locale: en_US, platform encoding: Cp1250
OS name: "windows 10", version: "10.0", arch: "amd64", family: "windows"
To create the Maven project:
In your terminal or command prompt, go to the folder where you want to create the project by running cd <folder path>.
Run the following command:
mvn archetype:generate -DgroupId=com.kafkastreamsinteg.app -DartifactId=kafkastreamsinteg-app -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false
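This generates a kafkastreamsinteg-app folder containing a pom.xml and a src directory with an App class skeleton, which you can use as the entry point for the code below.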
Add Kafka Streams to the Project
Open the project folder in an IDE with Maven support, such as IntelliJ IDEA, Visual Studio Code, or Eclipse. Add the following dependencies to the dependencies tag in the pom.xml file:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.3.1</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-reload4j</artifactId>
    <version>2.0.3</version>
</dependency>
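The kafka-streams artifact pulls in the Kafka client libraries transitively, and slf4j-reload4j supplies a logging backend so that Kafka Streams' log output is actually visible.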
Streaming From One Topic to Another Topic
Import the following packages first:
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Pattern;
Define the names of the topics you are going to work on:
String inputTopic = "input";
String outputTopic = "output";
Create the following properties for Kafka Streams and replace the UPSTASH-KAFKA-* placeholders with your cluster information. The topicPrefix(...) entries apply retention and cleanup settings to the internal topics that Kafka Streams creates on the cluster:
final Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "UPSTASH-KAFKA-ENDPOINT:9092");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
props.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"myLastNewProject");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"UPSTASH-KAFKA-USERNAME\" password=\"UPSTASH-KAFKA-PASSWORD\";");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG), 604800000);
props.put(StreamsConfig.topicPrefix(TopicConfig.CLEANUP_POLICY_CONFIG), TopicConfig.CLEANUP_POLICY_DELETE);
props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_BYTES_CONFIG), 268435456);
Start the builder for streaming and assign input topic as the source:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream(inputTopic);
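As an aside, if you only needed to pipe records from the input topic to the output topic unchanged, the whole topology would be a single line; a minimal sketch, which the word-count pipeline below replaces:

// Forward every record from the input topic to the output topic as-is.
source.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));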
Apply the following steps to split each sentence sent to the input topic into words, count the words, and stream the results to the output topic:
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("countMapping");
materialized.withLoggingDisabled();
source.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word, Grouped.as("groupMapping"))
.count(materialized).toStream().mapValues(Object::toString)
.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
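For example, sending the sentence "hello world hello" to the input topic should eventually produce the records hello→2 and world→1 (counts serialized as strings) on the output topic.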
Since the groupBy operation causes a repartition and the creation of a new internal topic to store the intermediate groups, make sure there is enough partition capacity on your Upstash Kafka cluster. For detailed information about the maximum partition capacity of a Kafka cluster, check the plans.
Just to be sure, once you run your application and send data to the input topic, you can check the topic section on the console to verify that the internal repartition topic was created successfully. For reference, the naming convention for internal repartition topics is:
<application id>-<group name>-repartition
In this example, that resolves to myLastNewProject-groupMapping-repartition.
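You can also verify this programmatically; a minimal sketch using Kafka's AdminClient, reusing the adminProps connection properties from the topic-creation sketch above (ListTopicsOptions and Set need to be imported):

// List all topics on the cluster and confirm that
// myLastNewProject-groupMapping-repartition is among them.
try (AdminClient admin = AdminClient.create(adminProps)) {
    Set<String> topics = admin.listTopics(new ListTopicsOptions().listInternal(true)).names().get();
    System.out.println(topics);
}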
Next, finalize and build the streams builder to create the topology of the process. The topology can be inspected by printing it:
final Topology topology = builder.build();
System.out.println(topology.describe());
Here is the example topology for this scenario:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
      --> KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
      --> groupMapping
      <-- KSTREAM-SOURCE-0000000000
    Processor: groupMapping (stores: [])
      --> groupMapping-repartition-filter
      <-- KSTREAM-FLATMAPVALUES-0000000001
    Processor: groupMapping-repartition-filter (stores: [])
      --> groupMapping-repartition-sink
      <-- groupMapping
    Sink: groupMapping-repartition-sink (topic: groupMapping-repartition)
      <-- groupMapping-repartition-filter

   Sub-topology: 1
    Source: groupMapping-repartition-source (topics: [groupMapping-repartition])
      --> KSTREAM-AGGREGATE-0000000003
    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [countMapping])
      --> KTABLE-TOSTREAM-0000000007
      <-- groupMapping-repartition-source
    Processor: KTABLE-TOSTREAM-0000000007 (stores: [])
      --> KSTREAM-MAPVALUES-0000000008
      <-- KSTREAM-AGGREGATE-0000000003
    Processor: KSTREAM-MAPVALUES-0000000008 (stores: [])
      --> KSTREAM-SINK-0000000009
      <-- KTABLE-TOSTREAM-0000000007
    Sink: KSTREAM-SINK-0000000009 (topic: output)
      <-- KSTREAM-MAPVALUES-0000000008
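Note the two sub-topologies: sub-topology 0 splits the incoming sentences into words and sinks them to the groupMapping-repartition topic, while sub-topology 1 reads the repartitioned words back, aggregates the counts into the countMapping store, and writes the results to the output topic.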
Finally, build the KafkaStreams instance with the topology and the properties, register a shutdown hook so the application closes cleanly, and start the streams:
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);

// Close the streams client cleanly when the JVM shuts down (e.g. on Ctrl+C).
// Register the hook before starting, since latch.await() blocks below.
Runtime.getRuntime().addShutdownHook(new Thread("streams-word-count") {
    @Override
    public void run() {
        streams.close();
        latch.countDown();
    }
});

try {
    streams.start();
    System.out.println("streams started");
    latch.await();
} catch (final Throwable e) {
    System.exit(1);
}
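To test the application, produce a few sentences to the input topic and watch the word counts appear on the output topic. Here is a minimal, hypothetical producer sketch (TestProducer is not part of the generated project; the UPSTASH-KAFKA-* placeholders are the same cluster credentials as above):

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;

public class TestProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "UPSTASH-KAFKA-ENDPOINT:9092");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"UPSTASH-KAFKA-USERNAME\" password=\"UPSTASH-KAFKA-PASSWORD\";");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Send a test sentence; the streams application counts its words.
            producer.send(new ProducerRecord<>("input", "hello world hello")).get();
        }
    }
}

You can then inspect the output topic from the Upstash Console to see counts such as hello→2.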