CONFLUENT PLATFORM
This example shows how to build pipelines with Apache Kafka®. It showcases different ways to produce data to Kafka topics, with and without Kafka Connect, and various ways to serialize it for the Kafka Streams API and ksqlDB.
A detailed walk-through of this example is available in the whitepaper Kafka Serialization and Deserialization (SerDes) Examples and in the blog post Building a Real-Time Streaming ETL Pipeline in 20 Minutes.
The original data is a table of locations that resembles this.
id|name|sale
1|Raleigh|300
2|Dusseldorf|100
1|Raleigh|600
3|Moscow|800
4|Sydney|200
2|Dusseldorf|400
5|Chennai|400
3|Moscow|100
3|Moscow|200
1|Raleigh|700
Each pipeline produces these records to a Kafka topic.
The client application then uses the methods count and sum to process this data, grouped by each city.
The output of count is:
1|Raleigh|3
2|Dusseldorf|2
3|Moscow|3
4|Sydney|1
5|Chennai|1
The output of sum is:
1|Raleigh|1600
2|Dusseldorf|500
3|Moscow|1100
4|Sydney|200
5|Chennai|400
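The grouped aggregation that the Streams applications perform can be sketched in plain Java, with no Kafka involved: group the sample records by id and compute a count and a running sum per city. The class and method names here are illustrative, not taken from the example code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LocationAggregation {
    // Sample records in id|name|sale form, as produced to the Kafka topic.
    static final List<String> RECORDS = List.of(
            "1|Raleigh|300", "2|Dusseldorf|100", "1|Raleigh|600",
            "3|Moscow|800", "4|Sydney|200", "2|Dusseldorf|400",
            "5|Chennai|400", "3|Moscow|100", "3|Moscow|200", "1|Raleigh|700");

    // Equivalent of groupByKey().count(): number of records per id.
    static Map<Long, Long> counts(List<String> records) {
        Map<Long, Long> result = new LinkedHashMap<>();
        for (String r : records) {
            result.merge(Long.parseLong(r.split("\\|")[0]), 1L, Long::sum);
        }
        return result;
    }

    // Equivalent of a sum aggregation: total sale per id.
    static Map<Long, Long> sums(List<String> records) {
        Map<Long, Long> result = new LinkedHashMap<>();
        for (String r : records) {
            String[] f = r.split("\\|");
            result.merge(Long.parseLong(f[0]), Long.parseLong(f[2]), Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(counts(RECORDS)); // {1=3, 2=2, 3=3, 4=1, 5=1}
        System.out.println(sums(RECORDS));   // {1=1600, 2=500, 3=1100, 4=200, 5=400}
    }
}
```

In the real applications the same logic runs incrementally over the Kafka topic; this sketch only shows the aggregation semantics over the sample table.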
This example requires mvn and the timeout command to be installed locally.
Clone the examples GitHub repository and check out the 6.1.0-post branch.
git clone https://github.com/confluentinc/examples
cd examples
git checkout 6.1.0-post
Change directory to the connect-streams-pipeline example.
cd connect-streams-pipeline
Run the examples end-to-end:
./start.sh
If you are running Confluent Platform, open your browser and navigate to the Confluent Control Center web interface Management -> Connect tab at http://localhost:9021/management/connect to see the data in the Kafka topics and the deployed connectors.
Kafka console producer -> Key:String and Value:String. The data is produced from the command line with confluent local services kafka produce, and the Kafka Streams application reads both the key and the value with Serdes.String().
JDBC source connector with Single Message Transforms (SMTs) -> Key:Long and Value:JSON. The SMTs copy the id field from the value into the record key and cast it to int64, and org.apache.kafka.connect.converters.LongConverter serializes the key:
##
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##

# A simple example that copies a table from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-source-sqlite-jdbc-autoincrement-jdbcjson
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1

# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file retail.db, use an auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'jdbcjson-', e.g.
# the table 'locations' will be written to the topic 'jdbcjson-locations'.
connection.url=jdbc:sqlite:/usr/local/lib/retail.db
mode=incrementing
incrementing.column.name=id
topic.prefix=jdbcjson-
table.whitelist=locations
transforms=InsertKey, ExtractId, CastLong
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=id
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=id
transforms.CastLong.type=org.apache.kafka.connect.transforms.Cast$Key
transforms.CastLong.spec=int64
key.converter=org.apache.kafka.connect.converters.LongConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
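The net effect of the three SMTs in this configuration can be sketched in plain Java. This is a hypothetical simulation of the transform chain, not Connect's actual runtime: a keyless row value has its id field copied into the key, the single field extracted, and the result cast to a 64-bit long.

```java
import java.util.Map;

public class SmtChainSketch {
    // Simulates InsertKey (ValueToKey) -> ExtractId (ExtractField$Key) -> CastLong (Cast$Key).
    static long keyFor(Map<String, Object> value) {
        // ValueToKey copies the listed fields from the value into a new struct key: {id=...}
        Map<String, Object> key = Map.of("id", value.get("id"));
        // ExtractField$Key pulls out the single 'id' field from that struct...
        Object id = key.get("id");
        // ...and Cast$Key with spec=int64 yields a long, which
        // LongConverter then serializes as the 8-byte record key.
        return ((Number) id).longValue();
    }

    public static void main(String[] args) {
        Map<String, Object> row = Map.of("id", 1, "name", "Raleigh", "sale", 300);
        System.out.println(keyFor(row)); // 1
    }
}
```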
The Kafka Streams application then reads the key with Serdes.Long().
JDBC source connector with SpecificAvro -> Value:SpecificAvro. The SetSchemaMetadata SMT assigns a schema name so that the Avro converter registers the value under the Location record type; compare this with the GenericAvro variant, which sets no schema name:
##
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##

# A simple example that copies a table from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-source-sqlite-jdbc-autoincrement-jdbcspecificavro
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1

# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file retail.db, use an auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'jdbcspecificavro-', e.g.
# the table 'locations' will be written to the topic 'jdbcspecificavro-locations'.
connection.url=jdbc:sqlite:/usr/local/lib/retail.db
mode=incrementing
incrementing.column.name=id
topic.prefix=jdbcspecificavro-
table.whitelist=locations
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.schemas.enable=true
transforms=SetValueSchema
transforms.SetValueSchema.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.SetValueSchema.schema.name=io.confluent.examples.connectandstreams.avro.Location
The Kafka Streams application reads the value with SpecificAvroSerde and then uses map to set the record's id field as a Long key.
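That rekeying step can be sketched without Kafka: given a value arriving with no useful key, map emits a new key-value pair whose key is the record's id. The Location record below is a stand-in for the Avro-generated specific record class, not the generated code itself.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map.Entry;

public class RekeySketch {
    // Stand-in for the generated Avro specific record
    // io.confluent.examples.connectandstreams.avro.Location.
    record Location(long id, String name, long sale) {}

    // Mirrors the shape of KStream#map((key, value) -> new KeyValue<>(value.getId(), value)).
    static Entry<Long, Location> rekey(Location value) {
        return new SimpleEntry<>(value.id(), value);
    }

    public static void main(String[] args) {
        Entry<Long, Location> kv = rekey(new Location(2L, "Dusseldorf", 100L));
        System.out.println(kv.getKey()); // 2
    }
}
```

After this step the stream is keyed by id, so downstream groupByKey aggregations group records per city.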
JDBC source connector with GenericAvro -> Value:GenericAvro. The connector configuration is the same as the SpecificAvro variant except that no SetSchemaMetadata transform is configured:

##
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##

# A simple example that copies a table from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-source-sqlite-jdbc-autoincrement-jdbcgenericavro
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1

# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file retail.db, use an auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'jdbcgenericavro-', e.g.
# the table 'locations' will be written to the topic 'jdbcgenericavro-locations'.
connection.url=jdbc:sqlite:/usr/local/lib/retail.db
mode=incrementing
incrementing.column.name=id
topic.prefix=jdbcgenericavro-
table.whitelist=locations
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.schemas.enable=true
The Kafka Streams application reads the value with GenericAvroSerde and accesses its fields as a GenericRecord.
JDBC source connector with Avro and ksqlDB -> Value:Avro:

##
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##

# A simple example that copies a table from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-source-sqlite-jdbc-autoincrement-jdbcavroksql
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1

# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file retail.db, use an auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'jdbcavroksql-', e.g.
# the table 'locations' will be written to the topic 'jdbcavroksql-locations'.
connection.url=jdbc:sqlite:/usr/local/lib/retail.db
mode=incrementing
incrementing.column.name=id
topic.prefix=jdbcavroksql-
table.whitelist=locations
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.schemas.enable=true
In ksqlDB, the stream is rekeyed with PARTITION BY, casting the id field to BIGINT.
Technical notes: each Kafka Streams application reads its input topic with StreamsBuilder#stream() and groups records with KStream#groupByKey() before aggregating. When producing from the command line, the console producer's --key-serializer and --line-reader options control how keys are serialized and how input lines are parsed.