In this post I'll build on the scenario from my earlier article: https://prateek-ashtikar512.medium.com/java-ksqldb-create-stream-with-json-7a22ccbecafa
Link for reference — https://docs.ksqldb.io/en/latest/reference/#scalar-functions
I assume you've already created the USERPROFILE topic and a stream on top of it. If not, the steps are:
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic USERPROFILE
Created topic USERPROFILE.
kafka-console-producer --bootstrap-server localhost:9092 --topic USERPROFILE
>{"userid": 1001, "firstname":"Bob", "lastname":"Smith", "countrycode":"US", "rating":4.2}
>{"userid": 1000, "firstname":"Alison", "lastname":"Smith", "countrycode":"GB", "rating":4.7}
>
-- The stream has to match the JSON records produced to USERPROFILE and expose the columns the queries below read.
ksql> create stream userprofile (userid INT, firstname VARCHAR, lastname VARCHAR, countrycode VARCHAR, rating DOUBLE) WITH (KAFKA_TOPIC='USERPROFILE', VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
UserData.java
package com.example;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Plain value holder with two string fields.
 *
 * <p>Accessors, {@code equals}/{@code hashCode}/{@code toString}, both constructors and the
 * builder are generated by the Lombok annotations below.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class UserData {

    /** Creation time, kept as text. */
    private String createTime;

    /** Full name, kept as text. */
    private String fullName;
}
ExampleApp.java
package com.example;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.StreamedQueryResult;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
/**
 * Runs a push query against {@code userprofile} through the ksqlDB Java client and prints
 * up to {@link #MAX_ROWS} rows as {@link UserData} objects.
 */
public class ExampleApp {

    /** Host the client connects to. */
    public static String KSQLDB_SERVER_HOST = "0.0.0.0";

    /** Port the client connects to. */
    public static int KSQLDB_SERVER_HOST_PORT = 8088;

    /** Upper bound on the number of rows read before the program stops. */
    private static final int MAX_ROWS = 10;

    /**
     * Entry point: opens a client, streams the query and prints the received rows.
     *
     * @param args unused
     * @throws ExecutionException   if the query request fails
     * @throws InterruptedException if the thread is interrupted while waiting for the query to start
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Read the topic from the beginning so rows produced before the query started are included.
        Map<String, Object> properties = Collections.singletonMap(
                "auto.offset.reset", "earliest"
        );

        ClientOptions options = ClientOptions.create()
                .setHost(KSQLDB_SERVER_HOST)
                .setPort(KSQLDB_SERVER_HOST_PORT)
                .setUseTls(false)
                .setUseAlpn(true);

        Client client = Client.create(options);
        try {
            String sql = "select TIMESTAMPTOSTRING(rowtime, 'dd/MMM HH:mm') as createtime, firstname + ' ' + ucase(lastname) as full_name from userprofile emit changes;";
            StreamedQueryResult streamedQueryResult = client.streamQuery(sql, properties).get();

            for (int i = 0; i < MAX_ROWS; i++) {
                // Block until a new row is available
                Row row = streamedQueryResult.poll();
                if (row == null) {
                    // No more rows will arrive; stop instead of polling (and reporting) again.
                    System.out.println("Query has ended.");
                    break;
                }

                UserData userData = UserData.builder()
                        .createTime(row.getString("CREATETIME"))
                        .fullName(row.getString("FULL_NAME"))
                        .build();
                System.out.println("Row: " + userData);
            }
        } finally {
            // Release the client's connections whether or not the query succeeded.
            client.close();
        }
    }
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>ksql-java-learnings</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Keep versions as properties to allow easy modification -->
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <!-- Single source of truth for both ksqlDB artifacts below -->
        <ksqldb.version>7.0.1</ksqldb.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.confluent.ksql</groupId>
            <artifactId>ksqldb-api-client</artifactId>
            <version>${ksqldb.version}</version>
        </dependency>
        <dependency>
            <groupId>io.confluent.ksql</groupId>
            <artifactId>ksqldb-udf</artifactId>
            <version>${ksqldb.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.7</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
            <!-- Lombok is only needed by the compiler, not at runtime -->
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>2.0.7</version>
            <type>pom</type>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>confluent</id>
            <name>confluent-repo</name>
            <!-- HTTPS is required: Maven 3.8.1+ blocks plain-http repositories by default -->
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>
</project>
ksql> select TIMESTAMPTOSTRING(rowtime, 'dd/MMM HH:mm') as createtime, firstname + ' ' + ucase(lastname) as full_name from userprofile emit changes;
+-----------------------------------------------------------+-----------------------------------------------------------+
|CREATETIME |FULL_NAME |
+-----------------------------------------------------------+-----------------------------------------------------------+
|14/Sep 12:12 |Alison SMITH |
|14/Sep 12:13 |Bob SMITH |
— — — — — — — — — — — — — — — — — — — —
Streams from streams and functions
With the setup above in place, you can simply run the main app below and see the difference.
select firstname + ' '
+ ucase( lastname)
+ ' from ' + countrycode
+ ' has a rating of ' + cast(rating as varchar) + ' stars. '
+ case when rating < 2.5 then 'Poor'
when rating between 2.5 and 4.2 then 'Good'
else 'Excellent'
end as description
from userprofile emit changes;
ExampleApp.java
package com.example;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.StreamedQueryResult;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
/**
 * Runs a push query that builds a textual description for each {@code userprofile} record
 * and prints up to {@link #MAX_ROWS} of the resulting rows.
 */
public class ExampleApp {

    /** Host the client connects to. */
    public static String KSQLDB_SERVER_HOST = "0.0.0.0";

    /** Port the client connects to. */
    public static int KSQLDB_SERVER_HOST_PORT = 8088;

    /** Upper bound on the number of rows read before the program stops. */
    private static final int MAX_ROWS = 10;

    /**
     * Entry point: opens a client, streams the query and prints each row's DESCRIPTION column.
     *
     * @param args unused
     * @throws ExecutionException   if the query request fails
     * @throws InterruptedException if the thread is interrupted while waiting for the query to start
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Read the topic from the beginning so rows produced before the query started are included.
        Map<String, Object> properties = Collections.singletonMap(
                "auto.offset.reset", "earliest"
        );

        ClientOptions options = ClientOptions.create()
                .setHost(KSQLDB_SERVER_HOST)
                .setPort(KSQLDB_SERVER_HOST_PORT)
                .setUseTls(false)
                .setUseAlpn(true);

        Client client = Client.create(options);
        try {
            System.out.println("-------------------------------------------");
            // Concatenates name, country and rating, then appends a label chosen by the CASE expression.
            String streamFromStreamAndFunction = "select firstname + ' ' " +
                    "+ ucase( lastname) " +
                    "+ ' from ' + countrycode " +
                    "+ ' has a rating of ' + cast(rating as varchar) + ' stars. ' " +
                    "+ case when rating < 2.5 then 'Poor'" +
                    "       when rating between 2.5 and 4.2 then 'Good'" +
                    "       else 'Excellent' " +
                    "   end as description " +
                    "from userprofile emit changes;";
            StreamedQueryResult sqr = client.streamQuery(streamFromStreamAndFunction, properties).get();

            for (int i = 0; i < MAX_ROWS; i++) {
                // Block until a new row is available
                Row r = sqr.poll();
                if (r == null) {
                    // No more rows will arrive; stop instead of polling (and reporting) again.
                    System.out.println("Query has ended.");
                    break;
                }
                System.out.println("## Row: " + r.getString("DESCRIPTION"));
            }
        } finally {
            // Release the client's connections whether or not the query succeeded.
            client.close();
        }
    }
}