Use this quick start to get up and running with the Confluent Cloud Kinesis source
connector. The quick start shows how to select the connector and configure it to
pull data from Amazon Kinesis and persist the data to an Apache Kafka® topic. It
then continues to pull new records as they arrive in the stream.
Using the Confluent Cloud GUI
Step 2: Add a connector.
Click Connectors. If you already have connectors in your cluster, click Add connector.
Step 3: Select your connector.
Click the Amazon Kinesis Source connector icon.
Step 4: Set up the connection.
Complete the following and click Continue.
Note
- Make sure you have all your prerequisites completed.
- An asterisk ( * ) designates a required entry.
- Enter a connector name.
- Enter your Kafka Cluster credentials. The credentials are either the API key and secret or the service account API key and secret.
- Enter the topic name or topic names where you want to send data.
- Enter your AWS credentials.
- Add the Kinesis details.
- (Optional) The stream offset position is where messages start being consumed from the Kinesis stream. If no position is selected, the default is TRIM_HORIZON. Available offset positions are:
  - AT_TIMESTAMP: Get records starting at a point in time. Used with the timestamp format below.
  - LATEST: Start with the most recent record.
  - TRIM_HORIZON (default): Start with the last untrimmed record (the oldest record).
- (Optional) The timestamp format to use when AT_TIMESTAMP is selected. Allowed formats are the simple date-time format yyyy-MM-dd'T'HH:mm:ss.SSSXXX or epoch time in milliseconds.
- Add the Connection details for your connection to the stream.
- Enter the maximum number of connector tasks.
Configuration properties that are not shown in the Confluent Cloud UI use the default values. For default values and property definitions, see
Amazon Kinesis Source Connector for Confluent Platform.
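When AT_TIMESTAMP is used with epoch time, the millisecond value can be derived from a date-time string. The following is a minimal Python sketch, not part of any Confluent tooling: the helper name to_epoch_millis is hypothetical, and the input is assumed to be an ISO 8601 string that includes a UTC offset.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(iso_text: str) -> int:
    """Convert a timezone-aware ISO 8601 date-time string to epoch milliseconds."""
    moment = datetime.fromisoformat(iso_text)
    if moment.tzinfo is None:
        raise ValueError("timestamp must include a UTC offset, for example +00:00")
    # Dividing one timedelta by another with // yields an exact integer,
    # which avoids the rounding that float seconds * 1000 could introduce.
    return (moment - EPOCH) // timedelta(milliseconds=1)


print(to_epoch_millis("2020-05-28T19:09:38.237+00:00"))  # prints 1590692978237
```

The printed value matches the epoch timestamp used in the CLI configuration example in this quick start.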
Step 5: Launch the connector.
Verify the connection details and click Launch.
Step 6: Check the connector status.
The status for the connector should go from Provisioning to Running. It may take a few minutes.
Step 7: Check the Kafka topic.
After the connector is running, verify that messages are populating your Kafka topic.
For additional information about this connector, see
Amazon Kinesis Source Connector for Confluent Platform. Note that not all Confluent Platform connector features are
provided in the Confluent Cloud connector.
You can manage your full-service connector using the Confluent Cloud API. For details, see the Confluent Cloud API documentation.
See also
For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent Cloud CLI to manage your resources in Confluent Cloud.
Using the Confluent Cloud CLI
Complete the following steps to set up and run the connector using the Confluent Cloud CLI.
Important
You must create the Kafka topic before creating and launching this
connector. This Quick Start example sends data to a Kafka topic named
kinesis-testing. Before starting these steps, create that topic
using the command below:
ccloud kafka topic create kinesis-testing
Step 1: List the available connectors.
Enter the following command to list available connectors:
ccloud connector-catalog list
Step 2: Show the required connector configuration properties.
Enter the following command to show the required connector properties:
ccloud connector-catalog describe <connector-catalog-name>
For example:
ccloud connector-catalog describe KinesisSource
Example output:
Following are the required configs:
connector.class
name
kafka.api.key
kafka.api.secret
kafka.topic
aws.access.key.id
kinesis.region
aws.secret.key.id
kinesis.stream
kinesis.position
tasks.max
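The required properties listed above can double as a pre-flight check on a configuration before it is submitted. The following Python sketch assumes the property names exactly as shown in the example output; the function name missing_required is hypothetical and is not a Confluent Cloud CLI feature.

```python
# Property names copied from the example output of the describe command.
REQUIRED_PROPERTIES = (
    "connector.class",
    "name",
    "kafka.api.key",
    "kafka.api.secret",
    "kafka.topic",
    "aws.access.key.id",
    "kinesis.region",
    "aws.secret.key.id",
    "kinesis.stream",
    "kinesis.position",
    "tasks.max",
)


def missing_required(config: dict) -> list:
    """Return the required property names that are absent from config."""
    return [name for name in REQUIRED_PROPERTIES if name not in config]


# A config that only sets the connector name is missing everything else.
partial = {"name": "confluent-kinesis-source"}
print(missing_required(partial))
```

An empty list means every property from the example output is present; it does not confirm that the values themselves are valid.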
Step 3: Create the connector configuration file.
Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.
{
  "name": "confluent-kinesis-source",
  "connector.class": "KinesisSource",
  "kafka.api.key": "<my-kafka-api-key>",
  "kafka.api.secret": "<my-kafka-api-secret>",
  "kafka.topic": "kinesis-testing",
  "aws.access.key.id": "<my-aws-access-key>",
  "aws.secret.key.id": "<my-aws-access-key-secret>",
  "kinesis.stream": "my-kinesis-stream",
  "kinesis.region": "us-west-2",
  "kinesis.position": "AT_TIMESTAMP",
  "kinesis.shard.timestamp.ms": "1590692978237",
  "tasks.max": "1"
}
Note the following property definitions:
- "name": Sets a name for your new connector.
- "connector.class": Identifies the connector plugin name.
- "kinesis.region": Identifies the AWS region where the Kinesis data stream is located. Examples are us-west-2, us-east-2, ap-northeast-1, eu-central-1, and so on.
- (Optional) "kinesis.position": Identifies the stream offset position. This is where messages start being consumed from the Kinesis stream. Available offset positions are:
  - AT_TIMESTAMP: Get records starting at a point in time. Used with the timestamp format below.
  - LATEST: Start with the most recent record.
  - TRIM_HORIZON (default): Start with the last untrimmed record (the oldest record).
- (Optional) "kinesis.shard.timestamp.ms": The timestamp format to use when AT_TIMESTAMP is selected. Allowed formats are the simple date-time format yyyy-MM-dd'T'HH:mm:ss.SSSXXX or epoch time in milliseconds.
- "tasks.max": The maximum number of connector tasks.
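The property definitions above can be checked in a small script before the JSON file is written. This Python sketch is illustrative only: the function name render_config is hypothetical, and the rule that AT_TIMESTAMP must be paired with "kinesis.shard.timestamp.ms" is an assumption drawn from the descriptions above, not a documented validation performed by the connector.

```python
import json

# Offset positions taken from the property definitions above.
VALID_POSITIONS = {"AT_TIMESTAMP", "LATEST", "TRIM_HORIZON"}


def render_config(config: dict) -> str:
    """Check the offset settings, then serialize the config as JSON text."""
    # TRIM_HORIZON is the documented default when no position is set.
    position = config.get("kinesis.position", "TRIM_HORIZON")
    if position not in VALID_POSITIONS:
        raise ValueError(f"unsupported kinesis.position: {position}")
    # Assumption: AT_TIMESTAMP needs a timestamp to start from.
    if position == "AT_TIMESTAMP" and "kinesis.shard.timestamp.ms" not in config:
        raise ValueError("AT_TIMESTAMP requires kinesis.shard.timestamp.ms")
    return json.dumps(config, indent=2)


example = {
    "name": "confluent-kinesis-source",
    "connector.class": "KinesisSource",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret": "<my-kafka-api-secret>",
    "kafka.topic": "kinesis-testing",
    "aws.access.key.id": "<my-aws-access-key>",
    "aws.secret.key.id": "<my-aws-access-key-secret>",
    "kinesis.stream": "my-kinesis-stream",
    "kinesis.region": "us-west-2",
    "kinesis.position": "AT_TIMESTAMP",
    "kinesis.shard.timestamp.ms": "1590692978237",
    "tasks.max": "1",
}

print(render_config(example))
```

The returned text can be saved to a file such as kinesis-source.json for use in the next step.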
Step 4: Load the properties file and create the connector.
Enter the following command to load the configuration and start the connector:
ccloud connector create --config <file-name>.json
For example:
ccloud connector create --config kinesis-source.json
Example output:
Created connector confluent-kinesis-source lcc-ix4dl
Step 5: Check the connector status.
Enter the following command to check the connector status:
Example output:
ID | Name | Status | Type
+-----------+--------------------------+---------+--------+
lcc-ix4dl | confluent-kinesis-source | RUNNING | source
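For scripted checks, status output like the example above can be parsed into records. This Python sketch assumes the pipe-separated layout shown in the example output, which may differ between CLI versions; the function names parse_connector_table and is_running are hypothetical.

```python
SAMPLE_OUTPUT = """\
ID | Name | Status | Type
+-----------+--------------------------+---------+--------+
lcc-ix4dl | confluent-kinesis-source | RUNNING | source
"""


def parse_connector_table(text: str) -> list:
    """Turn the pipe-separated status table into a list of dicts."""
    rows = []
    for line in text.splitlines():
        cells = [cell.strip() for cell in line.split("|")]
        # Skip blank lines, the +----+ separator, and the header row.
        if len(cells) != 4 or cells[0] == "ID":
            continue
        rows.append(dict(zip(("id", "name", "status", "type"), cells)))
    return rows


def is_running(text: str, connector_name: str) -> bool:
    """Report whether the named connector appears with status RUNNING."""
    return any(
        row["name"] == connector_name and row["status"] == "RUNNING"
        for row in parse_connector_table(text)
    )


print(is_running(SAMPLE_OUTPUT, "confluent-kinesis-source"))  # prints True
```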
Step 6: Check the Kafka topic.
After the connector is running, verify that messages are populating your Kafka topic.
You can manage your full-service connector using the Confluent Cloud API. For details, see the Confluent Cloud API documentation.
For additional information about this connector, see
Amazon Kinesis Source Connector for Confluent Platform. Note that not all Confluent Platform connector features are
provided in the Confluent Cloud connector.