In this example, I will show you how to fetch data from ksqlDB in a Java app as a single batch, using executeQuery(). I will create a Kafka topic and a stream linked to that topic to serve as the data source.
For more details, see: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-clients/java-client/
Receive query results in a single batch (executeQuery())
The executeQuery() method enables client apps to receive query results as a single batch that's returned when the query completes.
This method is suitable for both pull queries and for terminating push queries, for example, queries that have a LIMIT clause. For non-terminating push queries, use the streamQuery() method instead.
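The choice between the two methods can be sketched as a small helper. This is only a naive keyword heuristic under the assumption that a push query contains EMIT CHANGES and a terminating one ends with a LIMIT clause; the class and method names (QueryKindSketch, classify, suitsSingleBatch) are hypothetical and not part of the ksqlDB client.

```java
import java.util.Locale;

/** Naive, hypothetical classifier for choosing between executeQuery() and streamQuery(). */
final class QueryKindSketch {

    enum Kind { PULL, TERMINATING_PUSH, NON_TERMINATING_PUSH }

    /** Classifies a statement by looking for the EMIT CHANGES and LIMIT keywords. */
    static Kind classify(String sql) {
        // Collapse whitespace and upper-case so the keyword checks stay simple.
        String normalized = sql.trim().replaceAll("\\s+", " ").toUpperCase(Locale.ROOT);
        boolean push = normalized.contains(" EMIT CHANGES");
        // Assumption: a terminating push query ends with "LIMIT <n>" and an optional semicolon.
        boolean limited = normalized.matches(".* LIMIT \\d+ ?;?$");
        if (!push) {
            return Kind.PULL;
        }
        return limited ? Kind.TERMINATING_PUSH : Kind.NON_TERMINATING_PUSH;
    }

    /** True when a single-batch call such as executeQuery() is appropriate. */
    static boolean suitsSingleBatch(String sql) {
        return classify(sql) != Kind.NON_TERMINATING_PUSH;
    }

    public static void main(String[] args) {
        String query = "select name, countrycode from USERS_STREAM emit changes LIMIT 4;";
        System.out.println(classify(query) + ", single batch ok: " + suitsSingleBatch(query));
    }
}
```

A real application would normally know the kind of each query up front; the helper only makes the rule from the paragraph above explicit.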
Query properties can be passed as an optional second argument. For more information, see the client API reference.
By default, push queries return only newly arriving rows. To start from the beginning of the stream or table, set the auto.offset.reset property to earliest.
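As a minimal sketch of the properties argument, the helper below builds the map with only the JDK. The class and method names (QueryPropertiesSketch, forOffset) are hypothetical; an empty map is assumed to leave the server defaults in place.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that builds the optional query properties map. */
final class QueryPropertiesSketch {

    /**
     * Returns properties that read from the beginning when fromBeginning is true,
     * or an empty map (server defaults, only newly arriving rows) otherwise.
     */
    static Map<String, Object> forOffset(boolean fromBeginning) {
        Map<String, Object> properties = new LinkedHashMap<>();
        if (fromBeginning) {
            properties.put("auto.offset.reset", "earliest");
        }
        return properties;
    }

    public static void main(String[] args) {
        // With the ksqlDB Java client on the classpath, the map would be passed as
        // the second argument: client.executeQuery(sql, properties)
        System.out.println("From beginning: " + forOffset(true));
        System.out.println("Defaults:       " + forOffset(false));
    }
}
```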
Start the local Confluent Platform services:
confluent local services start
The local commands are intended for a single-node development environment only,
NOT for production usage. https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /var/folders/kn/4wr9__651l37hckxvnnwt4hh0000gn/T/confluent.677047
ZooKeeper is [UP]
Kafka is [UP]
Schema Registry is [UP]
Kafka REST is [UP]
Connect is [UP]
Starting ksqlDB Server
ksqlDB Server is [UP]
Starting Control Center
Control Center is [UP]
Create Kafka topic
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic USERS
Created topic USERS.
Then connect to the ksqlDB command line:
ksql
Java HotSpot(TM) 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
===========================================
=       _              _ ____  ____       =
=      | | _____  __ _| |  _ \| __ )      =
=      | |/ / __|/ _` | | | | |  _ \      =
=      |   <\__ \ (_| | | |_| | |_) |     =
=      |_|\_\___/\__, |_|____/|____/      =
=                   |_|                   =
=        The Database purpose-built       =
=        for stream processing apps       =
===========================================
Copyright 2017-2021 Confluent Inc.
CLI v7.0.1, Server v7.0.1 located at http://localhost:8088
Server Status: RUNNING
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql> create stream users_stream (name VARCHAR, countrycode VARCHAR) WITH (KAFKA_TOPIC='USERS', VALUE_FORMAT='DELIMITED');
 Message
----------------
Stream created
----------------
Java code
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>ksql-java-learnings</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>io.confluent.ksql</groupId>
            <artifactId>ksqldb-api-client</artifactId>
            <version>7.0.1</version>
        </dependency>
        <dependency>
            <groupId>io.confluent.ksql</groupId>
            <artifactId>ksqldb-udf</artifactId>
            <version>7.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.7</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>2.0.7</version>
            <type>pom</type>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <repositories>
        <!-- HTTPS is used because Maven 3.8.1 and later block plain-HTTP repositories by default. -->
        <repository>
            <id>confluent</id>
            <name>confluent-repo</name>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>
</project>
Here is the main Java code. Run the main app:
package com.example;

import io.confluent.ksql.api.client.BatchedQueryResult;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.Row;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

public class ExampleApp {

    // Address of the local ksqlDB server started above.
    public static final String KSQLDB_SERVER_HOST = "localhost";
    public static final int KSQLDB_SERVER_HOST_PORT = 8088;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Read the stream from the beginning instead of only newly arriving rows.
        Map<String, Object> properties = Collections.singletonMap(
                "auto.offset.reset", "earliest"
        );

        ClientOptions options = ClientOptions.create()
                .setHost(KSQLDB_SERVER_HOST)
                .setPort(KSQLDB_SERVER_HOST_PORT)
                .setUseTls(false)
                .setUseAlpn(true);
        Client client = Client.create(options);

        try {
            // A push query with a LIMIT clause terminates, so it can be returned as one batch.
            String pushQuery = "select name, countrycode from USERS_STREAM emit changes LIMIT 4;";
            BatchedQueryResult batchedQueryResult = client.executeQuery(pushQuery, properties);

            // Blocks until the query completes, that is, until four rows have been received.
            List<Row> resultRows = batchedQueryResult.get();

            System.out.println("Received results. Num rows: " + resultRows.size());
            for (Row row : resultRows) {
                System.out.println("Row: " + row.values());
            }
        } finally {
            // Release the client's resources so the JVM can exit cleanly.
            client.close();
        }
    }
}
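Because the LIMIT 4 query completes only once four rows have arrived, a plain get() can block indefinitely on an empty topic. BatchedQueryResult extends CompletableFuture<List<Row>>, so a bounded wait is possible. The sketch below shows the pattern with plain JDK futures standing in for the query result; the class and method names (BoundedWaitSketch, getOrEmpty) are hypothetical, and returning an empty list on timeout is just one possible policy.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: wait for a batched result, but only for a bounded time. */
final class BoundedWaitSketch {

    /** Waits up to timeoutMillis for the batch and returns an empty list on timeout. */
    static <T> List<T> getOrEmpty(CompletableFuture<List<T>> result, long timeoutMillis) {
        try {
            return result.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The query has not produced its full batch yet; stop waiting.
            return List.of();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the query result", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Query failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // Stand-in for a query whose batch is already complete.
        CompletableFuture<List<String>> done =
                CompletableFuture.completedFuture(List.of("Deepa,Ind", "Prateek,USA"));
        // Stand-in for a LIMIT query that is still waiting for rows.
        CompletableFuture<List<String>> pending = new CompletableFuture<>();

        System.out.println("Completed: " + getOrEmpty(done, 100));
        System.out.println("Pending:   " + getOrEmpty(pending, 100));
    }
}
```

In the app above, the same helper could presumably be called as getOrEmpty(batchedQueryResult, 10_000) in place of batchedQueryResult.get().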
Push some data to the topic:
kafka-console-producer --bootstrap-server localhost:9092 --topic USERS
>Deepa,Ind
>Prateek,USA
>Bob,UK
>Bill,PO
>
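Each line typed into the producer becomes one record value, and the DELIMITED format maps its comma-separated fields onto the stream's columns in order (name, then countrycode). The sketch below mimics that mapping with a naive split; the User type is hypothetical, and quoting or escaping rules of the real format are deliberately ignored.

```java
import java.util.List;

/** Naive illustration of how a DELIMITED value maps onto the stream's two columns. */
final class DelimitedUserSketch {

    /** Hypothetical value type mirroring the columns (name VARCHAR, countrycode VARCHAR). */
    static final class User {
        final String name;
        final String countryCode;

        User(String name, String countryCode) {
            this.name = name;
            this.countryCode = countryCode;
        }

        @Override
        public String toString() {
            return "User{name=" + name + ", countryCode=" + countryCode + "}";
        }
    }

    /** Splits one record value on commas; fields are assigned in column order. */
    static User parse(String line) {
        String[] fields = line.split(",", -1);
        if (fields.length != 2) {
            throw new IllegalArgumentException(
                    "Expected 2 fields but got " + fields.length + ": " + line);
        }
        return new User(fields[0], fields[1]);
    }

    public static void main(String[] args) {
        // The same four values that are pushed to the USERS topic.
        for (String line : List.of("Deepa,Ind", "Prateek,USA", "Bob,UK", "Bill,PO")) {
            System.out.println(parse(line));
        }
    }
}
```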
The query result from the ksqlDB console:
ksql> select name, countrycode from USERS_STREAM emit changes;
+-----------------------------------------------------------------+-----------------------------------------------------------------+
|NAME |COUNTRYCODE |
+-----------------------------------------------------------------+-----------------------------------------------------------------+
|Deepa |Ind |
|Prateek |USA |
|Bob |UK |
|Bill |PO |
Press CTRL-C to interrupt