These are detailed notes I put together while following this video: https://www.youtube.com/watch?v=RPdyTEwn27w
Use the setting below so that queries read all data from the beginning of each topic.
SET 'auto.offset.reset'='earliest';
Create the ORDERS topic and the ORDERS stream
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic ORDERS
ksql> CREATE STREAM ORDERS (order_id VARCHAR, product_name VARCHAR, number INT, total_price DOUBLE)
WITH (VALUE_FORMAT='JSON',PARTITIONS=1,KAFKA_TOPIC='ORDERS');
ksql> list streams;
Stream Name | Kafka Topic | Key Format | Value Format | Windowed
------------------------------------------------------------------------------------------
KSQL_PROCESSING_LOG | default_ksql_processing_log | KAFKA | JSON | false
ORDERS | ORDERS | KAFKA | JSON | false
------------------------------------------------------------------------------------------
ksql>
ksql> list topics;
Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------------------
ORDERS | 1 | 1
default_ksql_processing_log | 1 | 1
---------------------------------------------------------------
ksql>
Insert a few records directly into the stream using ksqlDB
ksql> INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('1','book',3,57.66);
ksql> INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('2','book',1,13.50);
ksql> INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('3','laptop',3,1000.99);
ksql>
Check the data with the console consumer
kafka-console-consumer --topic ORDERS --bootstrap-server localhost:9092 --from-beginning
{"ORDER_ID":"1","PRODUCT_NAME":"book","NUMBER":3,"TOTAL_PRICE":57.66}
{"ORDER_ID":"2","PRODUCT_NAME":"book","NUMBER":1,"TOTAL_PRICE":13.5}
{"ORDER_ID":"3","PRODUCT_NAME":"laptop","NUMBER":3,"TOTAL_PRICE":1000.99}
Check the data with a ksqlDB query
ksql> SELECT * FROM ORDERS EMIT CHANGES;
+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|ORDER_ID |PRODUCT_NAME |NUMBER |TOTAL_PRICE |
+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|1 |book |3 |57.66 |
|2 |book |1 |13.5 |
|3 |laptop |3 |1000.99 |
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
Aggregate and Store Into Table
Create the “PRODUCT_TOTAL_PRICE” table
CREATE TABLE PRODUCT_TOTAL_PRICE AS SELECT product_name, SUM(total_price) AS sum_total_price
FROM ORDERS GROUP BY product_name;
ksql> list tables;
Table Name | Kafka Topic | Key Format | Value Format | Windowed
----------------------------------------------------------------------------------
PRODUCT_TOTAL_PRICE | PRODUCT_TOTAL_PRICE | KAFKA | JSON | false
----------------------------------------------------------------------------------
Insert Some Data
ksql> INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('1','book',3,57.66);
>INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('2','book',2,13.50);
>INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('3','laptop',1,1000.99);
>INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('4','laptop',2,2000.00);
>INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('5','book',6,110.99);
>INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('6','headset',1,99.99);
>INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('7','laptop',1,500.99);
>INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('8','book',2,20.17);
>INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('9','headset',5,400.14);
>INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('10','book',1,5.99);
>INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('11','book',5,55.00);
>INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('12','book',2,15.00);
ksql>
Kafka Console Consumer for PRODUCT_TOTAL_PRICE
kafka-console-consumer --topic PRODUCT_TOTAL_PRICE --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator="-"
laptop-{"SUM_TOTAL_PRICE":3501.9799999999996}
headset-{"SUM_TOTAL_PRICE":500.13}
book-{"SUM_TOTAL_PRICE":278.31}
KSQL Query:
ksql> select * from PRODUCT_TOTAL_PRICE emit changes;
+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|PRODUCT_NAME |SUM_TOTAL_PRICE |
+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|laptop |3501.9799999999996 |
|headset |500.13 |
|book |278.31 |
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — -
Inner Join of two streams
Create the PAYMENTS and ORDERS_AND_PAYMENTS streams
ksql> CREATE STREAM PAYMENTS (order_id VARCHAR, payment_type VARCHAR)
WITH (KAFKA_TOPIC='PAYMENTS', VALUE_FORMAT='json', partitions=1);
CREATE STREAM ORDERS_AND_PAYMENTS AS SELECT * FROM ORDERS INNER JOIN PAYMENTS
>WITHIN 7 DAYS
>ON ORDERS.order_id = PAYMENTS.order_id;
ksql> show topics;
Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------------------
ORDERS | 1 | 1
ORDERS_AND_PAYMENTS | 1 | 1
PAYMENTS | 1 | 1
PRODUCT_TOTAL_PRICE | 1 | 1
default_ksql_processing_log | 1 | 1
---------------------------------------------------------------
Insert two different data sets, one into ORDERS and one into PAYMENTS.
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('1','book',3,57.66);
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('2','book',2,13.50);
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('3','laptop',1,1000.99);
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('4','laptop',2,2000.00);
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('5','book',6,110.99);
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('6','headset',1,99.99);
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('7','laptop',1,500.99);
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('8','book',2,20.17);
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('9','headset',5,400.14);
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('10','book',1,5.99);
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('11','book',5,55.00);
INSERT INTO ORDERS (order_id,product_name,number,total_price) VALUES ('12','book',2,15.00);
INSERT INTO PAYMENTS (order_id, payment_type) VALUES ('1', 'Paypal');
INSERT INTO PAYMENTS (order_id, payment_type) VALUES ('3', 'Mastercard');
INSERT INTO PAYMENTS (order_id, payment_type) VALUES ('2', 'Mastercard');
Console consumer output for ORDERS_AND_PAYMENTS
kafka-console-consumer --topic ORDERS_AND_PAYMENTS --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator="-"
1-{"ORDERS_PRODUCT_NAME":"book","ORDERS_NUMBER":3,"ORDERS_TOTAL_PRICE":57.66,"PAYMENTS_ORDER_ID":"1","PAYMENTS_PAYMENT_TYPE":"Paypal"}
3-{"ORDERS_PRODUCT_NAME":"laptop","ORDERS_NUMBER":1,"ORDERS_TOTAL_PRICE":1000.99,"PAYMENTS_ORDER_ID":"3","PAYMENTS_PAYMENT_TYPE":"Mastercard"}
2-{"ORDERS_PRODUCT_NAME":"book","ORDERS_NUMBER":2,"ORDERS_TOTAL_PRICE":13.5,"PAYMENTS_ORDER_ID":"2","PAYMENTS_PAYMENT_TYPE":"Mastercard"}
Run a SELECT query in ksqlDB
ksql> SET 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> select * from ORDERS_AND_PAYMENTS emit changes;
+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|ORDERS_ORDER_ID |ORDERS_PRODUCT_NAME |ORDERS_NUMBER |ORDERS_TOTAL_PRICE |PAYMENTS_ORDER_ID |PAYMENTS_PAYMENT_TYPE |
+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|1 |book |3 |57.66 |1 |Paypal |
|3 |laptop |1 |1000.99 |3 |Mastercard |
|2 |book |2 |13.5 |2 |Mastercard |