Breaking Changes
The “select star” statement (select *) no longer expands to include ROWTIME
columns. Instead, ROWTIME is included in the results of queries only if it’s
explicitly included in the projection, for example, select rowtime, *. This
change affects only new statements. Any view previously created via a CREATE
STREAM AS SELECT or CREATE TABLE AS SELECT statement is unaffected.
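As a sketch of the difference, the two push queries below run against a hypothetical stream named PAGEVIEWS, which isn't part of these notes and is assumed only for illustration:

```sql
-- PAGEVIEWS is a hypothetical stream used only for illustration.

-- In 6.0, the result contains the stream's columns but not ROWTIME.
SELECT * FROM PAGEVIEWS EMIT CHANGES;

-- ROWTIME is returned because the projection names it explicitly.
SELECT ROWTIME, * FROM PAGEVIEWS EMIT CHANGES;
```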
This release changes the system-generated column name for any column in a
projection that dereferences a field in a struct. Previously, the full path
was used when generating the name. In 6.0, only the final field name is used.
For example, SELECT someStruct->someField, ... previously generated a column
name of SOMESTRUCT__SOMEFIELD and now generates a name of SOMEFIELD.
Generated column names may have a numeral appended to the end to ensure
uniqueness, for example, SOMEFIELD_2.
Note
We recommend that you don’t rely on system-generated column names for
production systems, because naming logic may change between releases.
Providing an explicit alias ensures consistent naming across releases,
for example, SELECT someStruct->someField AS someField. For backward
compatibility, existing running queries aren’t affected by this change,
and they continue to run with the same column names. Any statements
executed after the upgrade use the new names where no explicit
alias is provided. Add explicit aliases to your statements if you require
the old names, for example:
SELECT someStruct->someField AS SOMESTRUCT__SOMEFIELD, ...
In version 5.5, queries that referenced a single GROUP BY column in the
projection would fail if they were resubmitted, due to a duplicate column.
In 6.0, such queries continue to run if they’re already running, so this
change affects only newly submitted queries. Existing queries use the
earlier query semantics.
Push queries that rely on auto-generated column names may see changes in
column names. Pull queries and existing persistent queries, such as those
created with CREATE STREAM AS SELECT, CREATE TABLE AS SELECT, or INSERT INTO,
are unaffected.
ksqlDB Server no longer ships with Jetty. This means that when you start
the server, you must supply Jetty-specific dependencies, like certain login
modules used for basic authentication, by using the KSQL_CLASSPATH environment
variable for ksqlDB to find them.
Any key name
Statements containing PARTITION BY, GROUP BY, or JOIN clauses now produce
different output schemas.
For PARTITION BY and GROUP BY clauses, the following rules define how the
name of the key column in the result is determined:
Where the partitioning or grouping is a single-column reference, then the
key column has the same name as this column, for example:
-- OUTPUT has a key column named X.
CREATE STREAM OUTPUT AS
SELECT *
FROM INPUT
PARTITION BY X;
Where the partitioning or grouping is a single field in a struct, the key
column has the same name as the field. For example:
-- OUTPUT has a key column named FIELD1.
CREATE STREAM OUTPUT AS
SELECT *
FROM INPUT
PARTITION BY X->field1;
Otherwise, the key column name is system-generated and has the form
KSQL_COL_n, where n is a positive integer.
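The third rule might look like the following sketch, which assumes a hypothetical INPUT stream with a numeric column X. Because the grouping is an expression rather than a column or a struct field, the key column gets a generated name:

```sql
-- Sketch only: INPUT and its numeric column X are assumed for illustration.
-- OUTPUT has a key column named KSQL_COL_0, or similar.
CREATE TABLE OUTPUT AS
  SELECT
    ABS(X),
    COUNT(*) AS TOTAL
  FROM INPUT
  GROUP BY ABS(X);
```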
In all cases, except where grouping by more than one column, you can set the
new key column’s name by defining an alias in the projection, for example:
-- OUTPUT has a key column named ID.
CREATE TABLE OUTPUT AS
SELECT
USERID AS ID,
COUNT(*)
FROM USERS
GROUP BY USERID;
For groupings of multiple expressions, you can’t provide a name for the
system-generated key column. You can work around this by combining the
grouping columns manually, which enables you to provide an alias, for
example:
-- products_by_sub_cat has a key column named COMPOSITEKEY:
CREATE TABLE products_by_sub_cat AS
SELECT
CAST(categoryId AS STRING) + '§' + CAST(subCategoryId AS STRING) AS compositeKey,
SUM(quantity) as totalQty
FROM purchases
GROUP BY CAST(categoryId AS STRING) + '§' + CAST(subCategoryId AS STRING);
For JOIN statements, the name of the key column in the result is determined by
the join criteria.
For INNER and LEFT OUTER joins where the join criteria contain at least one
column reference, the key column is named based on the left-most source
whose join criteria is a column reference, for example:
-- OUTPUT has a key column named I2_ID.
CREATE TABLE OUTPUT AS
SELECT *
FROM I1
JOIN I2 ON abs(I1.ID) = I2.ID
JOIN I3 ON I2.ID = I3.ID;
You can give the key column a new name, if required, by defining an alias
in the projection, for example:
-- OUTPUT has a key column named ID.
CREATE TABLE OUTPUT AS
SELECT
I2.ID AS ID,
I1.V0,
I2.V0,
I3.V0
FROM I1
JOIN I2 ON abs(I1.ID) = I2.ID
JOIN I3 ON I2.ID = I3.ID;
For FULL OUTER joins and other joins where the join criteria are not on
column references, the key column in the output is not equivalent to any
column from any source. The key column has a system-generated name in the
form KSQL_COL_n, where n is a positive integer, for example:
-- OUTPUT has a key column named KSQL_COL_0, or similar.
CREATE TABLE OUTPUT AS
SELECT *
FROM I1
FULL OUTER JOIN I2 ON I1.ID = I2.ID;
You can give the key column a new name, if required, by defining an alias
in the projection. A new UDF, named JOINKEY, has been introduced in 6.0
to help define the alias. It takes the join criteria as its parameters, for
example:
-- OUTPUT has a key column named ID.
CREATE TABLE OUTPUT AS
SELECT
JOINKEY(I1.ID, I2.ID) AS ID,
I1.V0,
I2.V0
FROM I1
FULL OUTER JOIN I2 ON I1.ID = I2.ID;
Explicit keys
In version 6.0, new CREATE TABLE statements fail if the PRIMARY KEY
column isn’t provided. For example, the following statement fails because it
doesn’t define a PRIMARY KEY column:
CREATE TABLE FOO (
name STRING
) WITH (
kafka_topic='foo',
value_format='json'
);
Update the previous statement to include the definition of the PRIMARY KEY:
CREATE TABLE FOO (
ID STRING PRIMARY KEY,
name STRING
) WITH (
kafka_topic='foo',
value_format='json'
);
If you load the value columns of the topic from Schema Registry, also known as
“schema inference”, you can provide the primary key as a partial schema, for
example:
-- FOO has value columns loaded from Schema Registry
CREATE TABLE FOO (
ID INT PRIMARY KEY
) WITH (
kafka_topic='foo',
value_format='avro'
);
CREATE STREAM statements that don’t define a KEY column no longer have an
implicit ROWKEY key column, for example:
CREATE STREAM BAR (
NAME STRING
) WITH (...);
In version 5.5, the previous statement would have resulted in a stream that had
two columns: ROWKEY STRING KEY and NAME STRING. With this change, the
previous statement results in a stream that has only the NAME STRING column.
Streams with no KEY column are serialized to Apache Kafka® topics with a null
key.
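If the stream needs a key column, one option is to declare it explicitly. The following is a minimal sketch; the ID column, the bar topic, and the JSON value format are assumptions made for illustration:

```sql
-- Sketch only: column ID, topic 'bar', and the JSON format are assumed.
-- BAR has two columns: ID (read from the Kafka record key) and NAME.
CREATE STREAM BAR (
  ID STRING KEY,
  NAME STRING
) WITH (
  kafka_topic='bar',
  value_format='json'
);
```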
Key columns required in projection
A statement that creates a materialized view must include the key columns in
the projection, for example:
CREATE TABLE OUTPUT AS
SELECT
productId, -- key column in projection
SUM(quantity) as unitsSold
FROM sales
GROUP BY productId;
The key column productId is required in the projection. In previous
versions of ksqlDB, the presence of productId in the projection would
have placed a copy of the data into the value of the underlying Kafka topic’s
record. Starting in version 6.0, the projection must include the key
columns, and ksqlDB stores these columns in the key of the underlying
Kafka record. Optionally, you can provide an alias for the key columns, for
example:
CREATE TABLE OUTPUT AS
SELECT
productId as id, -- aliased key column
SUM(quantity) as unitsSold
FROM sales
GROUP BY productId;
If you need a copy of the key column in the Kafka record’s value, use the
AS_VALUE function to indicate this to ksqlDB. For example, the following
statement produces output like earlier versions of ksqlDB did for the previous
example materialized view:
CREATE TABLE OUTPUT AS
SELECT
productId as ROWKEY, -- key column named ROWKEY
AS_VALUE(productId) as productId, -- productId copied into value
SUM(quantity) as unitsSold
FROM sales
GROUP BY productId;
WITH(KEY) syntax removed
In previous versions, all key columns were named ROWKEY. To enable using a
more user-friendly name for the key column in queries, you could supply an
alias for the key column in the WITH clause, for example:
CREATE TABLE INPUT (
ROWKEY INT PRIMARY KEY,
ID INT,
V0 STRING
) WITH (
key='ID', -- removed in ksqlDB 6.0
...
);
With the previous statement, the ID column can be used as an alias for
ROWKEY. This approach required the Kafka message value to contain an exact
copy of the key.
KLIP-24 removed the restriction that key columns must be named ROWKEY,
eliminating the need for the WITH(KEY) syntax, which has been removed. This
change also removed the requirement for the Kafka message value to contain an
exact copy of the key.
Update your queries by removing the KEY property from the WITH clause and
naming your KEY and PRIMARY KEY columns appropriately. For example, you can
rewrite the previous CREATE TABLE statement like this:
CREATE TABLE INPUT (
ID INT PRIMARY KEY,
V0 STRING
) WITH (...);
This doesn’t work when the value format is DELIMITED, because the value
columns are order dependent, so dropping the ID value column would result
in a deserialization error or the wrong values being loaded. If you’re using
DELIMITED, consider rewriting the previous example like this:
CREATE TABLE INPUT (
ID INT PRIMARY KEY,
ignoreMe INT,
V0 STRING
) WITH (...);
Basic+Bearer authentication
In ksql-server.properties, remove the following configuration settings:
rest.servlet.initializor.classes
websocket.servlet.initializor.classes
Add the following setting:
ksql.authentication.plugin.class=io.confluent.ksql.security.VertxBearerOrBasicAuthenticationPlugin