Java & KSQLDB — Create and manage new streams, tables, and persistent queries (executeStatement())

Prateek
4 min readApr 6, 2023

--

Starting with ksqlDB 0.11.0, the executeStatement() method enables client apps to:

  • Create new ksqlDB streams and tables
  • Drop existing ksqlDB streams and tables
  • Create new persistent queries, i.e., CREATE ... AS SELECT and INSERT INTO ... SELECT statements
  • Pause, Resume, and Terminate persistent queries

To use this method, pass in the SQL for the command to be executed. Query properties can be passed as an optional second argument. For more information, see the client API reference.

As explained in the Javadocs for the method above, the CompletableFuture returned by the executeStatement() method is completed as soon as the ksqlDB server has accepted the statement and a response is received by the client. In most situations, the ksqlDB server will have already executed the statement by this time, but this is not guaranteed.

For statements that create new persistent queries, the query ID may be retrieved from the returned ExecuteStatementResult, as long as the ksqlDB server version is at least 0.11.0 and the statement has been executed by the time the server returns its response.

List streams, tables, topics, and queries

Starting with ksqlDB 0.11.0, the Java client for ksqlDB supports the following admin operations:

  • Listing ksqlDB streams, by using the listStreams() method
  • Listing ksqlDB tables, by using the listTables() method
  • Listing Kafka topics available for use with ksqlDB, by using the listTopics() method
  • Listing running ksqlDB queries, with the listQueries() method

Describe specific streams and tables

Starting with ksqlDB 0.12.0, the describeSource() method enables client apps to fetch metadata for existing ksqlDB streams and tables. The metadata returned from this method includes the stream or table's underlying topic name, column names and associated types, serialization formats, queries that read and write from the stream or table, and more. For more details, see the API reference.

Get metadata about the ksqlDB cluster

Starting with ksqlDB 0.16.0, the serverInfo() method enables client apps to fetch metadata about the ksqlDB cluster. The metadata returned from this method includes the version of ksqlDB the server is running, the Kafka cluster id and the ksqlDB service id. For more details, see the API reference.

