You can use ksqlDB in Confluent Control Center to write streaming queries against messages in Kafka.
Launch the ksqlDB CLI
Open a new terminal window and run the following command to set the LOG_DIR
environment variable and launch the ksqlDB CLI.
LOG_DIR=./ksql_logs $CONFLUENT_HOME/bin/ksql
This command routes the CLI logs to the ./ksql_logs directory, relative to
your current directory. By default, the CLI looks for a ksqlDB Server running
at http://localhost:8088.
Create a Stream and Table
To write streaming queries against the pageviews and users topics,
register the topics with ksqlDB as a stream and a table. You can use the
CREATE STREAM and CREATE TABLE statements in the ksqlDB Editor, or you can use
the Control Center UI.
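Before registering the topics, it can help to confirm that they exist and to look at a few raw records. The following is a minimal sketch, assuming that records are already being produced to the pageviews and users topics; the LIMIT value of 3 is an arbitrary choice for illustration.

```sql
-- List the Kafka topics that the ksqlDB Server can see.
SHOW TOPICS;

-- Print a few raw records from each topic, starting at the earliest offset.
-- The limit of 3 is arbitrary; PRINT stops after that many records.
PRINT 'pageviews' FROM BEGINNING LIMIT 3;
PRINT 'users' FROM BEGINNING LIMIT 3;
```

These statements only read from the topics, so they do not register anything with ksqlDB.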
These examples query records from the pageviews and users topics using
the following schema.
Create a stream in the ksqlDB editor
You can create a stream or table by using the CREATE STREAM and CREATE TABLE
statements in ksqlDB Editor, just like you use them in the ksqlDB CLI.
Copy the following code into the editing window and click Run.
CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH
(kafka_topic='pageviews', value_format='DELIMITED');
Click Streams to inspect the pageviews_original stream that you created.
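The same inspection can likely be done with a statement in the editor or the CLI. This sketch assumes the pageviews_original stream was created by the statement above.

```sql
-- Show the column names and types registered for the stream.
DESCRIBE pageviews_original;
```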
Create a table in the ksqlDB editor
Use the CREATE TABLE statement to register a table on a topic.
Copy the following code into the editor window and click Run query.
CREATE TABLE users (userid VARCHAR PRIMARY KEY, registertime BIGINT, id VARCHAR, regionid VARCHAR, gender VARCHAR) WITH
(KAFKA_TOPIC='users', VALUE_FORMAT='DELIMITED');
In the editor window, use a SELECT query to inspect records in the
users table.
SELECT * FROM users EMIT CHANGES;
The query continues until you end it explicitly. Click Stop to end the
query.
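If you only want a sample of records, a LIMIT clause ends the query without a manual stop. The following is a sketch; the value 5 is an arbitrary choice.

```sql
-- Push query that ends on its own after emitting five rows.
SELECT * FROM users EMIT CHANGES LIMIT 5;
```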
Write Persistent Queries
With the pageviews topic registered as a stream, and the users topic
registered as a table, you can write streaming queries that run until you
end them with the TERMINATE statement.
Copy the following code into the editing window and click Run.
CREATE STREAM pageviews_enriched AS
SELECT users.userid AS userid, pageid, regionid, gender
FROM pageviews_original
LEFT JOIN users
ON pageviews_original.userid = users.userid
EMIT CHANGES;
To inspect your persistent queries, navigate to the Running Queries page,
which shows details about the pageviews_enriched stream that you created
in the previous query.
Click Explain to see the schema and query properties for the persistent
query.
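Persistent queries can also be listed, explained, and ended with statements. The sketch below assumes a query ID of CSAS_PAGEVIEWS_ENRICHED_0, which is hypothetical: the numeric suffix varies between deployments, so use the ID that SHOW QUERIES reports on your server.

```sql
-- List running persistent queries along with their IDs.
SHOW QUERIES;

-- Show the schema and execution plan for one query.
-- CSAS_PAGEVIEWS_ENRICHED_0 is a hypothetical ID; substitute your own.
EXPLAIN CSAS_PAGEVIEWS_ENRICHED_0;

-- End the persistent query. Left commented out because later steps
-- in this walkthrough rely on pageviews_enriched still running.
-- TERMINATE CSAS_PAGEVIEWS_ENRICHED_0;
```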
Monitor Persistent Queries
You can monitor your persistent queries visually by using Confluent Control Center.
In the navigation menu, click Consumers and find the consumer group
for the pageviews_enriched query, which has a name that starts with
_confluent-ksql-default_query_CSAS_PAGEVIEWS_ENRICHED_. The
Consumer lag page opens.
Click Consumption to see the rate at which the pageviews_enriched query is
consuming records. Click Last four hours and from the list, select
Last 30 minutes and click Apply.
Query Properties
You can assign properties in the ksqlDB Editor before you run your queries.
In the navigation menu, click ksqlDB to open the ksqlDB clusters page,
and click the default application to open the ksqlDB Editor.
Click Add query properties and set the auto.offset.reset field to
Earliest.
Copy the following code into the editing window and click Run.
CREATE STREAM pageviews_female AS
SELECT * FROM pageviews_enriched
WHERE gender = 'FEMALE'
EMIT CHANGES;
The pageviews_female stream starts with the earliest record in the
pageviews topic, which means that it consumes all of the available
records from the beginning.
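If you work in the ksqlDB CLI rather than the editor, the same property can be assigned with a SET statement before you run the query. This is a sketch that assumes a CLI session; the setting applies to queries started later in that session.

```sql
-- Read from the earliest available offset for queries started after this point.
SET 'auto.offset.reset' = 'earliest';

-- Optionally restore the default once the query has been created.
UNSET 'auto.offset.reset';
```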
Confirm that the auto.offset.reset property was applied to the
pageviews_female stream. In the navigation menu, click Consumers
and find the consumer group for the pageviews_female stream, which has
a name that starts with _confluent-ksql-default_query_CSAS_PAGEVIEWS_FEMALE_.
Click Consumption to see the rate at which the pageviews_female query is
consuming records. Ensure that the time scale is set to Last 30 minutes.
The graph is at 100 percent, because all of the records were consumed when
the pageviews_female stream started.
View streams and tables
You can see all of your persistent queries, streams, and tables in a single,
unified view.
In the navigation menu, click ksqlDB to open the ksqlDB clusters page,
and click the default application to open the ksqlDB Editor.
On the right side of the page, find the All available streams and tables
section.
Click KSQL_PROCESSING_LOG to open the stream. The schema for the stream
is displayed, including nested data structures.
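A comparable overview is available through statements. The sketch below assumes the processing log stream is registered under the name KSQL_PROCESSING_LOG, as shown in the list above.

```sql
-- List every registered stream and table.
SHOW STREAMS;
SHOW TABLES;

-- Show the schema of the processing log stream, including nested fields.
DESCRIBE KSQL_PROCESSING_LOG;
```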
Download selected records
You can download records that you select in the query results window as a
JSON file.
Copy the following code into the editing window and click Run.
SELECT * FROM PAGEVIEWS_FEMALE EMIT CHANGES;
In the query results window, click the pause button, select some records,
and click Download.
Use Flow View to inspect your topology
Control Center enables you to see how events flow through your ksqlDB application.
In the ksqlDB page, click Flow.
Click the PAGEVIEWS_ENRICHED node in the graph to see details about the
PAGEVIEWS_ENRICHED
stream, including current messages and schema.
Click the other nodes in the graph to see details about the topology of your
ksqlDB application.