This tutorial demonstrates a simple workflow using ksqlDB to write streaming
queries against messages in Kafka.
To get started, you must start a Kafka cluster, including ZooKeeper and a Kafka broker.
ksqlDB then queries messages from this Kafka cluster. ksqlDB is included
with Confluent Platform by default.
Launch the ksqlDB CLI
Open a new terminal window and run the following command to set the LOG_DIR
environment variable and launch the ksqlDB CLI.
LOG_DIR=./ksql_logs $CONFLUENT_HOME/bin/ksql
This command routes the CLI logs to the ./ksql_logs directory, relative to
your current directory. By default, the CLI looks for a ksqlDB Server running
at http://localhost:8088.
Important
By default, ksqlDB attempts to store its logs in a directory named logs
that is relative to the location of the ksql executable. For example, if
ksql is installed at /usr/local/bin/ksql, it attempts to store its logs in
/usr/local/logs. If you run ksql from the default Confluent Platform
location, $CONFLUENT_HOME/bin, you must override this default behavior by
using the LOG_DIR variable.
After ksqlDB starts, your terminal should resemble the following:
===========================================
= _ _ ____ ____ =
= | | _____ __ _| | _ \| __ ) =
= | |/ / __|/ _` | | | | | _ \ =
= | <\__ \ (_| | | |_| | |_) | =
= |_|\_\___/\__, |_|____/|____/ =
= |_| =
= Event Streaming Database purpose-built =
= for stream processing apps =
===========================================
Copyright 2017-2020 Confluent Inc.
CLI v6.1.0, Server v6.1.0 located at http://localhost:8088
Server Status: RUNNING
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
Inspect Kafka Topics By Using SHOW and PRINT Statements
ksqlDB enables inspecting Kafka topics and messages in real time.
- Use the SHOW TOPICS statement to list the available topics in the Kafka cluster.
- Use the PRINT statement to see a topic’s messages as they arrive.
In the ksqlDB CLI, run the following statement:
SHOW TOPICS;
Your output should resemble:
Kafka Topic | Partitions | Partition Replicas
--------------------------------------------------------------
default_ksql_processing_log | 1 | 1
pageviews | 1 | 1
users | 1 | 1
--------------------------------------------------------------
By default, ksqlDB hides internal and system topics. Use the SHOW ALL TOPICS
statement to see the full list of topics in the Kafka cluster:
SHOW ALL TOPICS;
Your output should resemble:
Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------------------------------------------------------------------------------------
_confluent-command | 1 | 1
_confluent-controlcenter-... | 2 | 1
...
_confluent-ksql-default__command_topic | 1 | 1
_confluent-license | 1 | 1
_confluent-metrics | 12 | 1
_confluent-monitoring | 2 | 1
_confluent-telemetry-metrics | 12 | 1
_confluent_balancer_api_state | 1 | 1
_confluent_balancer_broker_samples | 32 | 1
_confluent_balancer_partition_samples | 32 | 1
_schemas | 1 | 1
connect-configs | 1 | 1
connect-offsets | 25 | 1
connect-statuses | 5 | 1
default_ksql_processing_log | 1 | 1
pageviews | 1 | 1
users | 1 | 1
---------------------------------------------------------------------------------------------------------------------------------
Note
Your output should show numerous _confluent-controlcenter topics, which
have been removed here for clarity.
Inspect the users topic by using the PRINT statement:
PRINT users;
Note
The PRINT statement is one of the few case-sensitive commands in ksqlDB,
even when the topic name is not quoted.
Your output should resemble:
Key format: KAFKA_STRING
Value format: KAFKA_STRING
rowtime: 2021/01/27 18:06:22.057 Z, key: User_6, value: 1505521983750,User_6,Region_4,OTHER
rowtime: 2021/01/27 18:06:23.057 Z, key: User_8, value: 1518180723778,User_8,Region_1,OTHER
rowtime: 2021/01/27 18:06:24.057 Z, key: User_8, value: 1517994971847,User_8,Region_5,MALE
^CTopic printing ceased
Press CTRL+C to stop printing messages.
Inspect the pageviews topic by using the PRINT statement:
PRINT pageviews;
Your output should resemble:
Key format: KAFKA_BIGINT or KAFKA_DOUBLE
Value format: KAFKA_STRING
rowtime: 2021/01/27 18:08:50.961 Z, key: 1611770930961, value: 1611770930961,User_4,Page_32
rowtime: 2021/01/27 18:08:51.161 Z, key: 1611770931161, value: 1611770931161,User_5,Page_81
rowtime: 2021/01/27 18:08:51.361 Z, key: 1611770931361, value: 1611770931361,User_1,Page_74
^CTopic printing ceased
Press CTRL+C to stop printing messages.
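The values that PRINT shows for this topic are plain comma-delimited strings (KAFKA_STRING). As a rough Python sketch of how one of those values breaks into fields (the field names viewtime, userid, and pageid are assumptions based on the stream definitions used later in this tutorial):

```python
# Parse a raw pageviews value as shown by PRINT. The value is a plain
# comma-delimited string; the field names used here are assumptions
# based on the pageviews schema used later in this tutorial.
def parse_pageview(value: str) -> dict:
    viewtime, userid, pageid = value.split(",")
    return {"viewtime": int(viewtime), "userid": userid, "pageid": pageid}

row = parse_pageview("1611770930961,User_4,Page_32")
print(row)  # -> {'viewtime': 1611770930961, 'userid': 'User_4', 'pageid': 'Page_32'}
```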
For more information, see ksqlDB Syntax Reference.
Write Queries
These examples write queries using ksqlDB.
Note: By default ksqlDB reads the topics for streams and tables from
the latest offset.
Create a query that enriches the pageviews data with the user's gender and
regionid from the users table. The following query enriches the
pageviews_original stream by doing a LEFT JOIN with the users_original
table on the user ID.
SELECT users_original.id, pageid, regionid, gender
FROM pageviews_original
LEFT JOIN users_original
ON pageviews_original.userid = users_original.id
EMIT CHANGES
LIMIT 5;
Your output should resemble:
+-------------------+-------------------+-------------------+-------------------+
|ID |PAGEID |REGIONID |GENDER |
+-------------------+-------------------+-------------------+-------------------+
|User_7 |Page_23 |Region_2 |OTHER |
|User_3 |Page_42 |Region_2 |MALE |
|User_7 |Page_87 |Region_2 |OTHER |
|User_2 |Page_57 |Region_5 |FEMALE |
|User_9 |Page_59 |Region_1 |OTHER |
Limit Reached
Query terminated
Create a persistent query by using the CREATE STREAM keywords to precede
the SELECT statement. The results from this query are written to the
PAGEVIEWS_ENRICHED Kafka topic. The following query enriches the
pageviews_original stream by doing a LEFT JOIN with the users_original
table on the user ID.
CREATE STREAM pageviews_enriched AS
SELECT users_original.id, pageid, regionid, gender
FROM pageviews_original
LEFT JOIN users_original
ON pageviews_original.userid = users_original.id
EMIT CHANGES;
Your output should resemble:
Message
--------------------------------------------------
Created query with ID CSAS_PAGEVIEWS_ENRICHED_33
--------------------------------------------------
Tip
You can run DESCRIBE pageviews_enriched; to describe the stream.
Use SELECT to view query results as they come in. To stop viewing the query
results, press Ctrl+C. This stops printing to the console, but it doesn't
terminate the actual query, which continues to run in the underlying ksqlDB
application.
SELECT * FROM pageviews_enriched EMIT CHANGES;
Your output should resemble:
+---------------------+---------------------+---------------------+---------------------+
|ID |PAGEID |REGIONID |GENDER |
+---------------------+---------------------+---------------------+---------------------+
|User_8 |Page_41 |Region_4 |FEMALE |
|User_2 |Page_87 |Region_3 |OTHER |
|User_3 |Page_84 |Region_8 |FEMALE |
^CQuery terminated
Use Ctrl+C to terminate the query.
Create a new persistent query that limits the stream's content by using a
WHERE clause. Results from this query are written to a Kafka topic named
PAGEVIEWS_FEMALE.
CREATE STREAM pageviews_female AS
SELECT * FROM pageviews_enriched
WHERE gender = 'FEMALE'
EMIT CHANGES;
Your output should resemble:
Message
------------------------------------------------
Created query with ID CSAS_PAGEVIEWS_FEMALE_35
------------------------------------------------
Tip
You can run DESCRIBE pageviews_female; to describe the stream.
Create a new persistent query where another condition is met, using LIKE.
Results from this query are written to the pageviews_enriched_r8_r9 Kafka
topic.
CREATE STREAM pageviews_female_like_89
WITH (kafka_topic='pageviews_enriched_r8_r9') AS
SELECT * FROM pageviews_female
WHERE regionid LIKE '%_8' OR regionid LIKE '%_9'
EMIT CHANGES;
Your output should resemble:
Message
--------------------------------------------------------
Created query with ID CSAS_PAGEVIEWS_FEMALE_LIKE_89_37
--------------------------------------------------------
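ksqlDB follows standard SQL LIKE semantics: % matches any sequence of characters and _ matches exactly one character, so the pattern '%_8' matches any region ID whose final character is 8. A small illustrative Python sketch of these semantics (not ksqlDB's implementation):

```python
import re

def like_to_regex(pattern: str) -> re.Pattern:
    # Translate a SQL LIKE pattern to a regex:
    # '%' matches any sequence, '_' matches exactly one character.
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")
        elif ch == "_":
            parts.append(".")
        else:
            parts.append(re.escape(ch))
    return re.compile("^" + "".join(parts) + "$")

matcher = like_to_regex("%_8")
print(bool(matcher.match("Region_8")))  # -> True
print(bool(matcher.match("Region_5")))  # -> False
```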
Create a new persistent query that counts the pageviews for each region and
gender combination in a tumbling window of 30 seconds. Results from this
query are written to the PAGEVIEWS_REGIONS Kafka topic in Avro format.
ksqlDB registers the Avro schema with the configured Schema Registry when
it writes the first message to the PAGEVIEWS_REGIONS topic.
CREATE TABLE pageviews_regions
WITH (VALUE_FORMAT='avro') AS
  SELECT gender, regionid, COUNT(*) AS numusers
FROM pageviews_enriched
  WINDOW TUMBLING (SIZE 30 SECONDS)
GROUP BY gender, regionid
EMIT CHANGES;
Your output should resemble:
Message
-------------------------------------------------
Created query with ID CTAS_PAGEVIEWS_REGIONS_39
-------------------------------------------------
Tip
You can run DESCRIBE pageviews_regions; to describe the table.
Optional: View results from the above queries by using a push query.
SELECT * FROM pageviews_regions EMIT CHANGES LIMIT 5;
Your output should resemble:
+----------------------+----------------------+----------------------+----------------------+
|KSQL_COL_0 |WINDOWSTART |WINDOWEND |NUMUSERS |
+----------------------+----------------------+----------------------+----------------------+
|FEMALE|+|Region_2 |1611774900000 |1611774930000 |1 |
|FEMALE|+|Region_3 |1611774900000 |1611774930000 |2 |
|FEMALE|+|Region_4 |1611774900000 |1611774930000 |3 |
|FEMALE|+|Region_8 |1611774900000 |1611774930000 |2 |
|MALE|+|Region_4 |1611774900000 |1611774930000 |3 |
Limit Reached
Query terminated
Note
Notice the addition of the WINDOWSTART and WINDOWEND columns. These are
available because pageviews_regions aggregates data per 30-second window.
ksqlDB automatically adds these system columns for windowed results.
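The WINDOWSTART and WINDOWEND values are epoch milliseconds. As a sketch of how a 30-second tumbling window buckets an event timestamp (plain integer arithmetic, illustrating the windowing rule rather than ksqlDB internals):

```python
# Compute the tumbling-window boundaries assigned to an event for a
# WINDOW TUMBLING (SIZE 30 SECONDS) aggregation. Timestamps are epoch millis.
WINDOW_SIZE_MS = 30_000

def window_bounds(rowtime_ms: int) -> tuple:
    start = rowtime_ms - (rowtime_ms % WINDOW_SIZE_MS)
    return start, start + WINDOW_SIZE_MS

# An event inside the first window shown in the output above:
print(window_bounds(1611774912345))  # -> (1611774900000, 1611774930000)
```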
Optional: View results from the previous queries by using a pull query.
When a CREATE TABLE statement contains a GROUP BY clause, ksqlDB internally
builds a table that contains the results of the aggregation. ksqlDB supports
pull queries against such aggregation results.
Unlike the push query used in the previous step, which pushes a stream of
results to you, pull queries pull a result set and automatically terminate.
Pull queries do not have the EMIT CHANGES clause.
View all of the windows and user counts available for a specific gender and
region by using a pull query:
SELECT * FROM pageviews_regions WHERE KSQL_COL_0='FEMALE|+|Region_4';
Your output should resemble:
+----------------------+----------------------+----------------------+----------------------+
|KSQL_COL_0 |WINDOWSTART |WINDOWEND |NUMUSERS |
+----------------------+----------------------+----------------------+----------------------+
|FEMALE|+|Region_4 |1611774900000 |1611774930000 |3 |
|FEMALE|+|Region_4 |1611774930000 |1611774960000 |2 |
|FEMALE|+|Region_4 |1611774990000 |1611775020000 |3 |
Query terminated
Pull queries on windowed tables like pageviews_regions also support
querying a single window's result:
SELECT NUMUSERS FROM pageviews_regions WHERE
KSQL_COL_0='FEMALE|+|Region_4' AND WINDOWSTART=1611774900000;
Note
You must change the value of WINDOWSTART in the previous SQL to match one
of the window boundaries in your data. Otherwise, no results are returned.
Your output should resemble:
+----------+
|NUMUSERS |
+----------+
|4 |
Query terminated
To query a range of windows:
SELECT WINDOWSTART, WINDOWEND, NUMUSERS FROM pageviews_regions WHERE
KSQL_COL_0='OTHER|+|Region_9' AND 1611774900000 <= WINDOWSTART AND WINDOWSTART <= 1611775020000;
Note
You must change the value of WINDOWSTART in the previous SQL to match one
of the window boundaries in your data. Otherwise, no results are returned.
Your output should resemble:
+------------------------------+------------------------------+------------------------------+
|WINDOWSTART |WINDOWEND |NUMUSERS |
+------------------------------+------------------------------+------------------------------+
|1611774930000 |1611774960000 |8 |
|1611774960000 |1611774990000 |1 |
|1611774990000 |1611775020000 |17 |
|1611775020000 |1611775050000 |22 |
Query terminated
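To pick sensible WINDOWSTART boundaries for pull queries, it helps to convert between the epoch-millisecond values and readable UTC timestamps. A small helper (standard-library only):

```python
from datetime import datetime, timezone

# Convert an epoch-millisecond window boundary, as shown in the pull-query
# output above, to a human-readable UTC timestamp.
def ms_to_utc(ms: int) -> str:
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc).strftime(
        "%Y-%m-%d %H:%M:%S"
    )

print(ms_to_utc(1611774900000))  # -> 2021-01-27 19:15:00
```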
Optional: Show all persistent queries:
SHOW QUERIES;
Your output should resemble:
Query ID | Query Type | Status | Sink Name | Sink Kafka Topic | Query String
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
CSAS_PAGEVIEWS_ENRICHED_33 | PERSISTENT | RUNNING:1 | PAGEVIEWS_ENRICHED | PAGEVIEWS_ENRICHED | CREATE STREAM PAGEVIEWS_ENRICHED WITH (KAFKA_TOPIC='PAGEVIEWS_ENRICHED', PARTITIONS=1, REPLICAS=1) AS SELECT USERS_ORIGINAL.ID ID, PAGEVIEWS_ORIGINAL.PAGEID PAGEID, USERS_ORIGINAL.REGIONID REGIONID, USERS_ORIGINAL.GENDER GENDER FROM PAGEVIEWS_ORIGINAL PAGEVIEWS_ORIGINAL LEFT OUTER JOIN USERS_ORIGINAL USERS_ORIGINAL ON ((PAGEVIEWS_ORIGINAL.USERID = USERS_ORIGINAL.ID)) EMIT CHANGES;
CSAS_PAGEVIEWS_FEMALE_35 | PERSISTENT | RUNNING:1 | PAGEVIEWS_FEMALE | PAGEVIEWS_FEMALE | CREATE STREAM PAGEVIEWS_FEMALE WITH (KAFKA_TOPIC='PAGEVIEWS_FEMALE', PARTITIONS=1, REPLICAS=1) AS SELECT * FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WHERE (PAGEVIEWS_ENRICHED.GENDER = 'FEMALE') EMIT CHANGES;
CTAS_PAGEVIEWS_REGIONS_39 | PERSISTENT | RUNNING:1 | PAGEVIEWS_REGIONS | PAGEVIEWS_REGIONS | CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro') AS SELECT PAGEVIEWS_ENRICHED.GENDER GENDER, PAGEVIEWS_ENRICHED.REGIONID REGIONID, COUNT(*) NUMUSERS FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WINDOW TUMBLING ( SIZE 30 SECONDS ) GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID EMIT CHANGES;
CSAS_PAGEVIEWS_FEMALE_LIKE_89_37 | PERSISTENT | RUNNING:1 | PAGEVIEWS_FEMALE_LIKE_89 | pageviews_enriched_r8_r9 | CREATE STREAM PAGEVIEWS_FEMALE_LIKE_89 WITH (KAFKA_TOPIC='pageviews_enriched_r8_r9', PARTITIONS=1, REPLICAS=1) AS SELECT * FROM PAGEVIEWS_FEMALE PAGEVIEWS_FEMALE WHERE ((PAGEVIEWS_FEMALE.REGIONID LIKE '%_8') OR (PAGEVIEWS_FEMALE.REGIONID LIKE '%_9')) EMIT CHANGES;
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;
Optional: Examine query run-time metrics and details. Information including
the target Kafka topic is available, as well as throughput figures for the
messages being processed.
DESCRIBE EXTENDED PAGEVIEWS_REGIONS;
Your output should resemble:
Name : PAGEVIEWS_REGIONS
Type : TABLE
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : AVRO
Kafka topic : PAGEVIEWS_REGIONS (partitions: 1, replication: 1)
Statement : CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro') AS SELECT
PAGEVIEWS_ENRICHED.GENDER GENDER,
PAGEVIEWS_ENRICHED.REGIONID REGIONID,
COUNT(*) NUMUSERS
FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED
WINDOW TUMBLING ( SIZE 30 SECONDS )
GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID
EMIT CHANGES;
Field | Type
---------------------------------------------------------------------
KSQL_COL_0 | VARCHAR(STRING) (primary key) (Window type: TUMBLING)
NUMUSERS | BIGINT
---------------------------------------------------------------------
Queries that write from this TABLE
-----------------------------------
CTAS_PAGEVIEWS_REGIONS_39 (RUNNING) : CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro') AS SELECT PAGEVIEWS_ENRICHED.GENDER GENDER, PAGEVIEWS_ENRICHED.REGIONID REGIONID, COUNT(*) NUMUSERS FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WINDOW TUMBLING ( SIZE 30 SECONDS ) GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID EMIT CHANGES;
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
messages-per-sec: 2.89 total-messages: 3648 last-message: 2021-01-27T19:36:11.197Z
(Statistics of the local KSQL server interaction with the Kafka topic PAGEVIEWS_REGIONS)
Consumer Groups summary:
Consumer Group : _confluent-ksql-default_query_CTAS_PAGEVIEWS_REGIONS_39
Kafka topic : PAGEVIEWS_ENRICHED
Max lag : 5
Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
0 | 0 | 7690 | 7685 | 5
------------------------------------------------------
Kafka topic : _confluent-ksql-default_query_CTAS_PAGEVIEWS_REGIONS_39-Aggregate-GroupBy-repartition
Max lag : 5
Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
0 | 6224 | 6229 | 6224 | 5
------------------------------------------------------
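The per-partition lag reported above is simply the distance between the partition's end offset and the consumer's current offset; "Max lag" is the largest such gap across partitions. A trivial sketch of that arithmetic:

```python
# Consumer lag per partition is end_offset - current_offset;
# "Max lag" is the largest gap across all partitions.
def max_lag(partitions):
    # partitions: list of (end_offset, current_offset) pairs
    return max(end - current for end, current in partitions)

# Partition 0 above: end offset 7690, current offset 7685.
print(max_lag([(7690, 7685)]))  # -> 5
```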
Using Nested Schemas (STRUCT) in ksqlDB
Struct support enables the modeling and access of nested data in Kafka
topics, from both JSON and Avro.
Here we’ll use the ksql-datagen tool to create some sample data that
includes a nested address field. Run this in a new window, and leave it
running.
$CONFLUENT_HOME/bin/ksql-datagen \
quickstart=orders \
format=avro \
topic=orders \
msgRate=1
From the ksqlDB command prompt, register the topic in ksqlDB:
CREATE STREAM orders
(
ordertime BIGINT,
orderid INT,
itemid STRING,
orderunits DOUBLE,
address STRUCT<city STRING, state STRING, zipcode BIGINT>
)
WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='avro');
Your output should resemble:
Message
----------------
Stream created
----------------
Use the DESCRIBE statement to observe the schema, which includes a STRUCT:
DESCRIBE orders;
Your output should resemble:
Field | Type
----------------------------------------------------------------------------------
ORDERTIME | BIGINT
ORDERID | INTEGER
ITEMID | VARCHAR(STRING)
ORDERUNITS | DOUBLE
ADDRESS | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE BIGINT>
----------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
Query the data by using -> notation to access the STRUCT contents:
SELECT ORDERID, ADDRESS->CITY FROM ORDERS EMIT CHANGES LIMIT 5;
Your output should resemble:
+-----------------------------------+-----------------------------------+
|ORDERID |ADDRESS__CITY |
+-----------------------------------+-----------------------------------+
|1188 |City_95 |
|1189 |City_24 |
|1190 |City_57 |
|1191 |City_37 |
|1192 |City_82 |
Limit Reached
Query terminated
Stream-Stream join
Using a stream-stream join, it is possible to join two streams of
events on a common key. An example of this could be a stream of order
events, and a stream of shipment events. By joining these on the order
key, it is possible to see shipment information alongside the order.
In the ksqlDB CLI, create two new streams. Use the CREATE STREAM statement
to register streams on the new_orders and shipments topics:
CREATE STREAM new_orders (order_id INT, total_amount DOUBLE, customer_name VARCHAR)
WITH (KAFKA_TOPIC='new_orders', VALUE_FORMAT='JSON', PARTITIONS=2);
CREATE STREAM shipments (order_id INT, shipment_id INT, warehouse VARCHAR)
WITH (KAFKA_TOPIC='shipments', VALUE_FORMAT='JSON', PARTITIONS=2);
Note
ksqlDB creates the underlying topics in Kafka when these statements are
executed. You can also specify the REPLICAS count.
After both CREATE STREAM statements, your output should resemble:
Message
----------------
Stream created
----------------
Populate the streams with some sample data using the INSERT VALUES statement:
-- Insert values in NEW_ORDERS:
-- insert supplying the list of columns to insert:
INSERT INTO new_orders (order_id, customer_name, total_amount)
VALUES (1, 'Bob Smith', 10.50);
-- shorthand syntax can be used when inserting values for all columns (except ROWTIME), in column order:
INSERT INTO new_orders VALUES (2, 3.32, 'Sarah Black');
INSERT INTO new_orders VALUES (3, 21.00, 'Emma Turner');
-- Insert values in SHIPMENTS:
INSERT INTO shipments VALUES (1, 42, 'Nashville');
INSERT INTO shipments VALUES (3, 43, 'Palo Alto');
Query the data to confirm that it’s present in the topics.
Tip
Run the following to tell ksqlDB to read from the beginning of each stream:
SET 'auto.offset.reset' = 'earliest';
You can skip this if you have already run it within your current ksqlDB CLI
session.
For the new_orders topic, run:
SELECT * FROM new_orders EMIT CHANGES LIMIT 3;
Your output should resemble:
+-------------------------+-------------------------+-------------------------+
|ORDER_ID |TOTAL_AMOUNT |CUSTOMER_NAME |
+-------------------------+-------------------------+-------------------------+
|1 |10.5 |Bob Smith |
|2 |3.32 |Sarah Black |
|3 |21.0 |Emma Turner |
Limit Reached
Query terminated
For the shipments topic, run:
SELECT * FROM shipments EMIT CHANGES LIMIT 2;
Your output should resemble:
+-------------------------+-------------------------+-------------------------+
|ORDER_ID |SHIPMENT_ID |WAREHOUSE |
+-------------------------+-------------------------+-------------------------+
|1 |42 |Nashville |
|3 |43 |Palo Alto |
Limit Reached
Query terminated
Run the following query, which shows orders with associated shipments,
based on a join window of 2 hours.
SELECT o.order_id, o.total_amount, o.customer_name, s.shipment_id, s.warehouse
FROM new_orders o
INNER JOIN shipments s
WITHIN 2 HOURS
ON o.order_id = s.order_id
EMIT CHANGES;
Your output should resemble:
+--------------+--------------+--------------+--------------+--------------+
|O_ORDER_ID |TOTAL_AMOUNT |CUSTOMER_NAME |SHIPMENT_ID |WAREHOUSE |
+--------------+--------------+--------------+--------------+--------------+
|1 |10.5 |Bob Smith |42 |Nashville |
|3 |21.0 |Emma Turner |43 |Palo Alto |
Note that the message with ORDER_ID=2 has no corresponding SHIPMENT_ID or
WAREHOUSE and doesn't appear in the output, because there is no
corresponding row in the shipments stream within the specified time window.
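The join semantics above can be sketched in Python: an order and a shipment pair up only if they share a key and their event times fall within the join window of each other. A simplified nested-loop sketch (real stream processors use windowed state stores, not this):

```python
# Simplified stream-stream join: match events on key when their timestamps
# are within the join window (WITHIN 2 HOURS) of each other.
WINDOW_MS = 2 * 60 * 60 * 1000

def windowed_join(orders, shipments):
    # orders: (order_id, event_time_ms, customer); shipments: (order_id, event_time_ms, warehouse)
    out = []
    for oid, otime, customer in orders:
        for sid, stime, warehouse in shipments:
            if oid == sid and abs(otime - stime) <= WINDOW_MS:
                out.append((oid, customer, warehouse))
    return out

orders = [(1, 0, "Bob Smith"), (2, 0, "Sarah Black")]
shipments = [(1, 60_000, "Nashville")]  # order 2 has no shipment yet
print(windowed_join(orders, shipments))  # -> [(1, 'Bob Smith', 'Nashville')]
```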
In another terminal, start the ksqlDB CLI:
LOG_DIR=./ksql_logs $CONFLUENT_HOME/bin/ksql
Enter the following INSERT VALUES statement to insert the shipment for
order ID 2:
INSERT INTO shipments VALUES (2, 49, 'London');
Switch to your first ksqlDB CLI window. A third row has now been output:
+--------------+--------------+--------------+--------------+--------------+
|O_ORDER_ID |TOTAL_AMOUNT |CUSTOMER_NAME |SHIPMENT_ID |WAREHOUSE |
+--------------+--------------+--------------+--------------+--------------+
|1 |10.5 |Bob Smith |42 |Nashville |
|2 |3.32 |Sarah Black |49 |London |
|3 |21.0 |Emma Turner |43 |Palo Alto |
^CQuery terminated
Press Ctrl+C to cancel the SELECT query and return to the ksqlDB prompt.
Table-Table join
Using a table-table join, it is possible to join two tables of on a
common key. ksqlDB tables provide the latest value for a given key.
They can only be joined on the key, and one-to-many (1:N) joins are
not supported in the current semantic model.
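These table semantics can be sketched in Python: a table keeps only the latest value per key, so a table-table join amounts to a lookup join between two key-to-value maps. An illustrative sketch (function names are mine, not ksqlDB's):

```python
# A ksqlDB table keeps only the latest value per key.
def latest(events):
    table = {}
    for key, value in events:  # later events overwrite earlier ones
        table[key] = value
    return table

# A table-table join is then a per-key lookup between two maps.
def table_join(left, right):
    return {k: (v, right.get(k)) for k, v in left.items()}

locations = latest([(1, "Leeds"), (2, "Sheffield"), (2, "Sheffield, UK")])
sizes = latest([(1, 16000.0), (2, 42000.0)])
print(table_join(locations, sizes))
# -> {1: ('Leeds', 16000.0), 2: ('Sheffield, UK', 42000.0)}
```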
In this example, location data about a warehouse from one system is
enriched with data about the size of the warehouse from another. In the
ksqlDB CLI, register both topics as ksqlDB tables, keyed by the warehouse
ID:
CREATE TABLE warehouse_location (warehouse_id INT PRIMARY KEY, city VARCHAR, country VARCHAR)
WITH (KAFKA_TOPIC='warehouse_location', VALUE_FORMAT='JSON', PARTITIONS=2);
CREATE TABLE warehouse_size (warehouse_id INT PRIMARY KEY, square_footage DOUBLE)
WITH (KAFKA_TOPIC='warehouse_size', VALUE_FORMAT='JSON', PARTITIONS=2);
After both CREATE TABLE statements, your output should resemble:
Message
---------------
Table created
---------------
Populate the tables with sample data by using the INSERT VALUES statement:
INSERT INTO warehouse_location (warehouse_id, city, country) VALUES (1, 'Leeds', 'UK');
INSERT INTO warehouse_location (warehouse_id, city, country) VALUES (2, 'Sheffield', 'UK');
INSERT INTO warehouse_location (warehouse_id, city, country) VALUES (3, 'Berlin', 'Germany');
INSERT INTO warehouse_size (warehouse_id, square_footage) VALUES (1, 16000);
INSERT INTO warehouse_size (warehouse_id, square_footage) VALUES (2, 42000);
INSERT INTO warehouse_size (warehouse_id, square_footage) VALUES (3, 94000);
Inspect the warehouse_location table:
SELECT * FROM warehouse_location EMIT CHANGES LIMIT 3;
Your output should resemble:
+-------------------------+-------------------------+-------------------------+
|WAREHOUSE_ID |CITY |COUNTRY |
+-------------------------+-------------------------+-------------------------+
|1 |Leeds |UK |
|2 |Sheffield |UK |
|3 |Berlin |Germany |
Limit Reached
Query terminated
Inspect the warehouse_size table:
SELECT * FROM warehouse_size EMIT CHANGES LIMIT 3;
Your output should resemble:
+---------------------------------------+---------------------------------------+
|WAREHOUSE_ID |SQUARE_FOOTAGE |
+---------------------------------------+---------------------------------------+
|1 |16000.0 |
|2 |42000.0 |
|3 |94000.0 |
Limit Reached
Query terminated
Now join the two tables:
SELECT wl.warehouse_id, wl.city, wl.country, ws.square_footage
FROM warehouse_location wl
LEFT JOIN warehouse_size ws
ON wl.warehouse_id=ws.warehouse_id
EMIT CHANGES
LIMIT 3;
Your output should resemble:
+------------------+------------------+------------------+------------------+
|WL_WAREHOUSE_ID |CITY |COUNTRY |SQUARE_FOOTAGE |
+------------------+------------------+------------------+------------------+
|1 |Leeds |UK |16000.0 |
|2 |Sheffield |UK |42000.0 |
|3 |Berlin |Germany |94000.0 |
Limit Reached
Query terminated
INSERT INTO
You can use the INSERT INTO syntax to merge the contents of multiple
streams. An example of this could be where the same event type is coming
from different sources.
Run two datagen processes, each writing to a different topic, simulating
order data arriving from a local installation vs from a third-party:
Tip
Each of these commands should be run in a separate window. When the exercise is finished, exit them by pressing Ctrl+C.
$CONFLUENT_HOME/bin/ksql-datagen \
quickstart=orders \
format=json \
topic=orders_local \
msgRate=2
$CONFLUENT_HOME/bin/ksql-datagen \
quickstart=orders \
format=json \
topic=orders_3rdparty \
msgRate=2
In the ksqlDB CLI, register the source topic for each:
CREATE STREAM orders_src_local
(
ordertime BIGINT,
orderid INT,
itemid STRING,
orderunits DOUBLE,
address STRUCT<city STRING, state STRING, zipcode BIGINT>
)
WITH (KAFKA_TOPIC='orders_local', VALUE_FORMAT='JSON');
CREATE STREAM orders_src_3rdparty
(
ordertime BIGINT,
orderid INT,
itemid STRING,
orderunits DOUBLE,
address STRUCT<city STRING, state STRING, zipcode BIGINT>
)
WITH (KAFKA_TOPIC='orders_3rdparty', VALUE_FORMAT='JSON');
After each CREATE STREAM statement you should get the message:
Message
----------------
Stream created
----------------
Create the output stream by using the standard CREATE STREAM … AS syntax.
Because multiple sources of data are being merged into a common target, it
is useful to add lineage information, which can be done simply by including
it as part of the SELECT:
CREATE STREAM all_orders AS SELECT 'LOCAL' AS SRC, * FROM orders_src_local EMIT CHANGES;
Your output should resemble:
Message
------------------------------------------
Created query with ID CSAS_ALL_ORDERS_71
------------------------------------------
Use the DESCRIBE statement to observe the schema of the target stream:
DESCRIBE all_orders;
Your output should resemble:
Name : ALL_ORDERS
Field | Type
----------------------------------------------------------------------------------
SRC | VARCHAR(STRING)
ORDERTIME | BIGINT
ORDERID | INTEGER
ITEMID | VARCHAR(STRING)
ORDERUNITS | DOUBLE
ADDRESS | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE BIGINT>
----------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
Add the stream of third-party orders into the existing output stream:
INSERT INTO all_orders SELECT '3RD PARTY' AS SRC, * FROM orders_src_3rdparty EMIT CHANGES;
Your output should resemble:
Message
--------------------------------------
Created query with ID INSERTQUERY_73
--------------------------------------
Query the output stream to verify that data from each source is being
written to it:
SELECT * FROM all_orders EMIT CHANGES;
Your output should resemble the following. Note that there are messages
from both source topics, denoted by LOCAL and 3RD PARTY respectively.
+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|SRC |ORDERTIME |ORDERID |ITEMID |ORDERUNITS |ADDRESS |
+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|3RD PARTY |1491006356222 |1583 |Item_169 |2.091966572094054 |{CITY=City_91, STATE=|
| | | | | |State_51, ZIPCODE=184|
| | | | | |74} |
|LOCAL |1504382324241 |1630 |Item_410 |0.6462658185260942 |{CITY=City_55, STATE=|
| | | | | |State_38, ZIPCODE=372|
| | | | | |44} |
|3RD PARTY |1512567250385 |1584 |Item_357 |7.205193136057381 |{CITY=City_91, STATE=|
| | | | | |State_19, ZIPCODE=457|
| | | | | |45} |
^CQuery terminated
Press Ctrl+C to cancel the SELECT query and return to the ksqlDB prompt.
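The INSERT INTO pattern in this section can be sketched in Python: events from multiple source streams flow into one output stream, each tagged with its origin. Note that real ksqlDB interleaves events by arrival time, while this sketch simply concatenates the two sources (merge_with_lineage is an illustrative name, not a ksqlDB function):

```python
# Merge events from two source streams into one output, tagging each
# event with its origin, as the CREATE STREAM ... AS / INSERT INTO pair does.
def merge_with_lineage(local_orders, third_party_orders):
    for order in local_orders:
        yield {"src": "LOCAL", **order}
    for order in third_party_orders:
        yield {"src": "3RD PARTY", **order}

merged = list(merge_with_lineage([{"orderid": 1630}], [{"orderid": 1583}]))
print([(m["src"], m["orderid"]) for m in merged])
# -> [('LOCAL', 1630), ('3RD PARTY', 1583)]
```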