Please make sure you’ve created the required topic before running the code:

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic USERPROFILE
Created topic USERPROFILE.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>ksql-java-learnings</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Keep versions as properties to allow easy modification.
             NOTE: the ksqlDB client artifacts are versioned with the Confluent
             Platform scheme (7.x), not the standalone ksqlDB scheme (0.x);
             7.0.1 matches the versions the dependencies below previously
             hardcoded. -->
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <ksqldb.version>7.0.1</ksqldb.version>
        <slf4j.version>2.0.7</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.confluent.ksql</groupId>
            <artifactId>ksqldb-api-client</artifactId>
            <!-- Was hardcoded to 7.0.1 while the ksqldb.version property was
                 declared but unused; now driven by the property. -->
            <version>${ksqldb.version}</version>
        </dependency>
        <dependency>
            <groupId>io.confluent.ksql</groupId>
            <artifactId>ksqldb-udf</artifactId>
            <version>${ksqldb.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <!-- SLF4J->log4j binding. Fixed: <type>pom</type> would not put the
                 binding jar on the classpath, and test scope kept it off the
                 runtime classpath when running the example's main(). In SLF4J
                 2.x this artifact is a relocation to slf4j-reload4j. -->
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>confluent</id>
            <name>confluent-repo</name>
            <!-- https: Maven 3.8+ blocks plain-http repositories by default. -->
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>
</project>

ExampleApp.java

package com.example;

import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.QueryInfo;
import io.confluent.ksql.api.client.ServerInfo;
import io.confluent.ksql.api.client.SourceDescription;
import io.confluent.ksql.api.client.StreamInfo;
import io.confluent.ksql.api.client.TableInfo;
import io.confluent.ksql.api.client.TopicInfo;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

/**
 * Demonstrates the ksqlDB Java client admin API: creating/dropping sources
 * with {@code executeStatement()}, listing streams/tables/topics/queries,
 * describing a source, and fetching server metadata.
 *
 * <p>Requires a reachable ksqlDB server and an existing {@code USERPROFILE}
 * Kafka topic (see the kafka-topics command below).
 */
public class ExampleApp {

    /** Host of the target ksqlDB server. */
    public static String KSQLDB_SERVER_HOST = "0.0.0.0";
    /** Port of the target ksqlDB server (default REST listener). */
    public static int KSQLDB_SERVER_HOST_PORT = 8088;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Statement properties: read topics from the beginning.
        Map<String, Object> properties = Collections.singletonMap(
                "auto.offset.reset", "earliest"
        );

        // NOTE: ALPN is a TLS extension, so setUseAlpn(true) is invalid when
        // TLS is disabled — the original setUseAlpn(true) call was removed.
        ClientOptions options = ClientOptions.create()
                .setHost(KSQLDB_SERVER_HOST)
                .setPort(KSQLDB_SERVER_HOST_PORT)
                .setUseTls(false);
        Client client = Client.create(options);

        try {
            // Please make sure to create the topic before running this code:
            // kafka-topics --create --bootstrap-server localhost:9092 \
            //   --replication-factor 1 --partitions 1 --topic USERPROFILE

            // IF NOT EXISTS makes the example safely re-runnable.
            String sql = "CREATE STREAM IF NOT EXISTS userprofile "
                    + "(userid INT, firstname VARCHAR, lastname VARCHAR, countrycode VARCHAR, rating DOUBLE) "
                    + "WITH (VALUE_FORMAT = 'JSON', KAFKA_TOPIC = 'USERPROFILE');";

            // Pass the properties map (it was previously built but never used).
            client.executeStatement(sql, properties).get();

            // IF EXISTS avoids a server error when the USERS table is absent.
            client.executeStatement("DROP TABLE IF EXISTS USERS;").get();

            System.out.println("------------ List Streams -----------------------");
            List<StreamInfo> streams = client.listStreams().get();
            for (StreamInfo stream : streams) {
                System.out.println(stream.getName()
                        + " " + stream.getTopic()
                        + " " + stream.getKeyFormat()
                        + " " + stream.getValueFormat()
                        + " " + stream.isWindowed()
                );
            }

            System.out.println("------------ List Topics -----------------------");
            List<TopicInfo> topics = client.listTopics().get();
            for (TopicInfo topic : topics) {
                System.out.println(topic.getName()
                        + " " + topic.getPartitions()
                        + " " + topic.getReplicasPerPartition()
                );
            }

            System.out.println("------------ List Tables -----------------------");
            List<TableInfo> tables = client.listTables().get();
            for (TableInfo table : tables) {
                System.out.println(table.getName()
                        + " " + table.getTopic()
                        + " " + table.getKeyFormat()
                        + " " + table.getValueFormat()
                        + " " + table.isWindowed()
                );
            }

            // Fixed header: this section lists queries, not tables.
            System.out.println("------------ List Queries ----------------------");
            List<QueryInfo> queries = client.listQueries().get();
            for (QueryInfo query : queries) {
                System.out.println(query.getQueryType() + " " + query.getId());
                // Only persistent queries have a sink stream/table and topic.
                if (query.getQueryType() == QueryInfo.QueryType.PERSISTENT) {
                    System.out.println(query.getSink().get() + " " + query.getSinkTopic().get());
                }
            }

            // Describe specific streams and tables
            System.out.println("---- Describe specific streams and tables ----");
            SourceDescription description = client.describeSource("USERPROFILE").get();
            System.out.println("This source is a " + description.type());
            System.out.println("This stream/table has " + description.fields().size() + " columns.");
            System.out.println(description.writeQueries().size() + " queries write to this stream/table.");
            System.out.println(description.readQueries().size() + " queries read from this stream/table.");

            // Get metadata about the ksqlDB cluster
            System.out.println("\n----Get metadata about the ksqlDB cluster ----");
            ServerInfo serverInfo = client.serverInfo().get();
            System.out.println("The ksqlDB version running on this server is " + serverInfo.getServerVersion());
            System.out.println("The Kafka cluster this server is using is " + serverInfo.getKafkaClusterId());
            System.out.println("The id of this ksqlDB service is " + serverInfo.getKsqlServiceId());
        } finally {
            // Release the client's underlying network resources; the original
            // leaked them, which can keep the JVM alive after main() returns.
            client.close();
        }
    }
}

--

--