Create Topic
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic COUNTRY-CSV
Create Table
ksql> CREATE TABLE COUNTRYTABLE (countrycode VARCHAR PRIMARY KEY, countryname VARCHAR) WITH (KAFKA_TOPIC='COUNTRY-CSV', VALUE_FORMAT='DELIMITED');
Message
---------------
Table created
---------------
ksql> show tables;
Table Name | Kafka Topic | Key Format | Value Format | Windowed
-------------------------------------------------------------------
COUNTRYTABLE | COUNTRY-CSV | KAFKA | DELIMITED | false
-------------------------------------------------------------------
ksql> describe COUNTRYTABLE;
Name : COUNTRYTABLE
Field | Type
----------------------------------------------
COUNTRYCODE | VARCHAR(STRING) (primary key)
COUNTRYNAME | VARCHAR(STRING)
----------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
ksql> describe COUNTRYTABLE extended;
Name : COUNTRYTABLE
Type : TABLE
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : DELIMITED
Kafka topic : COUNTRY-CSV (partitions: 1, replication: 1)
Statement : CREATE TABLE COUNTRYTABLE (COUNTRYCODE STRING PRIMARY KEY, COUNTRYNAME STRING) WITH (KAFKA_TOPIC='COUNTRY-CSV', KEY_FORMAT='KAFKA', VALUE_FORMAT='DELIMITED');
Field | Type
----------------------------------------------
COUNTRYCODE | VARCHAR(STRING) (primary key)
COUNTRYNAME | VARCHAR(STRING)
----------------------------------------------
Local runtime statistics
------------------------
(Statistics of the local KSQL server interaction with the Kafka topic COUNTRY-CSV)
ksql>
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>ksql-java-learnings</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- Keep versions as properties to allow easy modification -->
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<ksqldb.version>0.28.2</ksqldb.version>
</properties>
<dependencies>
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-api-client</artifactId>
<version>7.0.1</version>
</dependency>
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-udf</artifactId>
<version>7.0.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.7</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>2.0.7</version>
<type>pom</type>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>confluent</id>
<name>confluent-repo</name>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
</project>
CountryData.java
package com.example;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class CountryData {
private String countryCode;
private String countryName;
}
MainApp.java — run the main App
package com.example;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.StreamedQueryResult;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
public class ExampleApp {
public static String KSQLDB_SERVER_HOST = "0.0.0.0";
public static int KSQLDB_SERVER_HOST_PORT = 8088;
public static void main(String[] args) throws ExecutionException, InterruptedException {
Map<String, Object> properties = Collections.singletonMap(
"auto.offset.reset", "earliest"
);
ClientOptions options = ClientOptions.create()
.setHost(KSQLDB_SERVER_HOST)
.setPort(KSQLDB_SERVER_HOST_PORT)
.setUseTls(false)
.setUseAlpn(true);
Client client = Client.create(options);
System.out.println("-------------------------------------------");
String sql = "select countrycode, countryname from countrytable emit changes;";
StreamedQueryResult sqr = client.streamQuery(sql, properties).get();
for (int i = 0; i < 10; i++) {
// Block until a new row is available
Row r = sqr.poll();
if (r != null) {
CountryData countryData = CountryData.builder()
.countryCode(r.getString("COUNTRYCODE"))
.countryName(r.getString("COUNTRYNAME"))
.build();
System.out.println("## Row: " + countryData);
} else {
System.out.println("Query has ended.");
}
}
}
}
Select case in KSQL
ksql> select countrycode, countryname from countrytable emit changes;
+----------------------------------------------------------------+----------------------------------------------------------------+
|COUNTRYCODE |COUNTRYNAME |
+----------------------------------------------------------------+----------------------------------------------------------------+
|AU |Australia |
|IN |India |
|GB |UK |
|US |USA |
Produce some data —
afka-console-producer --bootstrap-server localhost:9092 --topic COUNTRY-CSV --property "parse.key=true" --property "key.separator=:"
>AU:Australia
>IN:India
>GB:UK
>US:USA
>