ksqlDB Tutorial — Quickstart

Prateek
4 min readApr 10, 2023

--

Photo by Sandro Gonzalez on Unsplash

I tried to create a detailed notes looking at the : https://www.youtube.com/watch?v=RPdyTEwn27w.

Please use below setting to see all data from beginning.

SET 'auto.offset.reset'='earliest';

Create ORDERS Topic and ORDERS Streams

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1  --topic ORDERS

ksql> CREATE STREAM ORDERS (order_id VARCHAR, product_name VARCHAR, number INT, total_price DOUBLE)
WITH (VALUE_FORMAT='JSON',PARTITIONS=1,KAFKA_TOPIC='ORDERS');


ksql> list streams;

Stream Name | Kafka Topic | Key Format | Value Format | Windowed
------------------------------------------------------------------------------------------
KSQL_PROCESSING_LOG | default_ksql_processing_log | KAFKA | JSON | false
ORDERS | ORDERS | KAFKA | JSON | false
------------------------------------------------------------------------------------------
ksql>


ksql> list topics;

Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------------------
ORDERS | 1 | 1
default_ksql_processing_log | 1 | 1
---------------------------------------------------------------
ksql>

Insert few records directly into stream using kSQL

ksql> INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('1','book',3,57.66);
ksql> INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('2','book',1,13.50);
ksql> INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('3','laptop',3,1000.99);
ksql>

Check the data on console consumer

kafka-console-consumer --topic ORDERS --bootstrap-server localhost:9092 --from-beginning
{"ORDER_ID":"1","PRODUCT_NAME":"book","NUMBER":3,"TOTAL_PRICE":57.66}
{"ORDER_ID":"2","PRODUCT_NAME":"book","NUMBER":1,"TOTAL_PRICE":13.5}
{"ORDER_ID":"3","PRODUCT_NAME":"laptop","NUMBER":3,"TOTAL_PRICE":1000.99}

Check the SQL query

ksql> SELECT * FROM ORDERS EMIT CHANGES;
+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|ORDER_ID |PRODUCT_NAME |NUMBER |TOTAL_PRICE |
+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|1 |book |3 |57.66 |
|2 |book |1 |13.5 |
|3 |laptop |3 |1000.99 |

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —

Aggregate and Store Into Table

Create “PRODUCT_TOTAL_PRICE” Streams

CREATE TABLE PRODUCT_TOTAL_PRICE AS SELECT product_name, SUM(total_price) AS sum_total_price 
FROM ORDERS GROUP BY product_name;

ksql> list tables;

Table Name | Kafka Topic | Key Format | Value Format | Windowed
----------------------------------------------------------------------------------
PRODUCT_TOTAL_PRICE | PRODUCT_TOTAL_PRICE | KAFKA | JSON | false
----------------------------------------------------------------------------------

Insert Some Data

ksql> INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('1','book',3,57.66);
>INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('2','book',2,13.50);
>INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('3','laptop',1,1000.99);
>INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('4','laptop',2,2000.00);
>INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('5','book',6,110.99);
>INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('6','headset',1,99.99);
>INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('7','laptop',1,500.99);
>INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('8','book',2,20.17);
>INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('9','headset',5,400.14);
>INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('10','book',1,5.99);
>INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('11','book',5,55.00);
>INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('12','book',2,15.00);
ksql>

Kafka Console Consumer for PRODUCT_TOTAL_PRICE

kafka-console-consumer --topic PRODUCT_TOTAL_PRICE --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator="-"
laptop-{"SUM_TOTAL_PRICE":3501.9799999999996}
headset-{"SUM_TOTAL_PRICE":500.13}
book-{"SUM_TOTAL_PRICE":278.31}

KSQL Query:

ksql> select * from PRODUCT_TOTAL_PRICE emit changes;
+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|PRODUCT_NAME |SUM_TOTAL_PRICE |
+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|laptop |3501.9799999999996 |
|headset |500.13 |
|book |278.31 |

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — -

Inner Join of two streams

Create Streams PAYMENTS and ORDERS_AND_PAYMENTS

ksql> CREATE STREAM PAYMENTS (order_id VARCHAR, payment_type VARCHAR) 
WITH (KAFKA_TOPIC='PAYMENTS', VALUE_FORMAT='json', partitions=1);
CREATE STREAM ORDERS_AND_PAYMENTS AS SELECT * FROM ORDERS INNER JOIN PAYMENTS 
>WITHIN 7 DAYS
>ON ORDERS.order_id = PAYMENTS.order_id;
ksql> show topics;

Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------------------
ORDERS | 1 | 1
ORDERS_AND_PAYMENTS | 1 | 1
PAYMENTS | 1 | 1
PRODUCT_TOTAL_PRICE | 1 | 1
default_ksql_processing_log | 1 | 1
---------------------------------------------------------------

Insert Two different Data sets ORDERS and PAYMENTS.

INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('1','book',3,57.66);
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('2','book',2,13.50);
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('3','laptop',1,1000.99);
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('4','laptop',2,2000.00);
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('5','book',6,110.99);
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('6','headset',1,99.99);
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('7','laptop',1,500.99);
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('8','book',2,20.17);
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('9','headset',5,400.14);
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('10','book',1,5.99);
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('11','book',5,55.00);
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('12','book',2,15.00);

INSERT INTO PAYMENTS (order_id, payment_type) VALUES ('1', 'Paypal');
INSERT INTO PAYMENTS (order_id, payment_type) VALUES ('3', 'Mastercard');
INSERT INTO PAYMENTS (order_id, payment_type) VALUES ('2', 'Mastercard');

The console output on ORDERS_AND_PAYMENTS

kafka-console-consumer --topic ORDERS_AND_PAYMENTS --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator="-"
1-{"ORDERS_PRODUCT_NAME":"book","ORDERS_NUMBER":3,"ORDERS_TOTAL_PRICE":57.66,"PAYMENTS_ORDER_ID":"1","PAYMENTS_PAYMENT_TYPE":"Paypal"}
3-{"ORDERS_PRODUCT_NAME":"laptop","ORDERS_NUMBER":1,"ORDERS_TOTAL_PRICE":1000.99,"PAYMENTS_ORDER_ID":"3","PAYMENTS_PAYMENT_TYPE":"Mastercard"}
2-{"ORDERS_PRODUCT_NAME":"book","ORDERS_NUMBER":2,"ORDERS_TOTAL_PRICE":13.5,"PAYMENTS_ORDER_ID":"2","PAYMENTS_PAYMENT_TYPE":"Mastercard"}

Perform select query on KSQLDB

ksql> SET 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.

ksql> select * from ORDERS_AND_PAYMENTS emit changes;
+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|ORDERS_ORDER_ID |ORDERS_PRODUCT_NAME |ORDERS_NUMBER |ORDERS_TOTAL_PRICE |PAYMENTS_ORDER_ID |PAYMENTS_PAYMENT_TYPE |
+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|1 |book |3 |57.66 |1 |Paypal |
|3 |laptop |1 |1000.99 |3 |Mastercard |
|2 |book |2 |13.5 |2 |Mastercard |

--

--

Prateek
Prateek

Written by Prateek

Java Developer and enthusiast

No responses yet