In this example, I will show you how to fetch data from KSQLDB using Java App using Synchronous Usage. I will use the Kafka topic and will create stream which will linked to this topic to get the source of data.
You can refer for more details: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-clients/java-client/
Synchronous Usage¶
To consume records one-at-a-time in a synchronous fashion, use the poll()
method on the query result object. If poll()
is called with no arguments, it blocks until a new row becomes available or the query is terminated. You can also pass a Duration
argument to poll()
, which causes poll()
to return null
if no new rows are received by the time the duration has elapsed. For more information, see the API reference.
Please start the confluent kafka
confluent local services start
The local commands are intended for a single-node development environment only,
NOT for production usage. https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /var/folders/kn/4wr9__651l37hckxvnnwt4hh0000gn/T/confluent.677047
ZooKeeper is [UP]
Kafka is [UP]
Schema Registry is [UP]
Kafka REST is [UP]
Connect is [UP]
Starting ksqlDB Server
ksqlDB Server is [UP]
Starting Control Center
Control Center is [UP]
prateekashtikar@Prateeks-MacBook-Pro ksqldb %
Create Kafka topic
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic USERS
Created topic USERS.
Then connect to KSQL command line —
ksql
Java HotSpot(TM) 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
===========================================
= _ _ ____ ____ =
= | | _____ __ _| | _ \| __ ) =
= | |/ / __|/ _` | | | | | _ \ =
= | <\__ \ (_| | | |_| | |_) | =
= |_|\_\___/\__, |_|____/|____/ =
= |_| =
= The Database purpose-built =
= for stream processing apps =
===========================================
Copyright 2017-2021 Confluent Inc.
CLI v7.0.1, Server v7.0.1 located at http://localhost:8088
Server Status: RUNNING
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql> create stream users_stream (name VARCHAR, countrycode VARCHAR) WITH (KAFKA_TOPIC='USERS', VALUE_FORMAT='DELIMITED');
Message
----------------
Stream created
----------------
Java code
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>ksql-java-learnings</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-api-client</artifactId>
<version>7.0.1</version>
</dependency>
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-udf</artifactId>
<version>7.0.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.7</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>2.0.7</version>
<type>pom</type>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>confluent</id>
<name>confluent-repo</name>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
</project>
Here is the main java code: Run the main app
package com.example;
import io.confluent.ksql.api.client.BatchedQueryResult;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.ExecuteStatementResult;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.StreamedQueryResult;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ExampleApp {
public static String KSQLDB_SERVER_HOST = "0.0.0.0";
public static int KSQLDB_SERVER_HOST_PORT = 8088;
public static void main(String[] args) throws ExecutionException, InterruptedException {
Map<String, Object> properties = Collections.singletonMap(
"auto.offset.reset", "earliest"
);
ClientOptions options = ClientOptions.create()
.setHost(KSQLDB_SERVER_HOST)
.setPort(KSQLDB_SERVER_HOST_PORT)
.setUseTls(false)
.setUseAlpn(true);
Client client = Client.create(options);
String pullQuery = "select name, countrycode from USERS_STREAM emit changes;";
StreamedQueryResult streamedQueryResult = client.streamQuery(pullQuery, properties).get();
for (int i = 0; i < 10; i++) {
// Block until a new row is available
Row row = streamedQueryResult.poll();
if (row != null) {
System.out.println("Received a row!");
System.out.println("Row: " + row.values());
} else {
System.out.println("Query has ended.");
}
}
}
}
Push some data to above kafka topic
kafka-console-producer --bootstrap-server localhost:9092 --topic USERS
>Deepa,Ind
>Prateek,USA
>Bob,UK
>
You should be able to see data to program eclipse/sts/intellij console (whatever IDE you used), also you should be able to see the output of
ksql> select name, countrycode from USERS_STREAM emit changes;
+-----------------------------------------------------------------+-----------------------------------------------------------------+
|NAME |COUNTRYCODE |
+-----------------------------------------------------------------+-----------------------------------------------------------------+
|Deepa |Ind |
|Prateek |USA |
|Bob |UK |
Press CTRL-C to interrupt