AWS Lambda Sink Connector for Confluent Platform
The Kafka Connect AWS Lambda sink connector pulls records from one or more
Apache Kafka® topics, converts them to JSON, and executes an AWS Lambda function. The
response of the AWS Lambda can optionally be written to another Kafka topic.
The AWS Lambda function can be invoked either synchronously or asynchronously.
In synchronous mode, records within a topic and partition are processed
sequentially. Records within different topic partitions, though, can be
processed in parallel. If configured, the response from AWS Lambda can be written
to a Kafka topic. In case of errors during Lambda execution, the connector can be
configured to either ignore and proceed, log the error, or stop the connector
completely.
In asynchronous mode, the connector operates in a fire-and-forget mode.
Records are processed on a best-effort, sequential basis. The connector does
not attempt any retries. AWS Lambda automatically retries up to two times, after
which AWS Lambda can move the request to a dead letter queue.
The connector guarantees at-least-once processing semantics. Under certain
circumstances, it is possible that a record is processed more than once. You
should design your AWS Lambda function to be idempotent.
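One way to approach idempotency is to remember which records the function has already handled, keyed by their Kafka coordinates. The following is a minimal sketch, assuming the default payload format described in the AWS Lambda Payload section. The in-memory `_seen` set and the `process_record` helper are hypothetical placeholders; a real function would need a durable store, because an in-memory set is lost whenever the Lambda execution environment is recycled.

```python
# Hypothetical in-memory record of processed coordinates. A real function
# would need a durable store, since this set does not survive a recycled
# execution environment and is not shared between concurrent instances.
_seen = set()


def process_record(payload):
    """Placeholder for the real side effect (hypothetical helper)."""
    return {"body": "processed", "status_code": 200}


def lambda_handler(event, context):
    results = []
    for item in event:
        payload = item["payload"]
        coords = (payload["topic"], payload["partition"], payload["offset"])
        if coords in _seen:
            # Redelivered record: skip the side effect but still answer,
            # so the response stays one-to-one with the request.
            results.append({"body": "duplicate skipped", "status_code": 200})
            continue
        results.append(process_record(payload))
        _seen.add(coords)
    return results
```

Calling the handler twice with the same batch performs the side effect once and reports the second delivery as a duplicate.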
If you have configured the connector to log the response from AWS Lambda to a
Kafka topic, the topic can contain duplicate records. You can enable Kafka log
compaction on the topic to remove duplicate records. Alternatively, you can
write a ksqlDB query to detect duplicate records in a time window.
Prerequisites
The following are required to run the Kafka Connect AWS Lambda Sink Connector:
- Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Connect: Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
- Java 1.8
- AWS credentials (see Access Key ID and Secret Access Key)
Exporting AWS Credentials and Region
Before you can run this connector, you must provide credentials and the region
where the AWS Lambda function is located.
The credentials must have permission for the lambda:InvokeFunction and lambda:GetFunction actions. The following example shows one way to set up this policy:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "lambda:InvokeFunction",
        "lambda:GetFunction"
      ],
      "Resource": "*"
    }
  ]
}
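The sample policy above allows both actions on every function ("Resource": "*"). If you prefer a narrower grant, the policy can name a single function instead. The sketch below generates such a policy document. The region, account ID, and function name are placeholders, and `build_invoke_policy` is a hypothetical helper, not part of the connector. Whether your setup also needs ARNs qualified with a version or alias is something to verify for your own deployment.

```python
import json


def build_invoke_policy(region, account_id, function_name):
    """Return an IAM policy document limited to one Lambda function."""
    # Lambda function ARNs follow the layout
    # arn:aws:lambda:<region>:<account-id>:function:<function-name>
    function_arn = f"arn:aws:lambda:{region}:{account_id}:function:{function_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["lambda:InvokeFunction", "lambda:GetFunction"],
                "Resource": function_arn,
            }
        ],
    }


# Placeholder values for illustration only.
policy = build_invoke_policy("us-east-1", "123456789012", "my-function")
print(json.dumps(policy, indent=2))
```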
Exporting environment variables is sufficient for a development and testing
environment. However, in a production environment, you should provide
credentials as part of the worker process itself using the configuration
property aws.credentials.provider.class. This is the credentials provider or
provider chain to use for authentication to AWS. By default, the connector
uses DefaultAWSCredentialsProviderChain. For details on configuring a
credentials provider, see AWS Credentials.
The information provided in AWS Credentials
is applicable for most connectors accessing resources in Amazon Web Services, including
the AWS Lambda Sink connector.
Export the following AWS environment variables to allow the connector to
access AWS Lambda.
Note
These environment variables must be exported where the Kafka Connect
worker processes and the connector are deployed.
AWS_DEFAULT_REGION
To export, run the following command:
export AWS_DEFAULT_REGION=<your-aws-lambda-region>
AWS_ACCESS_KEY_ID
To export, run the following command:
export AWS_ACCESS_KEY_ID=<your-accesskey-id>
AWS_SECRET_ACCESS_KEY
To export, run the following command:
export AWS_SECRET_ACCESS_KEY=<your-secret-access-key>
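Because the variables must be visible to the Kafka Connect worker process, it can help to check them in the same shell before starting the worker. A minimal sketch of such a check follows; `missing_aws_vars` is a hypothetical helper and is not provided by the connector.

```python
import os

REQUIRED_VARS = ("AWS_DEFAULT_REGION", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def missing_aws_vars(environ=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

Running `missing_aws_vars()` with no argument inspects the current process environment and returns an empty list when all three variables are set.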
You can also set the region and the credentials using the aws.lambda.region, aws.access.key.id, and aws.secret.access.key configurations.
If you set these properties, the connector uses them and ignores any credentials supplied through the aws.credentials.provider.class configuration.
Using Trusted Account Credentials
This connector can assume a role and use credentials from a separate trusted
account. This is a default feature provided with recent versions of this
connector that include an updated version of the AWS SDK.
After you create the trust relationship, an IAM user or an application from the trusted account can
use the AWS Security Token Service (AWS STS) AssumeRole API operation. This operation provides
temporary security credentials that enable access to AWS resources for the connector. For details, see
Creating a Role to Delegate Permissions to an IAM User.
Example profile in ~/.aws/credentials:
[default]
role_arn=arn:aws:iam::037803949979:role/kinesis_cross_account_role
source_profile=staging
role_session_name = OPTIONAL_SESSION_NAME
[staging]
aws_access_key_id = <STAGING KEY>
aws_secret_access_key = <STAGING SECRET>
To allow the connector to assume a role with the right permissions, set the
Amazon Resource Name (ARN) for this role. Additionally, you must choose either source_profile
or credential_source as the way to obtain credentials that have permission to assume the role
in the environment where the connector is running.
Note
When setting up trusted account credentials, be aware that the approach of loading profiles from
both ~/.aws/credentials and ~/.aws/config does not work when configuring this connector.
Assumed role settings and credentials must be placed in the ~/.aws/credentials file.
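The profile requirements above (a role ARN plus either source_profile or credential_source, all in the credentials file) can be checked before starting the worker. The following is a minimal sketch that parses the file contents with Python's standard configparser module. `check_assume_role_profile` is a hypothetical helper, and it only checks the structure of the file, not whether the role can actually be assumed.

```python
import configparser


def check_assume_role_profile(credentials_text, profile="default"):
    """Return a list of problems found in an assume-role profile."""
    # interpolation=None keeps '%' characters in secrets from being
    # treated as configparser interpolation syntax.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(credentials_text)
    if not parser.has_section(profile):
        return [f"profile [{profile}] not found"]

    section = parser[profile]
    problems = []
    if "role_arn" not in section:
        problems.append("role_arn is missing")

    has_source = "source_profile" in section
    has_cred_source = "credential_source" in section
    if has_source == has_cred_source:
        problems.append("set exactly one of source_profile or credential_source")
    if has_source and not parser.has_section(section["source_profile"]):
        problems.append(f"source profile [{section['source_profile']}] not found")
    return problems
```

To check a real file, pass the text of ~/.aws/credentials to the function; an empty list means no structural problem was found.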
Install the AWS Lambda Connector
You can install this connector by using the Confluent Hub client instructions below, or you can
manually download the ZIP file.
Install the connector using Confluent Hub
Prerequisite: The Confluent Hub Client must be installed. It is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-aws-lambda:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install confluentinc/kafka-connect-aws-lambda:1.0.0-preview
Configuration Properties
For a complete list of configuration properties for this connector, see AWS Lambda Sink Connector Configuration Properties.
Property-based example
This configuration is typically used with standalone workers.
name=LambdaSinkConnector
connector.class=io.confluent.connect.aws.lambda.AwsLambdaSinkConnector
tasks.max=1
topics=<Required Configuration>
aws.lambda.function.name=<Required Configuration>
aws.lambda.invocation.type=sync
aws.lambda.batch.size=50
behavior.on.error=fail
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
Note
For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
REST-based example
This configuration is typically used with distributed workers.
Write the following JSON to connector.json and configure all of the required
values. Then post the configuration to one of the distributed Kafka Connect
workers. See Kafka Connect REST API for more information.
{
  "name": "LambdaSinkConnector",
  "config" : {
    "connector.class" : "io.confluent.connect.aws.lambda.AwsLambdaSinkConnector",
    "tasks.max" : "1",
    "topics" : "< Required Configuration >",
    "aws.lambda.function.name" : "< Required Configuration >",
    "aws.lambda.invocation.type" : "sync",
    "aws.lambda.batch.size" : "50",
    "behavior.on.error" : "fail",
    "confluent.topic.bootstrap.servers" : "localhost:9092",
    "confluent.topic.replication.factor" : "1"
  }
}
Use curl to post the configuration to one of the Kafka Connect workers. Change
http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
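If you prefer to script this step, the same POST can be issued from Python's standard library. This is a minimal sketch, not an official client: `build_create_request` and `create_connector` are hypothetical helper names, and the worker URL is the same placeholder used in the curl command.

```python
import json
import urllib.request


def build_create_request(worker_url, connector_config):
    """Build the POST request for the Connect REST API /connectors endpoint."""
    body = json.dumps(connector_config).encode("utf-8")
    return urllib.request.Request(
        worker_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(worker_url, connector_config, timeout=10):
    """Send the request and return the decoded JSON response."""
    request = build_create_request(worker_url, connector_config)
    # urlopen raises urllib.error.HTTPError for non-2xx responses.
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

A typical call loads connector.json with `json.load` and passes the result to `create_connector("http://localhost:8083", config)`.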
Note
For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
AWS Lambda Payload
The default payload converter converts Kafka records to a payload in the form of
a JSON array. The following is a sample payload:
[
  {
    "payload": {
      "topic": "mytopic",
      "partition": 1,
      "offset": 101,
      "key": "key_from_sink_record",
      "value": "value_from_sink_record",
      "headers": [{"key1":"value1"},{"key2":"value2"},{...},...],
      "timestamp": 1562844607000
    }
  },
  {
    "payload": {
      "topic": "mytopic",
      "partition": 1,
      "offset": 102,
      "key": "key_from_sink_record",
      "value": "value_from_sink_record",
      "headers": [{...},{...},...],
      "timestamp": 1562844608000
    }
  }
]
The key and value are converted to either JSON primitives or objects according to their schema. If no schema is defined, they are encoded as plain strings.
For any AWS Lambda invocation, all the records belong to the same topic and partition, and the offsets are in strictly increasing order.
When the connector is configured, it validates that the Lambda function exists and that the AWS credentials used can invoke it.
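A Lambda function can read the payload described above by iterating over the array and unwrapping each payload object. The sketch below also checks the two stated invariants (one topic partition per invocation, strictly increasing offsets). `check_batch` is a hypothetical helper, and the shape of the returned summary is an illustration only.

```python
def check_batch(event):
    """Validate the batch invariants and return the record payloads."""
    payloads = [item["payload"] for item in event]
    if not payloads:
        return payloads
    first = payloads[0]
    previous_offset = None
    for payload in payloads:
        if (payload["topic"], payload["partition"]) != (first["topic"], first["partition"]):
            raise ValueError("batch mixes topic partitions")
        if previous_offset is not None and payload["offset"] <= previous_offset:
            raise ValueError("offsets are not strictly increasing")
        previous_offset = payload["offset"]
    return payloads


def lambda_handler(event, context):
    payloads = check_batch(event)
    # key and value arrive as JSON primitives or objects, or as plain
    # strings when the record has no schema.
    return [{"key": p["key"], "value_type": type(p["value"]).__name__} for p in payloads]
```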
Batching Records
The AWS Lambda sink connector combines multiple records into the input payload for the Lambda Function invocation. The following rules apply:
- A batch of records will belong to the same topic and partition.
- A batch always has records in increasing order of the offset.
- The total number of records in a batch is less than or equal to the configuration aws.lambda.batch.size, and the size of the batch is less than the AWS Lambda Payload Limits.
- To disable batching, set aws.lambda.batch.size to 1.
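The batching rules above can be illustrated with a short sketch. This is not the connector's implementation: `make_batches` is a hypothetical function, records are modeled as plain dictionaries, and the payload limit is passed in as a parameter rather than hard-coded. The size estimate serializes only the records themselves, so it is approximate.

```python
import json
from collections import defaultdict


def make_batches(records, batch_size, max_payload_bytes):
    """Group records per topic partition into size-limited, offset-ordered batches."""
    by_partition = defaultdict(list)
    for record in records:
        by_partition[(record["topic"], record["partition"])].append(record)

    batches = []
    for partition_records in by_partition.values():
        partition_records.sort(key=lambda r: r["offset"])
        current = []
        for record in partition_records:
            candidate = current + [record]
            too_many = len(candidate) > batch_size
            too_large = len(json.dumps(candidate).encode("utf-8")) >= max_payload_bytes
            if current and (too_many or too_large):
                # Close the current batch and start a new one with this record.
                batches.append(current)
                current = [record]
            else:
                # Note: a single oversized record is still emitted on its own;
                # this sketch does not try to handle that case.
                current = candidate
        if current:
            batches.append(current)
    return batches
```

With `batch_size=1`, every batch contains exactly one record, which matches the rule for disabling batching.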
Response Topic
In sync mode, the connector can optionally log the response from AWS Lambda in a Kafka topic using Kafka Connect Reporter.
The connector attempts to map each response to a single record before producing
it to the corresponding topic. It can receive the responses from the AWS Lambda
function in the following three formats.
The first format is JSON:
[
  {
    "payload": {
      "result": ...,
      "topic": string,
      "partition": <number>,
      "offset": <number>
    }
  },
  ...
]
This list can be out of order relative to the order in which the records were provided. The connector matches each result to its record based on the Kafka coordinates (topic, partition, and offset). However, the list must be one-to-one with the list of records that were sent in the request.
The second format is a JSON list:
As long as the list is one-to-one to the list of records, the list will be assumed to be ordered and matched with the corresponding records.
The third format can be any format that does not satisfy either of the above formats. The connector will report the entire response for each individual record (one-to-many correlation).
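The three matching rules can be summarized in code. The following sketch mirrors the description above and is not taken from the connector's source: `match_responses` is a hypothetical function, records are modeled as dictionaries with topic, partition, and offset keys, and the fallback from the first format to the second when coordinates do not line up is this sketch's own choice.

```python
def match_responses(records, response):
    """Pair each sent record with a result, following the three formats."""

    def coords(obj):
        return (obj["topic"], obj["partition"], obj["offset"])

    def has_coords(item):
        payload = item.get("payload") if isinstance(item, dict) else None
        return isinstance(payload, dict) and all(
            key in payload for key in ("result", "topic", "partition", "offset")
        )

    one_to_one = isinstance(response, list) and len(response) == len(records)

    if one_to_one and all(has_coords(item) for item in response):
        # Format 1: match on Kafka coordinates, so order does not matter.
        by_coords = {coords(item["payload"]): item["payload"]["result"] for item in response}
        if all(coords(record) in by_coords for record in records):
            return [(record, by_coords[coords(record)]) for record in records]
    if one_to_one:
        # Format 2: assume the list is in the same order as the records.
        return list(zip(records, response))
    # Format 3: report the entire response for every record.
    return [(record, response) for record in records]
```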
The connector logs the output of each record from the AWS Lambda function response in the configured Kafka topic as JSON. The following is an example of a logged JSON response from an AWS Lambda function:
[
  {
    "topic": "lambda-success",
    "partition": 1,
    "offset": 101,
    "timestamp": 1562844608000,
    "headers": [{...},{...},...],
    "key": "sink_record_key",
    "value": {
      "body": "Successfully executed Lambda!",
      "status_code": 200
    }
  },
  ...
]
AWS Lambda Function:
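def lambda_handler(event, context):
    # Return one result per input record so that the response list stays
    # one-to-one with the batch that the connector sent.
    result_list = []
    for obj in event:
        result = {}
        result["body"] = "Successfully executed Lambda!"
        result["status_code"] = 200
        result_list.append(result)
    return result_list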
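The function above returns a plain list, which corresponds to the second response format. If you would rather not depend on list order, a variant can return the first format by echoing each record's Kafka coordinates. This is a sketch under the assumption that the event uses the default payload format; the result body is the same illustrative value as above.

```python
def lambda_handler(event, context):
    """Return one result per record, tagged with the record's Kafka coordinates."""
    response = []
    for item in event:
        record = item["payload"]
        response.append(
            {
                "payload": {
                    "result": {
                        "body": "Successfully executed Lambda!",
                        "status_code": 200,
                    },
                    "topic": record["topic"],
                    "partition": record["partition"],
                    "offset": record["offset"],
                }
            }
        )
    return response
```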
To enable a response topic, set reporter.result.topic.name to the topic where you want to log the responses. The following shows a sample set of properties to add to configure a response topic:
aws.lambda.invocation.type=sync
reporter.result.topic.name=<Required Configuration>
reporter.bootstrap.servers=<Required Configuration>
Note that in async mode, the connector cannot log the response.
See Connect Reporter for more about using this connector with Kafka Connect Reporter.
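A configuration that sets a response topic while using async mode cannot log responses, so it can be worth catching that combination before deploying. The sketch below checks a connector configuration held as a Python dictionary. `reporter_warnings` is a hypothetical helper, and the checks cover only the constraints stated in this section.

```python
def reporter_warnings(config):
    """Return warnings for response-topic settings that cannot take effect."""
    warnings = []
    result_topic = config.get("reporter.result.topic.name")
    if result_topic and config.get("aws.lambda.invocation.type") == "async":
        warnings.append(
            "reporter.result.topic.name is set, but responses cannot be logged in async mode"
        )
    if result_topic and not config.get("reporter.bootstrap.servers"):
        warnings.append(
            "reporter.bootstrap.servers is required when a response topic is configured"
        )
    return warnings
```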
Error Handling
The AWS Lambda sink connector may encounter the following types of errors:
- Transient errors such as network timeouts or errors because of rate limiting.
- Configuration errors such as an incorrect Lambda Function name or access-related issues.
- Errors encountered during execution of the Lambda Function. These errors are further classified as handled or unhandled.
The AWS Lambda sink connector automatically handles transient errors such as network timeouts. The connector relies on the AWS Lambda SDK to perform retries.
In case of configuration errors, the connector does not retry. Instead, it
throws a ConnectException and stops the task.
In case of errors encountered during execution of the Lambda Function, the behavior of the connector depends on the configuration parameters aws.lambda.invocation.type and behavior.on.error.
| aws.lambda.invocation.type | behavior.on.error | Error Handling |
|---|---|---|
| async | | In asynchronous mode, the connector relies on AWS Lambda to perform retries and error handling. AWS Lambda retries the function twice (a total of three attempts), after which it discards the event. You should configure a dead letter queue if you want to track the input events that failed. |
| sync | fail | fail is the default mode. The connector stops processing records for that TopicPartition. Records for other TopicPartitions will continue to process. |
| sync | log | The connector will log the error message and continue processing the next batch of records. See Connect Reporter for how to configure the Kafka Connect Reporter to report errors in a separate topic. |
Logging Errors to Kafka topic
The connector can optionally log errors to a Kafka topic using Kafka Connect Reporter. Use the following
configuration settings to enable error logging:
aws.lambda.invocation.type=sync
behavior.on.error=log
reporter.error.topic.name=<Required Configuration>
reporter.bootstrap.servers=<Required Configuration>
For details about using Kafka Connect Reporter, see Connect Reporter.