In this example, we'll learn how to insert a new row into a stream with insertInto(). Client apps can insert a new row of data into an existing ksqlDB stream by using the insertInto() method. To insert multiple rows in a streaming fashion, see streamInserts() instead.
Rows for insertion are represented as KsqlObject instances. A KsqlObject represents a map of strings (in this case, column names) to values (column values).
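To make the "map of column names to values" idea concrete, here is a minimal plain-Java sketch. It does not use the ksqlDB client: RowSketch, userRow, and missingColumns are hypothetical names, and a LinkedHashMap stands in for KsqlObject purely for illustration. The column names are the ones used by the example app later in this post.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the idea behind KsqlObject (NOT the client class itself).
class RowSketch {

    // Builds a row as an ordered map of column name -> column value.
    static Map<String, Object> userRow(String name, String countryCode) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("name", name);
        row.put("countrycode", countryCode);
        return row;
    }

    // Returns the expected columns that the row does not contain,
    // which helps catch a misspelled column name before sending anything.
    static List<String> missingColumns(Map<String, Object> row, List<String> expected) {
        List<String> missing = new ArrayList<>();
        for (String column : expected) {
            if (!row.containsKey(column)) {
                missing.add(column);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, Object> row = userRow("Arik", "SL");
        System.out.println(row);
        System.out.println(missingColumns(row, List.of("name", "countrycode")));
    }
}
```

With the real client, the same row is built by chaining put() calls on a KsqlObject, as the example app does.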
Example Usage
Here's an example of using the client to insert a new row into an existing stream USERS_STREAM with schema (NAME VARCHAR, COUNTRYCODE VARCHAR).
Start the local Confluent Platform services
confluent local services start
The local commands are intended for a single-node development environment only,
NOT for production usage. https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /var/folders/kn/4wr9__651l37hckxvnnwt4hh0000gn/T/confluent.866346
Starting ZooKeeper
ZooKeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting ksqlDB Server
ksqlDB Server is [UP]
Starting Control Center
Control Center is [UP]
Create Topic
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic USERS
Created topic USERS.
Create console producer
kafka-console-producer --bootstrap-server localhost:9092 --topic USERS
>
Create Stream
ksql
Java HotSpot(TM) 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Copyright 2017-2021 Confluent Inc.
CLI v7.0.1, Server v7.0.1 located at http://localhost:8088
Server Status: RUNNING
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql> show streams;
Stream Name | Kafka Topic | Key Format | Value Format | Windowed
------------------------------------------------------------------------------------------
KSQL_PROCESSING_LOG | default_ksql_processing_log | KAFKA | JSON | false
------------------------------------------------------------------------------------------
ksql>
ksql>
ksql> show topics;
Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------------------
default_ksql_processing_log | 1 | 1
---------------------------------------------------------------
ksql>
ksql> create stream users_stream (name VARCHAR, countrycode VARCHAR) WITH (KAFKA_TOPIC='USERS', VALUE_FORMAT='DELIMITED');
Message
----------------
Stream created
----------------
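Because USERS_STREAM is declared with VALUE_FORMAT='DELIMITED', each record value on the USERS topic is a single line of comma-separated values in column order (name, then countrycode). A line typed at the console producer prompt would therefore look like Arik,SL. The sketch below builds such a line. DelimitedLine and toDelimited are hypothetical names, and the sketch assumes the default comma delimiter and simply rejects values that would need quoting.

```java
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper that renders column values as one DELIMITED record line.
class DelimitedLine {

    // Joins the values with commas, in the order the columns were declared.
    // Assumption: values containing a comma, double quote, or newline are rejected
    // rather than quoted, to keep the sketch small.
    static String toDelimited(List<String> values) {
        StringJoiner joiner = new StringJoiner(",");
        for (String value : values) {
            if (value.contains(",") || value.contains("\"") || value.contains("\n")) {
                throw new IllegalArgumentException("value needs quoting: " + value);
            }
            joiner.add(value);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // name first, then countrycode, matching the CREATE STREAM statement
        System.out.println(toDelimited(List.of("Arik", "SL")));
    }
}
```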
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>ksql-java-learnings</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <!-- Keep versions as properties to allow easy modification -->
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <!-- Matches the server version reported by the ksql CLI (v7.0.1) -->
        <ksqldb.version>7.0.1</ksqldb.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>io.confluent.ksql</groupId>
            <artifactId>ksqldb-api-client</artifactId>
            <version>${ksqldb.version}</version>
        </dependency>
        <dependency>
            <groupId>io.confluent.ksql</groupId>
            <artifactId>ksqldb-udf</artifactId>
            <version>${ksqldb.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.7</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>2.0.7</version>
            <type>pom</type>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <repositories>
        <repository>
            <id>confluent</id>
            <name>confluent-repo</name>
            <!-- HTTPS, because Maven 3.8.1+ blocks plain-HTTP repositories by default -->
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>
</project>
Run the main app
package com.example;

import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.StreamedQueryResult;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;

public class ExampleApp {

    // Address of the local ksqlDB server (the CLI reports http://localhost:8088)
    public static final String KSQLDB_SERVER_HOST = "localhost";
    public static final int KSQLDB_SERVER_HOST_PORT = 8088;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Read the stream from the beginning so previously written rows are returned too
        Map<String, Object> properties = Collections.singletonMap(
                "auto.offset.reset", "earliest"
        );

        ClientOptions options = ClientOptions.create()
                .setHost(KSQLDB_SERVER_HOST)
                .setPort(KSQLDB_SERVER_HOST_PORT)
                .setUseTls(false)
                .setUseAlpn(true);
        Client client = Client.create(options);

        try {
            // Build the row: column names mapped to column values
            KsqlObject row = new KsqlObject()
                    .put("name", "Arik")
                    .put("countrycode", "SL");

            // insertInto() is asynchronous; get() blocks until the insert completes or fails
            client.insertInto("USERS_STREAM", row).get();
            System.out.println("-------------------------------");

            // EMIT CHANGES makes this a push query, so it does not end on its own
            String pushQuery = "select name, countrycode from USERS_STREAM emit changes;";
            StreamedQueryResult streamedQueryResult = client.streamQuery(pushQuery, properties).get();

            for (int i = 0; i < 10; i++) {
                // Wait up to 5 seconds for the next row instead of blocking indefinitely
                Row rr = streamedQueryResult.poll(Duration.ofSeconds(5));
                if (rr == null) {
                    System.out.println("No more rows arrived within the timeout.");
                    break;
                }
                System.out.println("Received a row!");
                System.out.println("Row: " + rr.values());
            }
        } finally {
            // Close the client to release its resources
            client.close();
        }
    }
}
Back in the ksqlDB CLI, query the stream to confirm that the row was inserted.
ksql> select name, countrycode from users_stream emit changes;
+----------------------------------------------------------------+----------------------------------------------------------------+
|NAME |COUNTRYCODE |
+----------------------------------------------------------------+----------------------------------------------------------------+
|Arik |SL |
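The insertInto() call returns a CompletableFuture, so a failed insert surfaces as an ExecutionException when get() is called. The sketch below shows one way to wait with a timeout and turn each outcome into a readable message. It uses only the JDK: InsertResult and await are hypothetical names, and the futures in main are stand-ins for what the client would return, not real inserts.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for waiting on an insert-style future.
class InsertResult {

    // Waits up to timeoutMillis and describes the outcome as a short message.
    static String await(CompletableFuture<Void> insert, long timeoutMillis) {
        try {
            insert.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "ok";
        } catch (ExecutionException e) {
            // The real failure is wrapped; report the underlying cause
            return "insert failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "insert timed out after " + timeoutMillis + " ms";
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it
            Thread.currentThread().interrupt();
            return "interrupted while waiting";
        }
    }

    public static void main(String[] args) {
        // Stand-in futures; with the client this would be client.insertInto(...)
        CompletableFuture<Void> succeeded = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> failed =
                CompletableFuture.failedFuture(new IllegalStateException("stream not found"));
        CompletableFuture<Void> pending = new CompletableFuture<>();

        System.out.println(await(succeeded, 100));
        System.out.println(await(failed, 100));
        System.out.println(await(pending, 100));
    }
}
```

In the example app, the same pattern would wrap the future returned by client.insertInto("USERS_STREAM", row) instead of calling get() with no timeout.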