Java and ksqlDB Example Using Synchronous Usage

Prateek
Apr 5, 2023


In this example, I will show you how to fetch data from ksqlDB in a Java application using the Java client's synchronous API. I will create a Kafka topic and a stream backed by that topic, which serves as the source of the data.

For more details, refer to the Java client documentation: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-clients/java-client/

Synchronous Usage

To consume records one-at-a-time in a synchronous fashion, use the poll() method on the query result object. If poll() is called with no arguments, it blocks until a new row becomes available or the query is terminated. You can also pass a Duration argument to poll(), which causes poll() to return null if no new rows are received by the time the duration has elapsed. For more information, see the API reference.
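The timeout behavior described above can be tried without a running server. The following is a minimal sketch, not the ksqlDB client itself: the class PollTimeoutSketch and its methods are hypothetical, and a java.util.concurrent.BlockingQueue stands in for the query result object. It only illustrates the pattern of polling with a Duration and treating null as "nothing arrived in time".

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollTimeoutSketch {

    // Hypothetical stand-in for a query result: returns the next row,
    // or null if no row arrives before the timeout elapses.
    static String poll(BlockingQueue<String> rows, Duration timeout) {
        try {
            return rows.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report "no row"
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Collects rows one at a time until a single poll times out.
    static List<String> drain(BlockingQueue<String> rows, Duration timeout) {
        List<String> received = new ArrayList<>();
        String row;
        while ((row = poll(rows, timeout)) != null) {
            received.add(row);
        }
        return received;
    }

    public static void main(String[] args) {
        // Assumed sample data, queued up front for the demonstration
        BlockingQueue<String> rows =
                new LinkedBlockingQueue<>(List.of("Deepa,Ind", "Prateek,USA"));
        for (String row : drain(rows, Duration.ofMillis(200))) {
            System.out.println("Received: " + row);
        }
        System.out.println("No new rows within the timeout, stopping.");
    }
}
```

With the real client, the same loop shape applies to poll(Duration) on the query result object. Choosing a timeout lets the program decide what to do when the stream is idle instead of blocking indefinitely.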

First, start the local Confluent Platform services:

confluent local services start
The local commands are intended for a single-node development environment only,
NOT for production usage. https://docs.confluent.io/current/cli/index.html

Using CONFLUENT_CURRENT: /var/folders/kn/4wr9__651l37hckxvnnwt4hh0000gn/T/confluent.677047
ZooKeeper is [UP]
Kafka is [UP]
Schema Registry is [UP]
Kafka REST is [UP]
Connect is [UP]
Starting ksqlDB Server
ksqlDB Server is [UP]
Starting Control Center
Control Center is [UP]

Create the Kafka topic:

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic USERS
Created topic USERS.

Then connect to the ksqlDB command line:

ksql                          
Java HotSpot(TM) 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

===========================================
=       _              _ ____  ____       =
=      | | _____  __ _| |  _ \| __ )      =
=      | |/ / __|/ _` | | | | |  _ \      =
=      |   <\__ \ (_| | | |_| | |_) |     =
=      |_|\_\___/\__, |_|____/|____/      =
=                   |_|                   =
=        The Database purpose-built       =
=        for stream processing apps       =
===========================================

Copyright 2017-2021 Confluent Inc.

CLI v7.0.1, Server v7.0.1 located at http://localhost:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> create stream users_stream (name VARCHAR, countrycode VARCHAR) WITH (KAFKA_TOPIC='USERS', VALUE_FORMAT='DELIMITED');

Message
----------------
Stream created
----------------
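With VALUE_FORMAT='DELIMITED', each record value is a comma-separated line whose fields map to the stream's columns by position. The sketch below illustrates that mapping in plain Java. The class DelimitedRowSketch is hypothetical and deliberately simplified: it ignores quoting and escaping, and it does not model how the server treats malformed records.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DelimitedRowSketch {

    // Column names in declaration order, as in the CREATE STREAM statement.
    // ksqlDB shows unquoted identifiers in upper case.
    static final List<String> COLUMNS = List.of("NAME", "COUNTRYCODE");

    // Splits one comma-separated value into columns by position.
    // Simplification: no quoting or escaping is handled here.
    static Map<String, String> parse(String value) {
        String[] fields = value.split(",", -1);
        if (fields.length != COLUMNS.size()) {
            throw new IllegalArgumentException(
                    "Expected " + COLUMNS.size() + " fields but got "
                            + fields.length + ": " + value);
        }
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i < fields.length; i++) {
            row.put(COLUMNS.get(i), fields[i]);
        }
        return row;
    }

    public static void main(String[] args) {
        System.out.println(parse("Deepa,Ind")); // prints {NAME=Deepa, COUNTRYCODE=Ind}
    }
}
```

This is why the records produced later in this example are plain lines such as Deepa,Ind: the first field becomes NAME and the second becomes COUNTRYCODE.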

Java code

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>ksql-java-learnings</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>io.confluent.ksql</groupId>
            <artifactId>ksqldb-api-client</artifactId>
            <version>7.0.1</version>
        </dependency>
        <dependency>
            <groupId>io.confluent.ksql</groupId>
            <artifactId>ksqldb-udf</artifactId>
            <version>7.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.7</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>2.0.7</version>
            <type>pom</type>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <repositories>
        <repository>
            <id>confluent</id>
            <name>confluent-repo</name>
            <!-- HTTPS: recent Maven versions block plain-HTTP repositories by default -->
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>
</project>

Here is the main Java class. Run it as the main application:

package com.example;

import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.StreamedQueryResult;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;

public class ExampleApp {

    // The ksqlDB CLI above reports the server at http://localhost:8088
    public static final String KSQLDB_SERVER_HOST = "localhost";
    public static final int KSQLDB_SERVER_HOST_PORT = 8088;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Read the stream from the beginning of the topic
        Map<String, Object> properties = Collections.singletonMap(
                "auto.offset.reset", "earliest"
        );

        ClientOptions options = ClientOptions.create()
                .setHost(KSQLDB_SERVER_HOST)
                .setPort(KSQLDB_SERVER_HOST_PORT)
                .setUseTls(false)
                .setUseAlpn(true);
        Client client = Client.create(options);

        // EMIT CHANGES makes this a push query: it keeps running and streams new rows
        String pushQuery = "select name, countrycode from USERS_STREAM emit changes;";

        // get() blocks until the server has started the query
        StreamedQueryResult streamedQueryResult = client.streamQuery(pushQuery, properties).get();

        for (int i = 0; i < 10; i++) {
            // Block until a new row is available
            Row row = streamedQueryResult.poll();
            if (row != null) {
                System.out.println("Received a row!");
                System.out.println("Row: " + row.values());
            } else {
                System.out.println("Query has ended.");
                break; // no more rows will arrive
            }
        }

        // Release the client's connections
        client.close();
    }
}

Push some data to the Kafka topic created above:

kafka-console-producer --bootstrap-server localhost:9092 --topic USERS
>Deepa,Ind
>Prateek,USA
>Bob,UK
>

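The rows should appear in your IDE console (Eclipse, STS, IntelliJ, or whichever IDE you use). Because the loop asks for 10 rows and poll() blocks, the program keeps waiting after printing these three rows until more records arrive or you stop it. You should also see the same rows when running the query in the ksqlDB CLI. The CLI reads from the latest offset by default, so start the query before producing the records, or run SET 'auto.offset.reset'='earliest'; first: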

ksql> select name, countrycode from USERS_STREAM emit changes;
+-----------------------------------------------------------------+-----------------------------------------------------------------+
|NAME |COUNTRYCODE |
+-----------------------------------------------------------------+-----------------------------------------------------------------+
|Deepa |Ind |
|Prateek |USA |
|Bob |UK |

Press CTRL-C to interrupt
