Java and KSQLDB — KTable demo

Prateek
2 min readApr 6, 2023

--

Create Topic

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic COUNTRY-CSV

Create Table

ksql> CREATE TABLE COUNTRYTABLE  (countrycode VARCHAR PRIMARY KEY, countryname VARCHAR) WITH (KAFKA_TOPIC='COUNTRY-CSV', VALUE_FORMAT='DELIMITED');

Message
---------------
Table created
---------------

ksql> show tables;

Table Name | Kafka Topic | Key Format | Value Format | Windowed
-------------------------------------------------------------------
COUNTRYTABLE | COUNTRY-CSV | KAFKA | DELIMITED | false
-------------------------------------------------------------------

ksql> describe COUNTRYTABLE;

Name : COUNTRYTABLE
Field | Type
----------------------------------------------
COUNTRYCODE | VARCHAR(STRING) (primary key)
COUNTRYNAME | VARCHAR(STRING)
----------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;


ksql> describe COUNTRYTABLE extended;

Name : COUNTRYTABLE
Type : TABLE
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : DELIMITED
Kafka topic : COUNTRY-CSV (partitions: 1, replication: 1)
Statement : CREATE TABLE COUNTRYTABLE (COUNTRYCODE STRING PRIMARY KEY, COUNTRYNAME STRING) WITH (KAFKA_TOPIC='COUNTRY-CSV', KEY_FORMAT='KAFKA', VALUE_FORMAT='DELIMITED');

Field | Type
----------------------------------------------
COUNTRYCODE | VARCHAR(STRING) (primary key)
COUNTRYNAME | VARCHAR(STRING)
----------------------------------------------

Local runtime statistics
------------------------
(Statistics of the local KSQL server interaction with the Kafka topic COUNTRY-CSV)
ksql>

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>ksql-java-learnings</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<!-- Keep versions as properties to allow easy modification -->
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<ksqldb.version>0.28.2</ksqldb.version>
</properties>

<dependencies>

<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-api-client</artifactId>
<version>7.0.1</version>
</dependency>
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-udf</artifactId>
<version>7.0.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.7</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>2.0.7</version>
<type>pom</type>
<scope>test</scope>
</dependency>
</dependencies>

<repositories>
<repository>
<id>confluent</id>
<name>confluent-repo</name>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
</project>

CountryData.java

package com.example;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class CountryData {
private String countryCode;
private String countryName;
}

MainApp.java — run the main App

package com.example;

import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.StreamedQueryResult;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;

public class ExampleApp {

public static String KSQLDB_SERVER_HOST = "0.0.0.0";
public static int KSQLDB_SERVER_HOST_PORT = 8088;

public static void main(String[] args) throws ExecutionException, InterruptedException {
Map<String, Object> properties = Collections.singletonMap(
"auto.offset.reset", "earliest"
);

ClientOptions options = ClientOptions.create()
.setHost(KSQLDB_SERVER_HOST)
.setPort(KSQLDB_SERVER_HOST_PORT)
.setUseTls(false)
.setUseAlpn(true);
Client client = Client.create(options);

System.out.println("-------------------------------------------");
String sql = "select countrycode, countryname from countrytable emit changes;";

StreamedQueryResult sqr = client.streamQuery(sql, properties).get();

for (int i = 0; i < 10; i++) {
// Block until a new row is available
Row r = sqr.poll();
if (r != null) {
CountryData countryData = CountryData.builder()
.countryCode(r.getString("COUNTRYCODE"))
.countryName(r.getString("COUNTRYNAME"))
.build();
System.out.println("## Row: " + countryData);
} else {
System.out.println("Query has ended.");
}
}
}
}

Select case in KSQL

ksql> select countrycode, countryname from countrytable emit changes;
+----------------------------------------------------------------+----------------------------------------------------------------+
|COUNTRYCODE |COUNTRYNAME |
+----------------------------------------------------------------+----------------------------------------------------------------+
|AU |Australia |
|IN |India |
|GB |UK |
|US |USA |

Produce some data —

afka-console-producer --bootstrap-server localhost:9092 --topic COUNTRY-CSV --property "parse.key=true"  --property "key.separator=:"
>AU:Australia
>IN:India
>GB:UK
>US:USA
>

--

--

Prateek
Prateek

Written by Prateek

Java Developer and enthusiast

No responses yet