In this example, we’ll see how to create Json stream and get the data.
Create Topic
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic USERPROFILE
Created topic USERPROFILE.
Create Stream
ksql> CREATE STREAM userprofile (userid INT, firstname VARCHAR, lastname VARCHAR, countrycode VARCHAR, rating DOUBLE) WITH (VALUE_FORMAT = 'JSON', KAFKA_TOPIC = 'USERPROFILE');
Message
----------------
Stream created
----------------
ksql> list streams;
Stream Name | Kafka Topic | Key Format | Value Format | Windowed
------------------------------------------------------------------------------------------
KSQL_PROCESSING_LOG | default_ksql_processing_log | KAFKA | JSON | false
USERPROFILE | USERPROFILE | KAFKA | JSON | false
USERS_STREAM | USERS | KAFKA | DELIMITED | false
------------------------------------------------------------------------------------------
ksql>
ksql> describe USERPROFILE;
Name : USERPROFILE
Field | Type
-------------------------------
USERID | INTEGER
FIRSTNAME | VARCHAR(STRING)
LASTNAME | VARCHAR(STRING)
COUNTRYCODE | VARCHAR(STRING)
RATING | DOUBLE
-------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
ksql> describe USERPROFILE extended;
Name : USERPROFILE
Type : STREAM
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : JSON
Kafka topic : USERPROFILE (partitions: 1, replication: 1)
Statement : CREATE STREAM USERPROFILE (USERID INTEGER, FIRSTNAME STRING, LASTNAME STRING, COUNTRYCODE STRING, RATING DOUBLE) WITH (KAFKA_TOPIC='USERPROFILE', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');
Field | Type
-------------------------------
USERID | INTEGER
FIRSTNAME | VARCHAR(STRING)
LASTNAME | VARCHAR(STRING)
COUNTRYCODE | VARCHAR(STRING)
RATING | DOUBLE
-------------------------------
Local runtime statistics
------------------------
(Statistics of the local KSQL server interaction with the Kafka topic USERPROFILE)
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>ksql-java-learnings</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- Keep versions as properties to allow easy modification -->
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<ksqldb.version>0.28.2</ksqldb.version>
</properties>
<dependencies>
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-api-client</artifactId>
<version>7.0.1</version>
</dependency>
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-udf</artifactId>
<version>7.0.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.7</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>2.0.7</version>
<type>pom</type>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>confluent</id>
<name>confluent-repo</name>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
</project>
UserProfile.java
package com.example;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class UserProfile {
private String firstName;
private String lastName;
private String countryCode;
private Double rating;
}
MainApp.java
package com.example;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.StreamedQueryResult;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
public class ExampleApp {
public static String KSQLDB_SERVER_HOST = "0.0.0.0";
public static int KSQLDB_SERVER_HOST_PORT = 8088;
public static void main(String[] args) throws ExecutionException, InterruptedException {
Map<String, Object> properties = Collections.singletonMap(
"auto.offset.reset", "earliest"
);
ClientOptions options = ClientOptions.create()
.setHost(KSQLDB_SERVER_HOST)
.setPort(KSQLDB_SERVER_HOST_PORT)
.setUseTls(false)
.setUseAlpn(true);
Client client = Client.create(options);
String sql = "select firstname, lastname, countrycode, rating from USERPROFILE emit changes;";
StreamedQueryResult streamedQueryResult = client.streamQuery(sql, properties).get();
for (int i = 0; i < 10; i++) {
// Block until a new row is available
Row row = streamedQueryResult.poll();
if (row != null) {
UserProfile userProfile = UserProfile.builder()
.firstName(row.getString("FIRSTNAME"))
.lastName(row.getString("LASTNAME"))
.countryCode(row.getString("COUNTRYCODE"))
.rating(row.getDouble("RATING"))
.build();
System.out.println("Row: " + userProfile.toString());
} else {
System.out.println("Query has ended.");
}
}
}
}
Push some data through console-producer
kafka-console-producer --bootstrap-server localhost:9092 --topic USERPROFILE
>{"userid": 1001, "firstname":"Bob", "lastname":"Smith", "countrycode":"US", "rating":4.2}
>{"userid": 1000, "firstname":"Alison", "lastname":"Smith", "countrycode":"GB", "rating":4.7}
>
See Data on KSQL console
ksql> select firstname, lastname, countrycode, rating from USERPROFILE emit changes;
+-------------------------------+-------------------------------+-------------------------------+-------------------------------+
|FIRSTNAME |LASTNAME |COUNTRYCODE |RATING |
+-------------------------------+-------------------------------+-------------------------------+-------------------------------+
|Bob |Smith |US |4.2 |
|Alison |Smith |GB |4.7 |